[Elasticsearch] Elasticsearch ES-Hadoop 내 Spark Bulk Request Error Handler

Elastic/Elasticsearch 2022. 9. 20. 12:35

Spark 을 이용해서 Elasticsearch 로 Bulk Request 를 사용할 경우 내부에서는 BulkProcessor 를 이용해서 요청을 하게 됩니다.
보통 BulkRequest 사용 시 색인 요청한 문서중 일부 오류가 발생 하는 경우 전체 문서 색인이 실패 하는 것이 아닌 오류 문서만 색인이 안되고, 나머지 문서들은 색인이 완료 되는데요. 
ES-Hadoop 내 Spark 의 경우 기본 Error Handler 가 AbortOnFailure 라서 요청한 모든 문서가 색인이 실패 하게 됩니다.
이를 변경 하기 위해서는 Error Handler 설정을 아래와 같이 변경하고 사용 하시면 됩니다.

[참고문서]
https://www.elastic.co/guide/en/elasticsearch/hadoop/8.4/errorhandlers.html#errorhandlers-bulk-use

[설정]

example)
es.write.data.error.handlers = log

SparkSession.builder().appName("...").config("es.write.rest.error.handlers", "log");


[참고코드]

// BulkWriteHandlerLoader / BulkWriteErrorHandler

@Override
protected IBulkWriteErrorHandler loadBuiltInHandler(AbstractHandlerLoader.NamedHandlers handlerName) {
    ErrorHandler<BulkWriteFailure, byte[], DelayableErrorCollector<byte[]>> genericHandler;
    switch (handlerName) {
        case FAIL:
            genericHandler = AbortOnFailure.create();
            break;
        case LOG:
            genericHandler = DropAndLog.create(new BulkLogRenderer());
            break;
        case ES:
            genericHandler = ElasticsearchHandler.create(getSettings(), new BulkErrorEventConverter());
            break;
        default:
            throw new EsHadoopIllegalArgumentException(
                    "Could not find default implementation for built in handler type [" + handlerName + "]"
            );
    }
    return new DelegatingErrorHandler(genericHandler);
}

기본적으로는 BulkRequest 와 BulkProcessor 에 대해서 문서를 찾아 보시면 도움이 되실 것 같습니다.

 

Thanks, 캉테

: