[Elasticsearch] About the GET API

Elastic/Elasticsearch 2018. 5. 9. 14:22

When I explain the Get API, I usually say that it is fast because it looks up a document by its id and therefore does not have to send the request to every shard.
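To make the "single shard" point concrete, here is a minimal sketch of how a document id can be mapped to exactly one shard. It assumes the commonly documented rule that the shard number is a hash of the routing value (the document id by default) modulo the number of primary shards. The class name ShardRoutingSketch and the FNV-1a hash are stand-ins for illustration only; Elasticsearch itself uses a Murmur3-based hash, so the shard numbers printed here will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;

// Simplified sketch: which single shard does a document id map to?
// Not Elasticsearch code; the hash function is a stand-in (see lead-in).
final class ShardRoutingSketch {

    // 32-bit FNV-1a hash, used here only as a deterministic stand-in.
    static int hash(String routing) {
        int h = 0x811C9DC5;
        for (byte b : routing.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xFF);
            h *= 0x01000193;
        }
        return h;
    }

    // shard = hash(routing) mod numberOfPrimaryShards, always in [0, numberOfPrimaryShards).
    static int shardFor(String routing, int numberOfPrimaryShards) {
        if (numberOfPrimaryShards <= 0) {
            throw new IllegalArgumentException("numberOfPrimaryShards must be positive");
        }
        return Math.floorMod(hash(routing), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        int shards = 5; // hypothetical index with 5 primary shards
        for (String id : new String[] {"1", "2", "user-42"}) {
            System.out.println(id + " -> shard " + shardFor(id, shards));
        }
    }
}
```

Because the target shard can be computed from the id alone, a Get only has to be sent to one shard, unlike a search, which fans out across shards.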

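One point that is easy to miss in that explanation is that the Get API is realtime by default.

That is, by default it returns the latest version of a document regardless of whether a refresh has happened since the document was indexed.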


In other words, a realtime Get performs a refresh internally when the requested document has changed since the last refresh.

For this reason the Get API exposes a realtime parameter, which controls whether realtime Get is used or not.
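As a small illustration, the sketch below builds the URL of a Get request with the realtime and refresh parameters set explicitly. The parameter names and the {index}/_doc/{id} endpoint are the ones that appear in RestGetAction; the class name GetUrlSketch, the base URL http://localhost:9200, and the index name my-index are assumptions made for the example.

```java
// Sketch: build a Get API URL with explicit realtime / refresh parameters.
// Assumes index and id contain only URL-safe characters (no encoding is done).
final class GetUrlSketch {

    static String getUrl(String baseUrl, String index, String id, boolean realtime, boolean refresh) {
        return baseUrl + "/" + index + "/_doc/" + id
            + "?realtime=" + realtime
            + "&refresh=" + refresh;
    }

    public static void main(String[] args) {
        String base = "http://localhost:9200"; // assumed local node
        // Defaults made explicit: realtime=true, refresh=false.
        System.out.println(getUrl(base, "my-index", "1", true, false));
        // Non-realtime get that asks for an explicit refresh first.
        System.out.println(getUrl(base, "my-index", "1", false, true));
    }
}
```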


The source code excerpts below should help in following the explanation above.


[GetRequest.java]

private boolean refresh = false;

boolean realtime = true;
/**
 * Should a refresh be executed before this get operation causing the operation to
 * return the latest value. Note, heavy get should not set this to {@code true}. Defaults
 * to {@code false}.
 */
public GetRequest refresh(boolean refresh) {
    this.refresh = refresh;
    return this;
}

public boolean refresh() {
    return this.refresh;
}

public boolean realtime() {
    return this.realtime;
}

@Override
public GetRequest realtime(boolean realtime) {
    this.realtime = realtime;
    return this;
}


[RestGetAction.java]

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    final boolean includeTypeName = request.paramAsBoolean("include_type_name", true);
    final String type = request.param("type");
    if (includeTypeName == false && MapperService.SINGLE_MAPPING_NAME.equals(type) == false) {
        throw new IllegalArgumentException("You may only use the [include_type_name=false] option with the get APIs with the " +
            "[{index}/_doc/{id}] endpoint.");
    }
    final GetRequest getRequest = new GetRequest(request.param("index"), type, request.param("id"));
    getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
    getRequest.routing(request.param("routing"));
    getRequest.preference(request.param("preference"));
    getRequest.realtime(request.paramAsBoolean("realtime", getRequest.realtime()));
    if (request.param("fields") != null) {
        throw new IllegalArgumentException("the parameter [fields] is no longer supported, " +
            "please use [stored_fields] to retrieve stored fields or [_source] to load the field from _source");
    }
    final String fieldsParam = request.param("stored_fields");
    if (fieldsParam != null) {
        final String[] fields = Strings.splitStringByCommaToArray(fieldsParam);
        if (fields != null) {
            getRequest.storedFields(fields);
        }
    }

    getRequest.version(RestActions.parseVersion(request));
    getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));

    getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));

    return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {
        @Override
        protected RestStatus getStatus(final GetResponse response) {
            return response.isExists() ? OK : NOT_FOUND;
        }
    });
}


[TransportGetAction.java]

@Override
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.getShard(shardId.id());
    if (request.realtime()) { // we are not tied to a refresh cycle here anyway
        listener.onResponse(shardOperation(request, shardId));
    } else {
        indexShard.awaitShardSearchActive(b -> {
            try {
                super.asyncShardOperation(request, shardId, listener);
            } catch (Exception ex) {
                listener.onFailure(ex);
            }
        });
    }
}

@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.getShard(shardId.id());

    if (request.refresh() && !request.realtime()) {
        indexShard.refresh("refresh_flag_get");
    }

    GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
        request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
    return new GetResponse(result);
}

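As the excerpts show, refresh and realtime behave differently.

To summarize:

- refresh runs a refresh on the relevant shard before the get is executed, so it can degrade performance. In TransportGetAction above, this explicit refresh only happens when realtime is false.

- realtime runs the refresh at the time the get is executed, and only when the version map in InternalEngine holds an entry for the requested id. (In this case the refresh targets the internal searcher.)

- There is also a path that reads the data from the translog; this path is used by the update API.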
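The points above can be modeled with a toy engine. This is only a sketch for building intuition, not Elasticsearch code: the class RealtimeGetSketch and all of its members are hypothetical, the "searchers" are plain maps, and the pending map stands in for both the version map and the not-yet-refreshed indexing buffer. The translog path used by the update API is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of realtime vs. refresh on a single shard (hypothetical, simplified).
final class RealtimeGetSketch {
    // What a non-realtime get (external scope) can see.
    private final Map<String, String> externalSearcher = new HashMap<>();
    // What a realtime get (internal scope) can see.
    private final Map<String, String> internalSearcher = new HashMap<>();
    // Documents indexed since the last refresh (stand-in for the version map).
    private final Map<String, String> pending = new HashMap<>();
    // Number of refreshes triggered by realtime gets.
    int realtimeRefreshes = 0;

    void index(String id, String source) {
        pending.put(id, source);
    }

    private void refreshInternal() {
        internalSearcher.putAll(pending);
        pending.clear();
    }

    private void refreshExternal() {
        refreshInternal();
        externalSearcher.putAll(internalSearcher);
    }

    String get(String id, boolean realtime, boolean refresh) {
        // Mirrors shardOperation: the explicit refresh only runs for non-realtime gets.
        if (refresh && !realtime) {
            refreshExternal();
        }
        if (realtime) {
            // Mirrors InternalEngine.get: refresh only if the id changed since the last refresh.
            if (pending.containsKey(id)) {
                refreshInternal();
                realtimeRefreshes++;
            }
            return internalSearcher.get(id);
        }
        return externalSearcher.get(id);
    }

    public static void main(String[] args) {
        RealtimeGetSketch engine = new RealtimeGetSketch();
        engine.index("1", "v1");
        System.out.println(engine.get("1", false, false)); // not yet visible externally
        System.out.println(engine.get("1", true, false));  // realtime get refreshes internally
        System.out.println(engine.get("1", false, true));  // explicit refresh, then get
    }
}
```

In this model a second realtime get for the same id does not refresh again, because nothing is pending for that id. That is the sense in which realtime is cheaper than sending refresh=true with every request.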


[InternalEngine.java]

@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
    assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        SearcherScope scope;
        if (get.realtime()) {
            VersionValue versionValue = null;
            try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                // we need to lock here to access the version map to do this truly in RT
                versionValue = getVersionFromMap(get.uid().bytes());
            }
            if (versionValue != null) {
                if (versionValue.isDelete()) {
                    return GetResult.NOT_EXISTS;
                }
                if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                    throw new VersionConflictEngineException(shardId, get.type(), get.id(),
                        get.versionType().explainConflictForReads(versionValue.version, get.version()));
                }
                if (get.isReadFromTranslog()) {
                    // this is only used for updates - API _GET calls will always read form a reader for consistency
                    // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                    if (versionValue.getLocation() != null) {
                        try {
                            Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                            if (operation != null) {
                                // in the case of a already pruned translog generation we might get null here - yet very unlikely
                                TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
                                    .getIndexSettings().getIndexVersionCreated());
                                return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)),
                                    new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
                            }
                        } catch (IOException e) {
                            maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                            throw new EngineException(shardId, "failed to read operation from translog", e);
                        }
                    } else {
                        trackTranslogLocation.set(true);
                    }
                }
                refresh("realtime_get", SearcherScope.INTERNAL);
            }
            scope = SearcherScope.INTERNAL;
        } else {
            // we expose what has been externally expose in a point in time snapshot via an explicit refresh
            scope = SearcherScope.EXTERNAL;
        }

        // no version, get the version from the index, we know that we refresh on flush
        return getFromSearcher(get, searcherFactory, scope);
    }
}


[Understanding the code]

RestGetAction -> TransportGetAction (TransportSingleShardAction) -> IndexShard ->

ShardGetService -> InternalEngine (Engine) -> SearcherManager(ReferenceManager)

