'Bulk'에 해당되는 글 9건

  1. 2022.09.20 [Elasticsearch] Elasticsearch ES-Hadoop 내 Spark Bulk Request Error Handler 1
  2. 2021.07.05 [Elasticsearch] Bulk Request by Restful API
  3. 2017.12.06 [Elasticsearch] Refresh interval while bulk request 2
  4. 2016.07.22 [Elasticsearch] Transport Bulk to Rest Bulk data format 변환
  5. 2015.11.30 [Elasticsearch - The Definitive Guide] bulk-format
  6. 2014.07.15 [ElasticSearch] 색인 성능..
  7. 2013.09.17 [Elasticsearch] Bulk Indexing 후 Data Consistency 관리.
  8. 2013.09.16 [elasticsearch] Java API : Bulk
  9. 2013.08.14 [Elasticsearch] json bulk insert.

[Elasticsearch] Elasticsearch ES-Hadoop 내 Spark Bulk Request Error Handler

Elastic/Elasticsearch 2022. 9. 20. 12:35

Spark 을 이용해서 Elasticsearch 로 Bulk Request 를 사용할 경우 내부에서는 BulkProcessor 를 이용해서 요청을 하게 됩니다.
보통 BulkRequest 사용 시 색인 요청한 문서중 일부 오류가 발생 하는 경우 전체 문서 색인이 실패 하는 것이 아닌 오류 문서만 색인이 안되고, 나머지 문서들은 색인이 완료 되는데요. 
ES-Hadoop 내 Spark 의 경우 기본 Error Handler 가 AbortOnFailure 라서 요청한 모든 문서가 색인이 실패 하게 됩니다.
이를 변경 하기 위해서는 Error Handler 설정을 아래와 같이 변경하고 사용 하시면 됩니다.

[참고문서]
https://www.elastic.co/guide/en/elasticsearch/hadoop/8.4/errorhandlers.html#errorhandlers-bulk-use

[설정]

example)
es.write.data.error.handlers = log

SparkSession.builder().appName("...").config("es.write.rest.error.handlers", "log");


[참고코드]

// BulkWriteHandlerLoader / BulkWriteErrorHandler

@Override
protected IBulkWriteErrorHandler loadBuiltInHandler(AbstractHandlerLoader.NamedHandlers handlerName) {
    ErrorHandler<BulkWriteFailure, byte[], DelayableErrorCollector<byte[]>> genericHandler;
    switch (handlerName) {
        case FAIL:
            genericHandler = AbortOnFailure.create();
            break;
        case LOG:
            genericHandler = DropAndLog.create(new BulkLogRenderer());
            break;
        case ES:
            genericHandler = ElasticsearchHandler.create(getSettings(), new BulkErrorEventConverter());
            break;
        default:
            throw new EsHadoopIllegalArgumentException(
                    "Could not find default implementation for built in handler type [" + handlerName + "]"
            );
    }
    return new DelegatingErrorHandler(genericHandler);
}

기본적으로는 BulkRequest 와 BulkProcessor 에 대해서 문서를 찾아 보시면 도움이 되실 것 같습니다.

 

Thanks, 캉테

:

[Elasticsearch] Bulk Request by Restful API

Elastic/Elasticsearch 2021. 7. 5. 15:41

공홈 문서 보시면 됩니다.

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

 

워낙에 제가 예전에 작성해 놓은 글이 있어서 그냥 최신 버전으로 remind 합니다.

$ curl -s -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/_bulk 
  --data-binary "@ATC.json"

 

ATC.json)

{"index": { "_index": "atc", "_id": "1"}}
{"chosung": "ㄴㅇㅋ", "chosung_eng": "nike", "jamo_kor": "ㄴㅏㅇㅣㅋㅣ", "jamo_eng": "skdlzl", "item_kor": "나이키", "item_eng": "nike"}

 

:

[Elasticsearch] Refresh interval while bulk request

Elastic/Elasticsearch 2017. 12. 6. 14:00

작업 하면서 이상한 현상이 발생을 해서 분석 하다 보니 누구나 경험 할 수 있는 것 같아 올려 봅니다.


참고문서)

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html


보통 bulk request 하기 전에 아래 설정을 적용하게 됩니다.

index.refresh_interval: "-1"

이 설정은 해당 index 의 settings 에서 설정 합니다.


이 설정을 하게 되면 bulk request 시 refresh action 을 수행 하지 않게 되는데요.

제가 경험한 현상은 disable 했음에도 불구하고 refresh thread 수가 증가 한다는 것이였습니다.


문제는 역시 elasticsearch 에 있었던 것이 아닌 저의 잘 못 이였습니다.

이유는 제가 정의한 mapping 정보에서 dynamic field 에 따른 template 구성이 영향을 주는 것이였습니다.

결과적으로 dynamic field 설정으로 색인 시 mapping 정보가 바뀌게 되고 이를 반영 하기위해 IndexService 가 updateMetaData() 를 수행 하게 됩니다. 이 과정에서 자동으로 refresh 가 발생을 하기 때문에 bulk request 시 왜 성능이 안나오지 하지 마시고 어떤 구성을 하셨는지 부터 분석해 보시면 더 좋을 것 같습니다.


