Skip to content

Commit

Permalink
Fix expiration time in async search response (#55435)
Browse files Browse the repository at this point in the history
This change ensures that we return the latest expiration time
when retrieving the response from the index.
This commit also fixes a bug that stops the garbage collection of saved responses if the async search index is deleted.
  • Loading branch information
jimczi committed Apr 21, 2020
1 parent 5696b82 commit 1825c78
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class GetAsyncSearchRequest implements Validatable {
private TimeValue waitForCompletion;
private TimeValue keepAlive;

public static final long MIN_KEEPALIVE = TimeValue.timeValueMinutes(1).millis();

private final String id;

public GetAsyncSearchRequest(String id) {
Expand Down Expand Up @@ -62,14 +60,7 @@ public void setKeepAlive(TimeValue keepAlive) {

@Override
public Optional<ValidationException> validate() {
final ValidationException validationException = new ValidationException();
if (keepAlive != null && keepAlive.getMillis() < MIN_KEEPALIVE) {
validationException.addValidationError("keep_alive must be greater than 1 minute, got: " + keepAlive.toString());
}
if (validationException.validationErrors().isEmpty()) {
return Optional.empty();
}
return Optional.of(validationException);
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
*/
public class SubmitAsyncSearchRequest implements Validatable {

public static final int DEFAULT_BATCHED_REDUCE_SIZE = 5;

public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();

private TimeValue waitForCompletionTimeout;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public SearchShardTarget(String nodeId, ShardId shardId, @Nullable String cluste

@Nullable
public String getNodeId() {
return nodeId.string();
return nodeId != null ? nodeId.string() : null;
}

public Text getNodeIdText() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
Expand All @@ -38,6 +38,8 @@
import java.util.List;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;

public final class AsyncSearch extends Plugin implements ActionPlugin {
private final Settings settings;

Expand Down Expand Up @@ -82,11 +84,16 @@ public Collection<Object> createComponents(Client client,
AsyncSearchIndexService indexService =
new AsyncSearchIndexService(clusterService, threadPool.getThreadContext(), client, namedWriteableRegistry);
AsyncSearchMaintenanceService maintenanceService =
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), threadPool, indexService, TimeValue.timeValueHours(1));
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), settings, threadPool, indexService);
clusterService.addListener(maintenanceService);
return Collections.singletonList(maintenanceService);
} else {
return Collections.emptyList();
}
}

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,16 @@ void getResponse(AsyncSearchId searchId,
return;
}

if (restoreResponseHeaders) {
if (restoreResponseHeaders && get.getSource().containsKey(RESPONSE_HEADERS_FIELD)) {
@SuppressWarnings("unchecked")
Map<String, List<String>> responseHeaders = (Map<String, List<String>>) get.getSource().get(RESPONSE_HEADERS_FIELD);
restoreResponseHeadersContext(securityContext.getThreadContext(), responseHeaders);
}

long expirationTime = (long) get.getSource().get(EXPIRATION_TIME_FIELD);
String encoded = (String) get.getSource().get(RESULT_FIELD);
listener.onResponse(encoded != null ? decodeResponse(encoded) : null);
AsyncSearchResponse response = decodeResponse(encoded, expirationTime);
listener.onResponse(encoded != null ? response : null);
},
listener::onFailure
));
Expand Down Expand Up @@ -331,11 +333,11 @@ String encodeResponse(AsyncSearchResponse response) throws IOException {
/**
* Decode the provided base-64 bytes into a {@link AsyncSearchResponse}.
*/
AsyncSearchResponse decodeResponse(String value) throws IOException {
AsyncSearchResponse decodeResponse(String value, long expirationTime) throws IOException {
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
in.setVersion(Version.readVersion(in));
return new AsyncSearchResponse(in);
return new AsyncSearchResponse(in, expirationTime);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.gateway.GatewayService;
Expand All @@ -26,30 +28,40 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.xpack.search.AsyncSearchIndexService.EXPIRATION_TIME_FIELD;
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX;

/**
* A service that runs a periodic cleanup over the async-search index.
*/
class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener {
private static final Logger logger = LogManager.getLogger(AsyncSearchMaintenanceService.class);

/**
* Controls the interval at which the cleanup is scheduled.
* Defaults to 1h. It is an undocumented/expert setting that
* is mainly used by integration tests to make the garbage
* collection of search responses more reactive.
*/
public static final Setting<TimeValue> ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING =
Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope);

private final String localNodeId;
private final ThreadPool threadPool;
private final AsyncSearchIndexService indexService;
private final TimeValue delay;

private final AtomicBoolean isCleanupRunning = new AtomicBoolean(false);
private boolean isCleanupRunning;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private volatile Scheduler.Cancellable cancellable;

AsyncSearchMaintenanceService(String localNodeId,
Settings nodeSettings,
ThreadPool threadPool,
AsyncSearchIndexService indexService,
TimeValue delay) {
AsyncSearchIndexService indexService) {
this.localNodeId = localNodeId;
this.threadPool = threadPool;
this.indexService = indexService;
this.delay = delay;
this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings);
}

@Override
Expand All @@ -62,39 +74,38 @@ public void clusterChanged(ClusterChangedEvent event) {
tryStartCleanup(state);
}

void tryStartCleanup(ClusterState state) {
synchronized void tryStartCleanup(ClusterState state) {
if (isClosed.get()) {
return;
}
IndexRoutingTable indexRouting = state.routingTable().index(AsyncSearchIndexService.INDEX);
if (indexRouting == null) {
if (isCleanupRunning.compareAndSet(true, false)) {
close();
}
stop();
return;
}
String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId();
if (localNodeId.equals(primaryNodeId)) {
if (isCleanupRunning.compareAndSet(false, true)) {
if (isCleanupRunning == false) {
isCleanupRunning = true;
executeNextCleanup();
}
} else if (isCleanupRunning.compareAndSet(true, false)) {
close();
} else {
stop();
}
}

synchronized void executeNextCleanup() {
if (isClosed.get() == false && isCleanupRunning.get()) {
if (isClosed.get() == false && isCleanupRunning) {
long nowInMillis = System.currentTimeMillis();
DeleteByQueryRequest toDelete = new DeleteByQueryRequest()
DeleteByQueryRequest toDelete = new DeleteByQueryRequest(INDEX)
.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis));
indexService.getClient()
.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(() -> scheduleNextCleanup()));
}
}

synchronized void scheduleNextCleanup() {
if (isClosed.get() == false && isCleanupRunning.get()) {
if (isClosed.get() == false && isCleanupRunning) {
try {
cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC);
} catch (EsRejectedExecutionException e) {
Expand All @@ -107,11 +118,18 @@ synchronized void scheduleNextCleanup() {
}
}

synchronized void stop() {
if (isCleanupRunning) {
if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel();
}
isCleanupRunning = false;
}
}

@Override
public void close() {
if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel();
}
stop();
isClosed.compareAndSet(false, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ private void executeCompletionListeners() {
*/
private AsyncSearchResponse getResponse() {
assert searchResponse.get() != null;
checkCancellation();
return searchResponse.get().toAsyncSearchResponse(this, expirationTimeMillis);
}

Expand All @@ -306,15 +307,17 @@ private AsyncSearchResponse getResponse() {
*/
private AsyncSearchResponse getResponseWithHeaders() {
assert searchResponse.get() != null;
checkCancellation();
return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis);
}



// checks if the search task should be cancelled
private void checkCancellation() {
private synchronized void checkCancellation() {
long now = System.currentTimeMillis();
if (expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
if (hasCompleted == false &&
expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
// we cancel the search task if the initial submit task was cancelled,
// this is needed because the task cancellation mechanism doesn't
// handle the cancellation of grand-children.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
Expand All @@ -25,6 +26,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.tasks.CancellableTask;
Expand Down Expand Up @@ -187,7 +189,9 @@ private void onFinalResponse(CancellableTask submitTask,
store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(),response,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
if (exc.getCause() instanceof DocumentMissingException == false) {
Throwable cause = ExceptionsHelper.unwrapCause(exc);
if (cause instanceof DocumentMissingException == false &&
cause instanceof VersionConflictEngineException == false) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
searchTask.getSearchId().getEncoded()), exc);
}
Expand Down
Loading

0 comments on commit 1825c78

Please sign in to comment.