[Elasticsearch] Elasticsearch ES-Hadoop 내 Spark Bulk Request Error Handler
Elastic/Elasticsearch 2022. 9. 20. 12:35Spark 을 이용해서 Elasticsearch 로 Bulk Request 를 사용할 경우 내부에서는 BulkProcessor 를 이용해서 요청을 하게 됩니다.
보통 BulkRequest 사용 시 색인 요청한 문서중 일부 오류가 발생 하는 경우 전체 문서 색인이 실패 하는 것이 아닌 오류 문서만 색인이 안되고, 나머지 문서들은 색인이 완료 되는데요.
ES-Hadoop 내 Spark 의 경우 기본 Error Handler 가 AbortOnFailure 라서 요청한 모든 문서가 색인이 실패 하게 됩니다.
이를 변경 하기 위해서는 Error Handler 설정을 아래와 같이 변경하고 사용 하시면 됩니다.
[참고문서]
https://www.elastic.co/guide/en/elasticsearch/hadoop/8.4/errorhandlers.html#errorhandlers-bulk-use
[설정]
example)
es.write.data.error.handlers = log
SparkSession.builder().appName("...").config("es.write.rest.error.handlers", "log");
[참고코드]
// BulkWriteHandlerLoader / BulkWriteErrorHandler
@Override
protected IBulkWriteErrorHandler loadBuiltInHandler(AbstractHandlerLoader.NamedHandlers handlerName) {
ErrorHandler<BulkWriteFailure, byte[], DelayableErrorCollector<byte[]>> genericHandler;
switch (handlerName) {
case FAIL:
genericHandler = AbortOnFailure.create();
break;
case LOG:
genericHandler = DropAndLog.create(new BulkLogRenderer());
break;
case ES:
genericHandler = ElasticsearchHandler.create(getSettings(), new BulkErrorEventConverter());
break;
default:
throw new EsHadoopIllegalArgumentException(
"Could not find default implementation for built in handler type [" + handlerName + "]"
);
}
return new DelegatingErrorHandler(genericHandler);
}
기본적으로는 BulkRequest 와 BulkProcessor 에 대해서 문서를 찾아 보시면 도움이 되실 것 같습니다.
Thanks, 캉테