:

[Elasticsearch] Transport Bulk to Rest Bulk data format 변환

Elastic/Elasticsearch 2016. 7. 22. 10:02

java 로 bulk indexing 코드를 구현할 경우 색인 데이터 format을 그대로 rest bulk indexing 에서 사용을 할 수가 없습니다.

그래서 변환 하는 스크립트를 간단하게 작성해 봤습니다.


Reference)

https://www.elastic.co/guide/en/elasticsearch/reference/2.3/docs-bulk.html


Java Bulk Indexing Format)

{ "field1" : "value1" }


Rest Bulk Indexing Format)

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }

{ "field1" : "value1" }


보시면 아시겠지만 index/type/id 에 대한 meta 정보가 있느냐 없느냐의 차이 입니다.

당연하겠지만 java api 에서는 meta 정보를 set 하도록 되어 있습니다. 하지만 rest api 에서는 set 하는 과정이 없기 때문에 당연히 정보를 위와 같이 넣어 줘야 합니다.


변환 스크립트)

#!/bin/bash


while read line

do

  header="{ \"index\" : { \"_index\" : \"INDEX_NAME\", \"_type\" : \"TYPE_NAME\" } }"

  echo -e $header >> query_result.txt

  echo -e $line >> query_result.txt

done < $1


실행)

$ ./convertJavaToRestFormat.sh query_result.json


Rest Bulk Action)

$ curl -XPOST 'http://localhost:9200/INDEX_NAME/TYPE_NAME/_bulk' --data-binary "@query_result.txt"


:

[Elasticsearch - The Definitive Guide] bulk-format

Elastic/TheDefinitiveGuide 2015. 11. 30. 11:43

Elasticsearch에서 bulk request 시 사용하는 format 에 대한 이야기 입니다.

이 이야기를 기록하는 이유는 elasticsearch 내부에서 동작하는 원리를 쉽게 이해 할 수 있기 때문 입니다.


원문링크)

https://www.elastic.co/guide/en/elasticsearch/guide/current/distrib-multi-doc.html#bulk-format


[Why the funny format?]

  • Parse the JSON into an array (including the document data, which can be very large)
  • Look at each request to determine which shard it should go to
  • Create an array of requests for each shard
  • Serialize these arrays into the internal transport format
  • Send the requests to each shard


정리하자면,

bulk request 를 받아서 다시 풀고 적절한 node의 shard 로 forward 되도록 생성한 후 send or request 한다 입니다.

:

[ElasticSearch] 색인 성능..

Elastic/Elasticsearch 2014. 7. 15. 14:36

장비 : 32 코어, 64G, 6대

파일 크기 : 8.5GB

문서 수 : 66,661,310개

문서 당 : 100개 이상 필드에 20개 이상 색인(not_analyzed) 필드의 경우..

노드당 : 초당 26,453개 색인.

총 색인 시간 : 7분

:

[Elasticsearch] Bulk Indexing 후 Data Consistency 관리.

Elastic/Elasticsearch 2013. 9. 17. 12:35

벌크 인덱싱 후 간혹 샤드간에 데이터가 왔다 갔다 할 때가 있습니다.

이건 elasticsearch 색인이 잘못 되었다고 보시면 안됩니다.

인덱스와 샤드의 status 정보를 보면

- num_docs

- max_docs

- deleted_docs

이런게 있죠.


기본적으로 벌크 색인 시 empty index 에 색인을 하게 될 텐데요.

색인 도중 문제가 발생해서 일부 색인 데이터가 들어가고 다시 같은 인덱스에 색인을 하게 되면  updateDocument 를 실행 하게 됩니다.

뭐 루씬 API 보시면 이제 add/update 가 하나의 API 로 동작 하긴 합니다.

암튼 이렇게 되다 보니 deleted_docs 가 발생 하게 되고 num_docs 와 max_docs 수치가 안맞게 됩니다.

즉, num_docs = max_docs - deleteed_docs 와 같습니다.


이런 관계로 색인 완료 시 recovery 또는 optimize 작업을 통해서 data consistency 를 보정해 주어야 합니다.

그럼 이걸 어떻게 할까요?


아래 명령어로 해보시고  ES  API 문서 참고 하시면 되겠습니다.

(거의 대부분 default value 입니다.)


[Java API]

new OptimizeRequest()

    .indices(indice)

    .flush(true)

    .onlyExpungeDeletes(false)

    .refresh(true)

    .waitForMerge(true)

    .operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD)

    .maxNumSegments(1)



[Rest API]

curl -XPOST 'http://localhost:9200/indice/_optimize?only_expunge_deletes=false&max_num_segments=1&refresh=true&flush=true&wait_for_merge=true'



[Recovery]

curl -XPOST 'http://localhost:9200/indice/_close'

curl -XPOST 'http://localhost:9200/indice/_open'


client.admin()

    .indices()

    .close(new CloseIndexRequest("indice"));


client.admin()

    .indices()

    .close(new OpenIndexRequest("indice"));



[Reference]

