Skip to content

Commit

Permalink
Move timeout to TransportEsqlQueryAction
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Sep 18, 2023
1 parent 2728c85 commit 7b71306
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.search.SearchRequest;
Expand All @@ -17,7 +16,6 @@
import org.elasticsearch.action.search.SearchShardsResponse;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.ListenerTimeouts;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
Expand Down Expand Up @@ -141,64 +139,41 @@ public void execute(
LOGGER.debug("Sending data node plan\n{}\n with filter [{}]", dataNodePlan, requestFilter);

String[] originalIndices = PlannerUtils.planOriginalIndices(physicalPlan);

computeTargetNodes(
rootTask,
requestFilter,
concreteIndices,
originalIndices,
wrapTimeout(
configuration,
transportService.getThreadPool(),
rootTask,
listener.delegateFailureAndWrap((delegate, targetNodes) -> {
final ExchangeSourceHandler exchangeSource = exchangeService.createSourceHandler(
listener.delegateFailureAndWrap((delegate, targetNodes) -> {
final ExchangeSourceHandler exchangeSource = exchangeService.createSourceHandler(
sessionId,
queryPragmas.exchangeBufferSize(),
ESQL_THREAD_POOL_NAME
);
try (
Releasable ignored = exchangeSource::decRef;
RefCountingListener requestRefs = new RefCountingListener(delegate.map(unused -> collectedPages))
) {
final AtomicBoolean cancelled = new AtomicBoolean();
// wait until the source handler is completed
exchangeSource.addCompletionListener(requestRefs.acquire());
// run compute on the coordinator
var computeContext = new ComputeContext(sessionId, List.of(), configuration, exchangeSource, null);
runCompute(rootTask, computeContext, coordinatorPlan, cancelOnFailure(rootTask, cancelled, requestRefs.acquire()));
// run compute on remote nodes
// TODO: This is wrong, we need to be able to cancel
runComputeOnRemoteNodes(
sessionId,
queryPragmas.exchangeBufferSize(),
ESQL_THREAD_POOL_NAME
rootTask,
configuration,
dataNodePlan,
exchangeSource,
targetNodes,
() -> cancelOnFailure(rootTask, cancelled, requestRefs.acquire()).map(unused -> null)
);
try (
Releasable ignored = exchangeSource::decRef;
RefCountingListener requestRefs = new RefCountingListener(delegate.map(unused -> collectedPages))
) {
final AtomicBoolean cancelled = new AtomicBoolean();
// wait until the source handler is completed
exchangeSource.addCompletionListener(requestRefs.acquire());
// run compute on the coordinator
var computeContext = new ComputeContext(sessionId, List.of(), configuration, exchangeSource, null);
runCompute(rootTask, computeContext, coordinatorPlan, cancelOnFailure(rootTask, cancelled, requestRefs.acquire()));
// run compute on remote nodes
// TODO: This is wrong, we need to be able to cancel
runComputeOnRemoteNodes(
sessionId,
rootTask,
configuration,
dataNodePlan,
exchangeSource,
targetNodes,
() -> cancelOnFailure(rootTask, cancelled, requestRefs.acquire()).map(unused -> null)
);
}
})
)
}
})
);

}

private ActionListener<List<TargetNode>> wrapTimeout(
EsqlConfiguration configuration,
ThreadPool pool,
CancellableTask task,
ActionListener<List<TargetNode>> l
) {
if (configuration.timeout() != null) {
return ListenerTimeouts.wrapWithTimeout(pool, configuration.timeout(), esqlExecutor, l, (x) -> {
LOGGER.debug("cancelling ESQL task {} on failure", task);
transportService.getTaskManager().cancelTaskAndDescendants(task, "timeout", false, ActionListener.noop());
l.onFailure(new ElasticsearchTimeoutException("ESQL query timed out after {}", configuration.timeout()));
});
}
return l;
}

private void runComputeOnRemoteNodes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.ListenerTimeouts;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -21,6 +23,7 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.ColumnInfo;
Expand All @@ -47,6 +50,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
private final Executor requestExecutor;
private final EnrichLookupService enrichLookupService;
private final Settings settings;
private final TransportService transportService;

@Inject
public TransportEsqlQueryAction(
Expand Down Expand Up @@ -77,12 +81,27 @@ public TransportEsqlQueryAction(
bigArrays
);
this.settings = settings;
this.transportService = transportService;
}

@Override
protected void doExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
// workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
requestExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, l)));
final ActionListener<EsqlQueryResponse> wrappedListener;
final TimeValue timeout = timeout(request);
if (timeout != null) {
final ThreadPool threadPool = transportService.getThreadPool();
final var executor = threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME);
wrappedListener = ListenerTimeouts.wrapWithTimeout(threadPool, timeout, executor, listener, l -> {
logger.debug("cancelling ESQL task {} on timeout", task);
final TaskManager taskManager = transportService.getTaskManager();
taskManager.cancelTaskAndDescendants((CancellableTask) task, "timeout", false, ActionListener.noop());
listener.onFailure(new ElasticsearchTimeoutException("ESQL query timed out after {}", timeout));
});
} else {
wrappedListener = listener;
}
requestExecutor.execute(ActionRunnable.wrap(wrappedListener, l -> doExecuteForked(task, request, l)));
}

private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
Expand Down

0 comments on commit 7b71306

Please sign in to comment.