제목 쓰기가 참 어렵내요. logstash를 많이 사용해 보신 분들은 다들 잘 아실 것 같습니다.
저 같은 경우 output 은 거의 elasticsearch를 사용하고 있기 때문에 개발 시 디버깅을 위해서 아래 설정을 많이 이용합니다.
[디버그용 설정] input { http { port => 8080 codec => json_lines {} } }
output { stdout { codec => rubydebug } }
[디버그용 로그 스태쉬 실행] $ bin/logstash -e "input { http { port => 8080 codec => json_lines {} } } output { stdout { codec => rubydebug } }"
잘 아시겠지만 output 은 여러개를 지정해 줄 수 있습니다. 실제 elasticsearch로도 색인을 하고 싶으시다면 아래와 같이 하시면 되겠죠.
[ouput - elasticsearch + stdout] output { elasticsearch { cluster => "xxxxxx" bind_host => "localhost" bind_port => "9300" protocol => "transport" index => "logstash-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } }
elasticsearch 를 주 업으로 삼으면서 매일 보는 페이지들 입니다.
https://www.elastic.co/blog https://github.com/elastic/elasticsearch/issues https://www.elastic.co/guide/en/elasticsearch/resiliency/current/index.html
아주 유용한 정보들이 많이 들어 있고 어떻게 발전해 가는지도 확인이 가능 합니다.
작성하고 보니 최신 내용을 반영할걸 이라는 후회가 밀려 오내요. 이전 글 http://jjeong.tistory.com/1034 이거는 사실 deprecated 예정입니다. 코드에도 명시되어 있어서.. ^^;;
아래 코드로 변경 합니다. 참고하세요.
curl -XPUT localhost:9200/_cluster/settings -d '{ "persistent" : { "cluster.routing.allocation.enable" : "primaries", "cluster.routing.allocation.allow_rebalance" : "indices_all_active", "cluster.routing.allocation.cluster_concurrent_rebalance" : 2, "cluster.routing.allocation.node_initial_primaries_recoveries" : 4, "cluster.routing.allocation.node_concurrent_recoveries" : 2 } }' ※ 위 값들은 default 값입니다. "primaries" 만 빼고요 ^^
상세 내용을 확인 하고 싶으신 분들은 아래 문서를 참고하시길 바랍니다.
그냥 넘어 갈라고 했지만...
▷ cluster.routing.allocation.allow_rebalance 설명에 있지만 이건 shard rebalance에 대한 설정 입니다. 아래 값들에 대한 조건일 때 rebalance 하겠다는 것으로 이해 하시면 됩니다. always : 항상 할 거다. indices_primaries_active : primary shard 가 올라 오면 할 거다. indices_all_active : 전체 shard 가 올라 오면 할 거다.
당연히 indices_all_active 가 default 설정이 되겠죠.
▷ cluster.routing.allocation.cluster_concurrent_rebalance 클러스터를 통틀어서 동시에 rebalance 할 shard 수를 제한 하는 설정 입니다. 기본은 2개로 설정 되어 있습니다.
▷ cluster.routing.allocation.node_initial_primaries_recoveries 각 노드 당 primary shard recovery에 대한 초기 크기를 설정 하는 것입니다. 기본은 4개로 설정 되어 있습니다.
▷ cluster.routing.allocation.node_concurrent_recoveries 노드에서 동시에 수행할 recovery 크기를 설정 하는 것입니다.
▷ cluster.routing.allocation.enable 이게 이전에 deprecated 된 설정의 확장인데요. (위에 설정도 포함 이지만..) shard 종류에 따라 어떻게 처리 할 것인지를 결정 하게 합니다. all : 몽땅 다 하겠다. primaries : primary shard 만 하겠다. new_primaries : 새로 생성한 인덱스의 primary shard 만 하겠다. none : 몽땅 다 안하겠다.
딱 보면 disable_allocation:true = primaries + none 정도로 보면 쉽쥬. 반대는 disable_allocation:false = all 정도로 보면 쉽쥬.
매번 작성 하기 귀찮아서 그냥 남겨 봅니다.
[shard allocation disable] curl -XPUT localhost:9200/_cluster/settings -d '{ "persistent" : { "cluster.routing.allocation.disable_allocation" : true } }'
[모든 종류의 shard allocation disable] curl -XPUT localhost:9200/_cluster/settings -d '{ "transient" : { "cluster.routing.allocation.enable" : "none" } }'
[shard allocation enable] curl -XPUT localhost:9200/_cluster/settings -d '{ "persistent" : { "cluster.routing.allocation.disable_allocation" : false } }'
[모든 종류의 shard allocation enable] curl -XPUT localhost:9200/_cluster/settings -d '{ "transient" : { "cluster.routing.allocation.enable" : "all" } }'
[모든 인덱스의 replica shard disable] curl -XPUT localhost:9200/*/_settings -d '{"number_of_replicas":0}'
또 까먹을까봐 작성해 봅니다. NodeInfo 에서 제공하고 있는 IP 필드에 대해서는 임의 수정이 가능 하지 않습니다. 이유는 데몬 실행 시 내부적으로 노드의 정보를 읽어서 구성을 해주기 때문인데요.
관련 코드는 아래 두 개의 클래스를 보시면 됩니다.
- DiscoveryNode.java - NetworkUtils.java
[Code snippet] private final static InetAddress localAddress;
static { InetAddress localAddressX; try { localAddressX = InetAddress.getLocalHost(); } catch (Throwable e) { logger.warn("failed to resolve local host, fallback to loopback", e); localAddressX = InetAddress.getLoopbackAddress(); } localAddress = localAddressX; }
6월에 바쁘다는 핑계로 글을 하나도 못 올렸내요. 그런 의미에서 하나 올려 볼까 합니다.
오늘의 주제는 Elasticsearch에서 하나의 Index에 여러개의 Type을 생성 사용할 경우 주의할 점입니다.
Elasticsearch 와 RDB 와는 자주 비교가 됩니다. 개념을 쉽게 잡아 주기 위해서 인데요.
가볍게 다시 한번 비교해 보고 넘어 가겠습니다.
Elasticsearch |
Index |
Database |
Type |
Table |
Mapping |
Schema |
Document |
Row |
Field |
Column |
※ 더 있지만 이 정도로 정리 하겠습니다.
이제 오늘의 주제 입니다. Elasticsearch에서는 하나의 Index 생성 시 여러개의 Type을 생성 할 수 있습니다. 동일하게, RDB 도 하나의 Database에 여러개의 Table을 생성 할 수 있습니다.
하지만 여기서 Elasticsearch의 Type과 Database의 Table 사이에는 조금 다른 점이 있는데요. RDB 에서 Table 간 Column은 이름이 같더라도 데이터 형이나 인덱스 유형등에 대해서 서로 독립적으로 사용이 됩니다. 하지만 Elasticsearch의 Type 간 Field는 이름이 같게 되면 데이터 형이나 인덱스 유형등도 같아야 한다는 것입니다.
이유는 내부적으로 Lucene 에서는 Type 간 같은 이름의 Field는 하나의 Field 로 사용이 되기 때문입니다. 단, Type 간 Field 이름이 다르다면 이건 문제가 되지 않습니다. (당연한 이야기 겠죠.)
예) IndexA - Type1, Type2 가 있다고 가정하겠습니다. Type1 에 geo 라는 field 가 있고 데이터 형이 geo_point 라고 가정 하겠습니다. Type2 에도 geo 라는 field 가 있고 데이터 형이 string 이라고 가정을 하면 어떻게 될까요?
"mappings": { "type1": { "properties": { "geo": { "type": "geo_point" } } }, "type2": { "properties": { "geo": { "type": "string" } } } }
이 경우는 에러가 발생을 합니다. 같은 필드에 서로 다른 데이터 형을 선언 했기 때문인데요. 이렇게 사용 하시면 안됩니다.
정확한 사용은 서로 다른 type 에 같은 field 가 있다면 데이터 형도 동일하게 선언을 해주셔야 합니다.
"mappings": { "type1": { "properties": { "geo": { "type": "geo_point" } } }, "type2": { "properties": { "geo": { "type": "geo_point" } } } }
※ 데이터 형만을 가지고 설명을 드렸지만 기타 다른 옵션도 동일하게 선언하셔서 사용하시길 추천 드립니다.
참고 문서) https://www.elastic.co/guide/en/elasticsearch/guide/current/_limiting_memory_usage.html#circuit-breaker https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules-fielddata.html#fielddata-circuit-breaker https://www.elastic.co/guide/en/elasticsearch/guide/current/_monitoring_individual_nodes.html#_circuit_breaker https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-update-settings.html#_field_data_circuit_breaker https://www.elastic.co/guide/en/elasticsearch/resiliency/current/index.html#_circuit_breaker_fielddata_status_done_v1_0_0
결국 하나의 문서만 보셔도 됩니다. https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules-fielddata.html
[Circuit Breaker 요약] - fielddata cache 와 연관된 기능으로 OOM에 대한 대응 방법으로 제공하고 있습니다. - circuit breaker limit size는 cache size 보다 커야 합니다. - circuit breaker는 query에 필요로 하는 memory 크기를 예측/평가 하여 사전에 OOM 문제를 경험하지 않도록 해줍니다. 즉, 요청한 Query를 중시 시킵니다.
[Geek Igor] http://igor.kupczynski.info/2015/04/06/fielddata.html
Bulk UDP has been deprecated and will be removed in 2.0.
The More Like This Field query has been deprecated in favor of the More Like This Query restrained set to a specific field . It will be removed in 2.0.
- The
text query has been removed. Use the match query instead. - The
field query has been removed. Use the query_string query instead. - Per-document boosting with the
_boost field has been removed. You can use thefunction_score instead. - The
path parameter in mappings has been deprecated. Use the copy_to parameter instead. - The
custom_score and custom_boost_score is no longer supported. You can use function_score instead.
원본링크) https://github.com/gruter/tajo-elasticsearch/blob/master/README.KR.md
Apache Tajo & Elasticsearch- Collaborate Apache Tajo + Elasticsearch
- Apache Tajo의 External Storage로 구현된 내용입니다.
- 설치 가이드
Software Stack- 스택이랄 것까지는 없지만 기본은 Apache Tajo + Elasticsearch 입니다.
- 각 open source 별 패키지 종속은 갖기 때문에 사용전에 확인 하시고 필요한 것들을 설치해 주셔야 합니다.
- 기본적으로 소스 받으셔서 빌드 하신 후 설치 사용하시면 됩니다.
Apache Tajo- Hadoop 2.3.0 or higher (up to 2.5.1)
- Java 1.6 or 1.7
- Protocol buffer 2.5.0
Elasticsearch- 버전별로 JDK 종속을 갖습니다.
- 1.2 이상 부터는 JDK 1.7 이상
- 1.1 이하 부터는 JDK 1.6
동작방식- Apache Tajo에 external table로 생성해서 Elasticsearch로 질의하는 방식 입니다.
- 현재 구현된 기능은 아래 두 가지 입니다.
- Meta 정보를 Tajo에서 저장하고 있고 실제 데이터는 Elasticsearch에 위치하게 됩니다.
- SQL 질의 시 Tajo에서 Elasticsearch로 QueryDSL로 변환된 질의를 수행하여 데이터를 가져오게 됩니다.
- 이렇게 획득한 데이터를 WHERE 조건에 맞게 Selection 해서 결과를 리턴하게 됩니다.
어디에 사용하면 될까요?- Elasticsearch에서 JOIN 사용에 대한 아쉬움이 있으셨던 분들
- HDFS에 저장된 데이터와 함께 분석 또는 질의에 대한 요구가 있으신 분들
- HDFS 데이터에 대한 중간 결과를 Elasticsearch로 저장해서 활용하고 싶으셨던 분들
- 검색엔진은 잘 모르겠고 그냥 SQL만 아시는 분들
JDBC Driver 사용은 가능 한가요?- Apache Tajo의 JDBC Driver를 이용해서 사용하시면 됩니다.
사용 시 주의사항- 현재 QUAL에 대한 조건이 내려오지 않아 Full Scan 하기 때문에 실시간 서비스용으로는 적합하지 않습니다.
- Batch 또는 관리/분석 도구에서 사용하는 용도로 쓰시기 바랍니다.
- QUAL을 내려 주는 기능은 Apache Tajo 팀에서 현재 구현중에 있어 완료 되면 반영할 예정입니다.
문의- 요청사항이나 개선요구사항이 있으신 분들은 메일이나 이슈 등록해 주시면 최대한 반영해 보겠습니다.
원본 링크) https://github.com/gruter/tajo-elasticsearch
Apache Tajo & Elasticsearch- Collaborate Apache Tajo + Elasticsearch
Apache Tajo User GroupApache Tajo Mailing ListElasticsearch User GroupRegistererMaster Branch Build Environment- JDK 6 or later
- Elasticsearch 1.1.2
tajo-es-1.5.2 Branch Build Environment- JDK 7 or later
- Elasticsearch 1.5.2
tajo-es-1.1.2 Branch Build Environment- JDK 6 or later
- Elasticsearch 1.1.2
HADOOP$ cd ~/server/app/hadoop-2.3.0
Prerequisites- Hadoop 2.3.0 or higher (up to 2.5.1)
- Java 1.6 or 1.7
- Protocol buffer 2.5.0
- Go to Link
Source Clone & Build$ git clone https://github.com/gruter/tajo-elasticsearch.git
$ cd tajo-elasticsearch
$ mvn clean package -DskipTests -Pdist -Dtar
$ cd tajo-dist/target/
$ ls -al tajo-0.*.tar.gz
-rw-r--r-- 1 hwjeong staff 59521544 5 14 13:59 tajo-0.11.0-SNAPSHOT.tar.gz
Apache Tajo Installation$ cd ~/server/app
$ mkdir tajo
$ cd tajo
$ cp ~/git/tajo-elasticsearch/tajo-dist/target/tajo-0.11.0-SNAPSHOT.tar.gz .
$ tar -xvzf tajo-0.11.0-SNAPSHOT.tar.gz
$ ln -s tajo-0.11.0-SNAPSHOT tajo
$ cd tajo
$ vi conf/tajo-env.sh
export HADOOP_HOME=/Users/hwjeong/server/app/hadoop-2.3.0
export JAVA_HOME=`/usr/libexec/java_home -v 1.7`
Apache Tajo Worker - ssh keygen$ cd ~/.ssh
$ ssh-keygen -t rsa
$ cat id_rsa.pub > authorized_keys
$ chmod 600 authorized_keys
Apache Tajo Run & Sample Data$ cd ~/server/app/tajo/tajo
$ bin/start-tajo.sh
$ cat > data.csv
Hadoop Run & Make User Directory$ cd ~/server/app/hadoop-2.3.0
$ sbin/start-all.sh
$ bin/hadoop fs -ls /
$ bin/hadoop fs -mkdir /user/tajo
$ bin/hadoop fs -chown hwjeong /user/tajo
$ bin/hadoop fs -ls /user
drwxr-xr-x - hwjeong supergroup 0 2015-05-14 14:42 /user/tajo
$ bin/hadoop fs -moveFromLocal ~/server/app/tajo/tajo/data.csv /user/tajo/
Apache Tajo CLI$ cd ~/server/app/tajo/tajo
$ bin/tsql
default> create external table tajodemotbl (id int, name text, score float, type text) using csv with ('csvfile.delimiter'='|') location 'hdfs://localhost:9000/user/tajo/data.csv';
default> \d tajodemotbl;
table name: default.tajodemotbl
table path: hdfs://localhost:9000/user/tajo/data.csv
store type: csv
number of rows: unknown
volume: 60 B
id INT4
name TEXT
score FLOAT4
type TEXT
default> select * from tajodemotbl where id > 2;
Progress: 0%, response time: 1.557 sec
Progress: 0%, response time: 1.558 sec
Progress: 100%, response time: 1.86 sec
id, name, score, type
3, ghi, 3.4, c
4, jkl, 4.5, d
5, mno, 5.6, e
(3 rows, 1.86 sec, 48 B selected)
Elasticsearch Installation & Run$ cd ~/server/app/elasticsearch/elasticsearch-1.1.2
# no configuration
$ bin/elasticsearch -f
Create Index & Documentpackage org.gruter.elasticsearch.test;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
public class ElasticsearchCRUDTest {
private static final Logger log = LoggerFactory.getLogger(ElasticsearchCRUDTest.class);
private ImmutableSettings.Builder settings;
private Node node;
private Client client;
private int DOC_SIZE = 1000;
public void setup() throws Exception {
settings = ImmutableSettings.settingsBuilder();
settings.put("cluster.name", "elasticsearch");
node = NodeBuilder.nodeBuilder()
client = node.client();
public void testElasticsearchCRUD() throws Exception {
// delete index
try {
} catch (Exception e) {
} finally {
// create index
Settings indexSettings = ImmutableSettings.settingsBuilder()
.put("number_of_replicas", "0")
XContentBuilder builder = XContentFactory.jsonBuilder()
.field("enabled", "false")
.field("path", "field1")
.field("type", "long").field("store", "no").field("index", "not_analyzed")
.field("type", "string").field("store", "no").field("index", "not_analyzed")
.field("type", "string").field("store", "no").field("index", "analyzed")
CreateIndexResponse res = client.admin().indices().prepareCreate("tajo_es_index")
.addMapping("tajo_es_type", builder)
assertEquals(res.isAcknowledged(), true);
// add document
IndexRequestBuilder indexRequestBuilder = client.prepareIndex().setIndex("tajo_es_index").setType("tajo_es_type");
IndexResponse indexResponse;
for ( int i=0; i<DOC_SIZE; i++ ) {
builder = XContentFactory.jsonBuilder()
.field("field1", i).field("field2", "henry" + i).field("field3", i + ". hello world!! elasticsearch on apache tajo!!")
indexResponse = indexRequestBuilder.setSource(builder)
assertEquals(indexResponse.isCreated(), true);
CountRequest request = new CountRequest();
CountResponse response = client.count(request).actionGet();
long count = response.getCount();
assertEquals(count, DOC_SIZE);
public void tearDown() throws Exception {
Check StatusCreate External Table for Elasticsearch on Tajo and Test Querycreate external table tajo_es_index (
_type text,
_score double,
_id text,
field1 bigint,
field2 text,
field3 text
using elasticsearch
with (
$ cd ~/server/app/tajo/tajo
$ bin/tsql
Try \? for help.
default> create external table tajo_es_index (
> _type text,
> _score double,
> _id text,
> field1 bigint,
> field2 text,
> field3 text
> )
> using elasticsearch
> with (
> 'es.index'='tajo_es_index',
> 'es.type'='tajo_es_type'
> );
default> select count(*) from tajo_es_index;
Progress: 0%, response time: 1.397 sec
Progress: 0%, response time: 1.398 sec
Progress: 0%, response time: 1.802 sec
Progress: 100%, response time: 1.808 sec
(1 rows, 1.808 sec, 5 B selected)
default> select * from tajo_es_index where field1 > 10 and field1 < 15;
Progress: 100%, response time: 0.583 sec
_type, _score, _id, field1, field2, field3
tajo_es_type, 0.0, 11, 11, henry11, 11. hello world!! elasticsearch on apache tajo!!
tajo_es_type, 0.0, 12, 12, henry12, 12. hello world!! elasticsearch on apache tajo!!
tajo_es_type, 0.0, 13, 13, henry13, 13. hello world!! elasticsearch on apache tajo!!
tajo_es_type, 0.0, 14, 14, henry14, 14. hello world!! elasticsearch on apache tajo!!
(4 rows, 0.583 sec, 320 B selected)
Elasticsearch "with" Options public static final String OPT_CLUSTER = "es.cluster";
public static final String OPT_NODES = "es.nodes";
public static final String OPT_INDEX = "es.index";
public static final String OPT_TYPE = "es.type";
public static final String OPT_FETCH_SIZE = "es.fetch.size";
public static final String OPT_PRIMARY_SHARD = "es.primary.shard";
public static final String OPT_REPLICA_SHARD = "es.replica.shard";
public static final String OPT_PING_TIMEOUT = "es.ping.timeout";
public static final String OPT_CONNECT_TIMEOUT = "es.connect.timeout";
public static final String OPT_THREADPOOL_RECOVERY = "es.threadpool.recovery";
public static final String OPT_THREADPOOL_BULK = "es.threadpool.bulk";
public static final String OPT_THREADPOOL_REG = "es.threadpool.reg";
public static final String OPT_TIME_SCROLL = "es.time.scroll";
public static final String OPT_TIME_ACTION = "es.time.action";