[elasticsearch] Java API : Bulk

Elastic/Elasticsearch 2013. 9. 16. 18:34

본 문서는 개인적인 테스트와 elasticsearch.org 그리고 community 등을 참고해서 작성된 것이며,

정보 교환이 목적입니다.


잘못된 부분에 대해서는 지적 부탁 드립니다.

(예시 코드는 성능 및 보안 검증이 되지 않았습니다.)


벌크 색인 시 actionGet(TIMEOUT)  부분에 TIMEOUT 은 빼고 돌리시는게 좋습니다.

돌리시다 보면 timeout exception 이 발생 할 때가 있는데 timeout 값을 제거 하고 돌리시면 관련 에러는 없어집니다.


new OptimizeRequest()

.indices(indice)

.flush(true)

.onlyExpungeDeletes(true)

.refresh(true)

.waitForMerge(true)

.operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD)

.maxNumSegments(1))




[elasticsearch java api 리뷰]

원문 링크 : http://www.elasticsearch.org/guide/reference/java-api/bulk/


- 이 API는 한번의 요청으로 여러개의 문서를 색인 또는 삭제 할 수 있습니다.


우선 원문에 나와 있는 예제 부터 살펴 보겠습니다.


[원문예제]

BulkRequestBuilder bulkRequest = client.prepareBulk();


// either use client#prepare, or use Requests# to directly build index/delete requests

bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")

        .setSource(jsonBuilder()

                    .startObject()

                        .field("user", "kimchy")

                        .field("postDate", new Date())

                        .field("message", "trying out Elastic Search")

                    .endObject()

                  )

        );


bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")

        .setSource(jsonBuilder()

                    .startObject()

                        .field("user", "kimchy")

                        .field("postDate", new Date())

                        .field("message", "another post")

                    .endObject()

                  )

        );

        

BulkResponse bulkResponse = bulkRequest.execute().actionGet();

if (bulkResponse.hasFailures()) {

    // process failures by iterating through each bulk response item

}


- 기본 client.prepareIndex() 를 이용해서 문서 하나를 색인 요청 하는 것과 유사 합니다.

- 다만, bulkRequest 의 경우 이런 하나의 문서를 여러개 add(document) 해서 한번에 요청을 한다는 것입니다.

- index name : twitter

- index type : tweet

- document id (_id) : 1 과 2

- document id 를 지정 하지 않을 경우 자동으로 생성을 하게 됩니다.


아래 예제는 글 http://jjeong.tistory.com/792 에서 생성한 index 에 bulkRequest 를 보내는 것입니다.


[간단한 Bulk Request 예제]

BulkRequestBuilder bulkRequest;

BulkResponse bulkResponse;

String ts;

bulkRequest = client.prepareBulk();

int i=0;

for (i=0; i<100000; i++) {

ts = String.valueOf(System.currentTimeMillis());


bulkRequest.add(client.prepareIndex("facebook", "facebook_post")

.setRouting(ts)

.setOperationThreaded(false)

.setSource(jsonBuilder()

.startObject()

.field("docid", ts)

.field("title", "document "+String.valueOf(i))

.endObject()

)

)

.setReplicationType(ReplicationType.ASYNC)

.setConsistencyLevel(WriteConsistencyLevel.QUORUM)

.setRefresh(false);

if ( (i%100) == 0 ) {

bulkRequest.execute().actionGet(5000);

bulkRequest = client.prepareBulk();

log.debug("DOCUMENT Sequence : {}", i);

}

}


bulkRequest.execute().actionGet(5000);

log.debug("DOCUMENT Sequence : {}", i);

- 코드를 보시면 아시겠지만 문서 구조는 docid, title 로 아주 심플합니다.

- docid 에 timestamp를 넣고 document 에는 문자열 "document $SEQ"가 들어 갑니다. 

- 문서를 100개 단위로 묶어서 request 하게 됩니다.


[Bulk Request 고려사항]

- threadpool 세팅을 검토해 보셔야 합니다.

http://www.elasticsearch.org/guide/reference/modules/threadpool/


- refresh 설정을 검토해 보셔야 합니다.

disable 또는 10초 이상 설정


- index merge segment 크기를 조정해야 합니다.

20 또는 30 정도


- threadsafe 하지 않다는 걸 인지해야 합니다.

아직까지는 bulk request 시에 multithread 지원이 되지 않습니다.


- async 방식을 추천 합니다.


: