[Elasticsearch] Integrating Apache Tajo with Elasticsearch: Running SQL on Elasticsearch
Apache Tajo & Elasticsearch
- Integrating Apache Tajo with Elasticsearch
Apache Tajo User Group
Apache Tajo Mailing List
Elasticsearch User Group
Master Branch Build Environment
- JDK 6 or later
- Elasticsearch 1.1.2
tajo-es-1.5.2 Branch Build Environment
- JDK 7 or later
- Elasticsearch 1.5.2
tajo-es-1.1.2 Branch Build Environment
- JDK 6 or later
- Elasticsearch 1.1.2
- Hadoop 2.3.0 or higher (up to 2.5.1)
- Java 1.6 or 1.7
- Protocol buffer 2.5.0
- Go to Link
Source Clone & Build
$ git clone https://github.com/gruter/tajo-elasticsearch.git
$ cd tajo-elasticsearch
$ mvn clean package -DskipTests -Pdist -Dtar
$ cd tajo-dist/target/
$ ls -al tajo-0.*.tar.gz
-rw-r--r-- 1 hwjeong staff 59521544 5 14 13:59 tajo-0.11.0-SNAPSHOT.tar.gz
Apache Tajo Installation
$ cd ~/server/app
$ mkdir tajo
$ cd tajo
$ cp ~/git/tajo-elasticsearch/tajo-dist/target/tajo-0.11.0-SNAPSHOT.tar.gz .
$ tar -xvzf tajo-0.11.0-SNAPSHOT.tar.gz
$ ln -s tajo-0.11.0-SNAPSHOT tajo
$ cd tajo
$ vi conf/tajo-env.sh
export HADOOP_HOME=/Users/hwjeong/server/app/hadoop-2.3.0
export JAVA_HOME=`/usr/libexec/java_home -v 1.7`
Apache Tajo Worker - Passwordless SSH Setup (ssh-keygen)
$ cd ~/.ssh
$ ssh-keygen -t rsa
# Append (>>) instead of overwriting (>) so any keys already in authorized_keys are kept.
$ cat id_rsa.pub >> authorized_keys
$ chmod 600 authorized_keys
Run Apache Tajo & Create Sample Data
$ cd ~/server/app/tajo/tajo
$ bin/start-tajo.sh
$ cat > data.csv
Run Hadoop & Create the User Directory
$ cd ~/server/app/hadoop-2.3.0
$ sbin/start-all.sh
$ bin/hadoop fs -ls /
$ bin/hadoop fs -mkdir /user/tajo
$ bin/hadoop fs -chown hwjeong /user/tajo
$ bin/hadoop fs -ls /user
drwxr-xr-x - hwjeong supergroup 0 2015-05-14 14:42 /user/tajo
$ bin/hadoop fs -moveFromLocal ~/server/app/tajo/tajo/data.csv /user/tajo/
Apache Tajo CLI
$ cd ~/server/app/tajo/tajo
$ bin/tsql
default> create external table tajodemotbl (id int, name text, score float, type text) using csv with ('csvfile.delimiter'='|') location 'hdfs://localhost:9000/user/tajo/data.csv';
default> \d tajodemotbl;
table name: default.tajodemotbl
table path: hdfs://localhost:9000/user/tajo/data.csv
store type: csv
number of rows: unknown
volume: 60 B
id INT4
name TEXT
score FLOAT4
type TEXT
default> select * from tajodemotbl where id > 2;
Progress: 0%, response time: 1.557 sec
Progress: 0%, response time: 1.558 sec
Progress: 100%, response time: 1.86 sec
id, name, score, type
3, ghi, 3.4, c
4, jkl, 4.5, d
5, mno, 5.6, e
(3 rows, 1.86 sec, 48 B selected)
Elasticsearch Installation & Run
- Go to Link
- This example uses elasticsearch-1.1.2
$ cd ~/server/app/elasticsearch/elasticsearch-1.1.2
# no configuration
$ bin/elasticsearch -f
Create Index & Document
package org.gruter.elasticsearch.test;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
public class ElasticsearchCRUDTest {
private static final Logger log = LoggerFactory.getLogger(ElasticsearchCRUDTest.class);
private ImmutableSettings.Builder settings;
private Node node;
private Client client;
private int DOC_SIZE = 1000;
public void setup() throws Exception {
settings = ImmutableSettings.settingsBuilder();
settings.put("cluster.name", "elasticsearch");
node = NodeBuilder.nodeBuilder()
client = node.client();
public void testElasticsearchCRUD() throws Exception {
// delete index
try {
} catch (Exception e) {
} finally {
// create index
Settings indexSettings = ImmutableSettings.settingsBuilder()
.put("number_of_replicas", "0")
XContentBuilder builder = XContentFactory.jsonBuilder()
.field("enabled", "false")
.field("path", "field1")
.field("type", "long").field("store", "no").field("index", "not_analyzed")
.field("type", "string").field("store", "no").field("index", "not_analyzed")
.field("type", "string").field("store", "no").field("index", "analyzed")
CreateIndexResponse res = client.admin().indices().prepareCreate("tajo_es_index")
.addMapping("tajo_es_type", builder)
assertEquals(res.isAcknowledged(), true);
// add document
IndexRequestBuilder indexRequestBuilder = client.prepareIndex().setIndex("tajo_es_index").setType("tajo_es_type");
IndexResponse indexResponse;
for ( int i=0; i<DOC_SIZE; i++ ) {
builder = XContentFactory.jsonBuilder()
.field("field1", i).field("field2", "henry" + i).field("field3", i + ". hello world!! elasticsearch on apache tajo!!")
indexResponse = indexRequestBuilder.setSource(builder)
assertEquals(indexResponse.isCreated(), true);
CountRequest request = new CountRequest();
CountResponse response = client.count(request).actionGet();
long count = response.getCount();
assertEquals(count, DOC_SIZE);
public void tearDown() throws Exception {
Check Status
create external table tajo_es_index (
_type text,
_score double,
_id text,
field1 bigint,
field2 text,
field3 text
using elasticsearch
with (
Elasticsearch "with" Options
/*
 * Option keys accepted in the "with ( ... )" clause of a
 * "create external table ... using elasticsearch" statement.
 * The per-option notes below are inferred from the key names only.
 * NOTE(review): confirm each meaning against the storage handler that reads these keys.
 */
// presumably the Elasticsearch cluster name (compare "cluster.name" in the test setup) - TODO confirm
public static final String OPT_CLUSTER = "es.cluster";
// presumably the node address list to connect to; expected format (e.g. host:port, separator) not shown here
public static final String OPT_NODES = "es.nodes";
// presumably the Elasticsearch index the table maps to (e.g. "tajo_es_index")
public static final String OPT_INDEX = "es.index";
// presumably the Elasticsearch document type within the index (e.g. "tajo_es_type")
public static final String OPT_TYPE = "es.type";
// presumably the number of documents fetched per request - TODO confirm units/default
public static final String OPT_FETCH_SIZE = "es.fetch.size";
public static final String OPT_PRIMARY_SHARD = "es.primary.shard";
public static final String OPT_REPLICA_SHARD = "es.replica.shard";
// timeout settings; units and defaults are not visible in this section - TODO confirm
public static final String OPT_PING_TIMEOUT = "es.ping.timeout";
public static final String OPT_CONNECT_TIMEOUT = "es.connect.timeout";
// thread-pool settings; the "reg" suffix is not explained here - NOTE(review): confirm what it refers to
public static final String OPT_THREADPOOL_RECOVERY = "es.threadpool.recovery";
public static final String OPT_THREADPOOL_BULK = "es.threadpool.bulk";
public static final String OPT_THREADPOOL_REG = "es.threadpool.reg";
// presumably scroll keep-alive and action timeout values - TODO confirm format (e.g. "1m") and units
public static final String OPT_TIME_SCROLL = "es.time.scroll";
public static final String OPT_TIME_ACTION = "es.time.action";