http://www.elasticsearch.org/guide/reference/api/admin-indices-optimize/

http://www.elasticsearch.org/guide/reference/api/admin-indices-open-close/


:

[elasticsearch] Java API : Bulk

Elastic/Elasticsearch 2013. 9. 16. 18:34

본 문서는 개인적인 테스트와 elasticsearch.org 그리고 community 등을 참고해서 작성된 것이며,

정보 교환이 목적입니다.


잘못된 부분에 대해서는 지적 부탁 드립니다.

(예시 코드는 성능 및 보안 검증이 되지 않았습니다.)


벌크 색인 시 actionGet(TIMEOUT)  부분에 TIMEOUT 은 빼고 돌리시는게 좋습니다.

돌리시다 보면 timeout exception 이 발생 할 때가 있는데 timeout 값을 제거 하고 돌리시면 관련 에러는 없어집니다.


new OptimizeRequest()

.indices(indice)

.flush(true)

.onlyExpungeDeletes(true)

.refresh(true)

.waitForMerge(true)

.operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD)

.maxNumSegments(1))




[elasticsearch java api 리뷰]

원문 링크 : http://www.elasticsearch.org/guide/reference/java-api/bulk/


- 이 API는 한번의 요청으로 여러개의 문서를 색인 또는 삭제 할 수 있습니다.


우선 원문에 나와 있는 예제 부터 살펴 보겠습니다.


[원문예제]

BulkRequestBuilder bulkRequest = client.prepareBulk();


// either use client#prepare, or use Requests# to directly build index/delete requests

bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")

        .setSource(jsonBuilder()

                    .startObject()

                        .field("user", "kimchy")

                        .field("postDate", new Date())

                        .field("message", "trying out Elastic Search")

                    .endObject()

                  )

        );


bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")

        .setSource(jsonBuilder()

                    .startObject()

                        .field("user", "kimchy")

                        .field("postDate", new Date())

                        .field("message", "another post")

                    .endObject()

                  )

        );

        

BulkResponse bulkResponse = bulkRequest.execute().actionGet();

if (bulkResponse.hasFailures()) {

    // process failures by iterating through each bulk response item

}


- 기본 client.prepareIndex() 를 이용해서 문서 하나를 색인 요청 하는 것과 유사 합니다.

- 다만, bulkRequest 의 경우 이런 하나의 문서를 여러개 add(document) 해서 한번에 요청을 한다는 것입니다.

- index name : twitter

- index type : tweet

- document id (_id) : 1 과 2

- document id 를 지정 하지 않을 경우 자동으로 생성을 하게 됩니다.


아래 예제는 글 http://jjeong.tistory.com/792 에서 생성한 index 에 bulkRequest 를 보내는 것입니다.


[간단한 Bulk Request 예제]

BulkRequestBuilder bulkRequest;

BulkResponse bulkResponse;

String ts;

bulkRequest = client.prepareBulk();

int i=0;

for (i=0; i<100000; i++) {

ts = String.valueOf(System.currentTimeMillis());


bulkRequest.add(client.prepareIndex("facebook", "facebook_post")

.setRouting(ts)

.setOperationThreaded(false)

.setSource(jsonBuilder()

.startObject()

.field("docid", ts)

.field("title", "document "+String.valueOf(i))

.endObject()

)

)

.setReplicationType(ReplicationType.ASYNC)

.setConsistencyLevel(WriteConsistencyLevel.QUORUM)

.setRefresh(false);

if ( (i%100) == 0 ) {

bulkRequest.execute().actionGet(5000);

bulkRequest = client.prepareBulk();

log.debug("DOCUMENT Sequence : {}", i);

}

}


bulkRequest.execute().actionGet(5000);

log.debug("DOCUMENT Sequence : {}", i);

- 코드를 보시면 아시겠지만 문서 구조는 docid, title 로 아주 심플합니다.

- docid 에 timestamp를 넣고 document 에는 문자열 "document $SEQ"가 들어 갑니다. 

- 문서를 100개 단위로 묶어서 request 하게 됩니다.


[Bulk Request 고려사항]

- threadpool 세팅을 검토해 보셔야 합니다.

http://www.elasticsearch.org/guide/reference/modules/threadpool/


- refresh 설정을 검토해 보셔야 합니다.

disable 또는 10초 이상 설정


- index merge segment 크기를 조정해야 합니다.

20 또는 30 정도


- threadsafe 하지 않다는 걸 인지해야 합니다.

아직까지는 bulk request 시에 multithread 지원이 되지 않습니다.


- async 방식을 추천 합니다.


:

[Elasticsearch] json bulk insert.

Elastic/Elasticsearch 2013. 8. 14. 12:09

참고 사이트 : http://www.elasticsearch.org/guide/reference/api/bulk/


[Run]

curl -s -XPOST 'http://192.168.56.104:9200/test/_bulk' --data-binary @test.json


[test.json]

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" , "_routing" : "1" } }

{"docid":1, "title":"title1"}

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "2" , "_routing" : "1" } }

{"docid":2, "title":"title2"}

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "3" , "_routing" : "1" } }

{"docid":3, "title":"title3"}



: