From a3ccc7f20594070d5e8c00c5e0d5b1bcc505f3fa Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Wed, 13 Sep 2023 14:25:04 +0200 Subject: [PATCH 01/11] Add timeout to ESQL --- .../xpack/esql/qa/single_node/RestEsqlIT.java | 44 +++++++++++++++++++ .../xpack/esql/qa/rest/RestEsqlTestCase.java | 5 +++ .../xpack/esql/EsqlTestUtils.java | 3 +- .../xpack/esql/action/EsqlQueryRequest.java | 14 ++++++ .../xpack/esql/plugin/ComputeService.java | 22 +++++++++- .../esql/plugin/TransportEsqlQueryAction.java | 22 +++++++++- .../xpack/esql/session/EsqlConfiguration.java | 15 ++++++- .../elasticsearch/xpack/esql/CsvTests.java | 3 +- .../optimizer/PhysicalPlanOptimizerTests.java | 3 +- .../xpack/esql/planner/EvalMapperTests.java | 10 ++++- .../planner/LocalExecutionPlannerTests.java | 3 +- .../esql/plugin/DataNodeRequestTests.java | 3 +- .../EsqlConfigurationSerializationTests.java | 7 ++- 13 files changed, 141 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index efb7192bbc3e8..915c6145e3ce2 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -52,6 +52,50 @@ public void testBasicEsql() throws IOException { assertEquals(List.of(List.of(499.5d)), result.get("values")); } + public void testTimeout() throws IOException { + StringBuilder b = new StringBuilder(); + for (int i = 0; i < 10000; i++) { + b.append(String.format(Locale.ROOT, """ + {"create":{"_index":"%s"}} + {"test":"value%s"} + """, testIndexName(), randomAlphaOfLength(20) + " " + randomAlphaOfLength(20) + " " + randomAlphaOfLength(20))); + } + Request bulk = new Request("POST", "/_bulk"); + bulk.addParameter("refresh", "true"); + bulk.addParameter("filter_path", "errors"); + bulk.setJsonEntity(b.toString()); + Response response = client().performRequest(bulk); + Assert.assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + + // let's make it a bit expensive, so that the timeout will most likely trigger + RequestObjectBuilder builder = new RequestObjectBuilder().query(fromIndex() + """ + | grok test "%{NOTSPACE:a} %{NOTSPACE:b} %{NOTSPACE:c}" + | grok test "%{NOTSPACE:c} %{NOTSPACE:d} %{NOTSPACE:e}" + | grok test "%{NOTSPACE:f} %{NOTSPACE:g} %{NOTSPACE:h}" + | grok test "%{NOTSPACE:i} %{NOTSPACE:j} %{NOTSPACE:k}" + | grok test "%{WORD:l} %{WORD:m} %{WORD:n}" + | grok test "%{WORD:o} %{WORD:p} %{WORD:q}" + | grok test "%{USERNAME:r} %{USERNAME:s} %{USERNAME:t}" + | grok test "%{USERNAME:u} %{USERNAME:v} %{USERNAME:w}" + | grok test "%{DATA:x} %{DATA:y} %{DATA:z}" + | sort test + | limit 9999 + | sort b desc + | limit 9998 + | sort c + | limit 9997 + | eval s = length(a) + length(b) * length(c) + | stats max(s) + """); + if (Build.current().isSnapshot()) { + builder.pragmas(Settings.builder().put("data_partitioning", "shard").build()); + } + builder.timeout(0); + builder.build(); + ResponseException re = expectThrows(ResponseException.class, () -> runEsql(builder)); + assertThat(EntityUtils.toString(re.getResponse().getEntity()), containsString("ESQL query timed out after 0s")); + } + public void testInvalidPragma() throws IOException { assumeTrue("pragma only enabled on snapshot builds", Build.current().isSnapshot()); RequestObjectBuilder builder = new RequestObjectBuilder().query("row a = 1, b = 2"); diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index 7d90cf47cae09..05cf95dfbb6bf 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -86,6 +86,11 @@ public RequestObjectBuilder timeZone(ZoneId zoneId) throws IOException { return this; } + public RequestObjectBuilder timeout(int timeout) throws IOException { + builder.field("timeout", timeout); + return this; + } + public RequestObjectBuilder pragmas(Settings pragmas) throws IOException { builder.startObject("pragma"); pragmas.toXContent(builder, ToXContent.EMPTY_PARAMS); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index dd86742785d15..2725600c45100 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -87,7 +87,8 @@ public byte[] max(String field, DataType dataType) { null, null, new QueryPragmas(Settings.EMPTY), - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY) + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY), + null ); private EsqlTestUtils() {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java index 5196cbb0dfd1c..8fd5a5cd3b9d6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.CancellableTask; @@ -35,6 +36,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -65,6 +67,8 @@ public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesR private static final ParseField PARAMS_FIELD = new ParseField("params"); private static final ParseField LOCALE_FIELD = new ParseField("locale"); + private static final ParseField TIMEOUT_FIELD = new ParseField("timeout"); + private static final ObjectParser PARSER = objectParser(EsqlQueryRequest::new); private String query; @@ -72,6 +76,7 @@ public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesR private ZoneId zoneId; private Locale locale; private QueryBuilder filter; + private TimeValue timeout; private QueryPragmas pragmas = new QueryPragmas(Settings.EMPTY); private List params = List.of(); @@ -149,6 +154,14 @@ public void params(List params) { this.params = params; } + public void timeout(TimeValue val) { + this.timeout = val; + } + + public TimeValue timeout() { + return timeout; + } + public static EsqlQueryRequest fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } @@ -166,6 +179,7 @@ private static ObjectParser objectParser(Supplier request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD); + parser.declareInt((request, value) -> request.timeout(new TimeValue(value, TimeUnit.MILLISECONDS)), TIMEOUT_FIELD); return parser; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 27634bf0d4eaa..f8e4446927350 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.search.SearchRequest; @@ -17,6 +18,7 @@ import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ListenableActionFuture; +import org.elasticsearch.action.support.ListenerTimeouts; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -139,12 +141,13 @@ public void execute( LOGGER.info("Sending data node plan\n{}\n with filter [{}]", dataNodePlan, requestFilter); String[] originalIndices = PlannerUtils.planOriginalIndices(physicalPlan); + computeTargetNodes( rootTask, requestFilter, concreteIndices, originalIndices, - listener.delegateFailureAndWrap((delegate, targetNodes) -> { + wrapTimeout(configuration, transportService.getThreadPool(), listener.delegateFailureAndWrap((delegate, targetNodes) -> { final ExchangeSourceHandler exchangeSource = exchangeService.createSourceHandler( sessionId, queryPragmas.exchangeBufferSize(), @@ -172,8 +175,23 @@ public void execute( () -> cancelOnFailure(rootTask, cancelled, requestRefs.acquire()).map(unused -> null) ); } - }) + })) ); + + } + + private ActionListener> wrapTimeout( + EsqlConfiguration configuration, + ThreadPool pool, + ActionListener> l + ) { + if (configuration.timeout() != null) { + return ListenerTimeouts.wrapWithTimeout(pool, configuration.timeout(), esqlExecutor, l, (x) -> { + String timeoutMessage = "ESQL query timed out after {}"; + l.onFailure(new ElasticsearchTimeoutException(timeoutMessage, configuration.timeout())); + }); + } + return l; } private void runComputeOnRemoteNodes( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 614277e9d7216..b9aed6b3b9b38 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.search.SearchService; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; @@ -35,9 +36,12 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; public class TransportEsqlQueryAction extends HandledTransportAction { + public static TimeValue DEFAULT_TIMEOUT = new TimeValue(4, TimeUnit.MINUTES); + public static TimeValue MAX_TIMEOUT = new TimeValue(4, TimeUnit.MINUTES); private final PlanExecutor planExecutor; private final ComputeService computeService; private final ExchangeService exchangeService; @@ -84,6 +88,8 @@ protected void doExecute(Task task, EsqlQueryRequest request, ActionListener listener) { + TimeValue timeout = timeout(request); + EsqlConfiguration configuration = new EsqlConfiguration( request.zoneId() != null ? request.zoneId() : ZoneOffset.UTC, request.locale() != null ? request.locale() : Locale.US, @@ -91,9 +97,11 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener null, clusterService.getClusterName().value(), request.pragmas(), - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.get(settings) + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.get(settings), + timeout ); String sessionId = sessionID(task); + planExecutor.esql( request, sessionId, @@ -110,6 +118,18 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener ); } + private TimeValue timeout(EsqlQueryRequest request) { + TimeValue rTimeout = request.timeout(); + if (rTimeout == null) { + return DEFAULT_TIMEOUT; + } + TimeValue timeout = rTimeout.compareTo(MAX_TIMEOUT) > 0 ? MAX_TIMEOUT : rTimeout; + if (timeout.duration() < 0) { + return MAX_TIMEOUT; + } + return timeout; + } + /** * Returns the ID for this compute session. The ID is unique within the cluster, and is used * to identify the compute-session across nodes. The ID is just the TaskID of the task that diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java index 3d6f5ce18816f..1859b5646ea62 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.ql.session.Configuration; @@ -26,25 +27,30 @@ public class EsqlConfiguration extends Configuration implements Writeable { private final Locale locale; + private final TimeValue timeout; + public EsqlConfiguration( ZoneId zi, Locale locale, String username, String clusterName, QueryPragmas pragmas, - int resultTruncationMaxSize + int resultTruncationMaxSize, + TimeValue timeout ) { super(zi, username, clusterName); this.locale = locale; this.pragmas = pragmas; this.resultTruncationMaxSize = resultTruncationMaxSize; + this.timeout = timeout; } public EsqlConfiguration(StreamInput in) throws IOException { super(in.readZoneId(), Instant.ofEpochSecond(in.readVLong(), in.readVInt()), in.readOptionalString(), in.readOptionalString()); - locale = Locale.forLanguageTag(in.readString()); + this.locale = Locale.forLanguageTag(in.readString()); this.pragmas = new QueryPragmas(in); this.resultTruncationMaxSize = in.readVInt(); + this.timeout = in.readOptionalTimeValue(); } @Override @@ -58,6 +64,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(locale.toLanguageTag()); pragmas.writeTo(out); out.writeVInt(resultTruncationMaxSize); + out.writeOptionalTimeValue(timeout); } public QueryPragmas pragmas() { @@ -72,6 +79,10 @@ public Locale locale() { return locale; } + public TimeValue timeout() { + return timeout; + } + @Override public boolean equals(Object o) { if (super.equals(o)) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 8a4a53f7b986d..eacc14847e161 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -151,7 +151,8 @@ public class CsvTests extends ESTestCase { null, null, new QueryPragmas(Settings.builder().put("page_size", randomPageSize()).build()), - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY) + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY), + null ); private final FunctionRegistry functionRegistry = new EsqlFunctionRegistry(); private final EsqlParser parser = new EsqlParser(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 87693bffc4433..7a02b4b807a03 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -137,7 +137,8 @@ public static List readScriptSpec() { null, null, new QueryPragmas(settings), - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(settings) + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(settings), + null ) }; }).toList(); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java index f42c86d4b028a..68741925904a6 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java @@ -61,7 +61,15 @@ public class EvalMapperTests extends ESTestCase { private static final FieldAttribute LONG = field("long", DataTypes.LONG); private static final FieldAttribute DATE = field("date", DataTypes.DATETIME); - private static final EsqlConfiguration TEST_CONFIG = new EsqlConfiguration(ZoneOffset.UTC, Locale.US, "test", null, null, 10000000); + private static final EsqlConfiguration TEST_CONFIG = new EsqlConfiguration( + ZoneOffset.UTC, + Locale.US, + "test", + null, + null, + 10000000, + null + ); @ParametersFactory(argumentFormatting = "%1$s") public static List params() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index b1965f19e44f5..15a14bed5aa1f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -133,7 +133,8 @@ private EsqlConfiguration config() { "test_user", "test_cluser", pragmas, - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(null) + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(null), + null ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java index fae2a1caeab4c..166ba0a83213f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java @@ -182,7 +182,8 @@ static PhysicalPlan mapAndMaybeOptimize(LogicalPlan logicalPlan) { null, null, new QueryPragmas(Settings.EMPTY), - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY) + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY), + null ); var physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)); FunctionRegistry functionRegistry = new EsqlFunctionRegistry(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlConfigurationSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlConfigurationSerializationTests.java index bc1146c492a73..2c11ad1107230 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlConfigurationSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlConfigurationSerializationTests.java @@ -10,10 +10,12 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.io.IOException; +import java.util.concurrent.TimeUnit; public class EsqlConfigurationSerializationTests extends AbstractWireSerializingTestCase { @@ -35,7 +37,7 @@ public static EsqlConfiguration randomConfiguration() { var clusterName = randomAlphaOfLengthBetween(3, 10); var truncation = randomNonNegativeInt(); - return new EsqlConfiguration(zoneId, locale, username, clusterName, randomQueryPragmas(), truncation); + return new EsqlConfiguration(zoneId, locale, username, clusterName, randomQueryPragmas(), truncation, null); } @Override @@ -54,7 +56,8 @@ protected EsqlConfiguration mutateInstance(EsqlConfiguration in) throws IOExcept ordinal == 4 ? new QueryPragmas(Settings.builder().put(QueryPragmas.EXCHANGE_BUFFER_SIZE.getKey(), between(1, 10)).build()) : in.pragmas(), - ordinal == 5 ? in.resultTruncationMaxSize() + randomIntBetween(3, 10) : in.resultTruncationMaxSize() + ordinal == 5 ? in.resultTruncationMaxSize() + randomIntBetween(3, 10) : in.resultTruncationMaxSize(), + ordinal == 6 ? new TimeValue(randomIntBetween(0, 1000000), TimeUnit.MILLISECONDS) : in.timeout() ); } } From ac3e392377fab69db374f6188127885bf4c43b10 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Wed, 13 Sep 2023 14:41:31 +0200 Subject: [PATCH 02/11] Update docs/changelog/99526.yaml --- docs/changelog/99526.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/99526.yaml diff --git a/docs/changelog/99526.yaml b/docs/changelog/99526.yaml new file mode 100644 index 0000000000000..6072aeb4125ac --- /dev/null +++ b/docs/changelog/99526.yaml @@ -0,0 +1,5 @@ +pr: 99526 +summary: Add timeout to ESQL +area: ES|QL +type: enhancement +issues: [] From c75823ea60557a5ab12b7954b40c1dd06356fca4 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Thu, 14 Sep 2023 10:37:52 +0200 Subject: [PATCH 03/11] Move timeouts to Settings and cancel tasks on timeout --- .../xpack/esql/plugin/ComputeService.java | 65 ++++++++++--------- .../xpack/esql/plugin/EsqlPlugin.java | 16 +++++ .../esql/plugin/TransportEsqlQueryAction.java | 23 ++++--- 3 files changed, 63 insertions(+), 41 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index f8e4446927350..5ec88b11adafd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -147,35 +147,40 @@ public void execute( requestFilter, concreteIndices, originalIndices, - wrapTimeout(configuration, transportService.getThreadPool(), listener.delegateFailureAndWrap((delegate, targetNodes) -> { - final ExchangeSourceHandler exchangeSource = exchangeService.createSourceHandler( - sessionId, - queryPragmas.exchangeBufferSize(), - ESQL_THREAD_POOL_NAME - ); - try ( - Releasable ignored = exchangeSource::decRef; - RefCountingListener requestRefs = new RefCountingListener(delegate.map(unused -> collectedPages)) - ) { - final AtomicBoolean cancelled = new AtomicBoolean(); - // wait until the source handler is completed - exchangeSource.addCompletionListener(requestRefs.acquire()); - // run compute on the coordinator - var computeContext = new ComputeContext(sessionId, List.of(), configuration, exchangeSource, null); - runCompute(rootTask, computeContext, coordinatorPlan, cancelOnFailure(rootTask, cancelled, requestRefs.acquire())); - // run compute on remote nodes - // TODO: This is wrong, we need to be able to cancel - runComputeOnRemoteNodes( + wrapTimeout( + configuration, + transportService.getThreadPool(), + rootTask, + listener.delegateFailureAndWrap((delegate, targetNodes) -> { + final ExchangeSourceHandler exchangeSource = exchangeService.createSourceHandler( sessionId, - rootTask, - configuration, - dataNodePlan, - exchangeSource, - targetNodes, - () -> cancelOnFailure(rootTask, cancelled, requestRefs.acquire()).map(unused -> null) + queryPragmas.exchangeBufferSize(), + ESQL_THREAD_POOL_NAME ); - } - })) + try ( + Releasable ignored = exchangeSource::decRef; + RefCountingListener requestRefs = new RefCountingListener(delegate.map(unused -> collectedPages)) + ) { + final AtomicBoolean cancelled = new AtomicBoolean(); + // wait until the source handler is completed + exchangeSource.addCompletionListener(requestRefs.acquire()); + // run compute on the coordinator + var computeContext = new ComputeContext(sessionId, List.of(), configuration, exchangeSource, null); + runCompute(rootTask, computeContext, coordinatorPlan, cancelOnFailure(rootTask, cancelled, requestRefs.acquire())); + // run compute on remote nodes + // TODO: This is wrong, we need to be able to cancel + runComputeOnRemoteNodes( + sessionId, + rootTask, + configuration, + dataNodePlan, + exchangeSource, + targetNodes, + () -> cancelOnFailure(rootTask, cancelled, requestRefs.acquire()).map(unused -> null) + ); + } + }) + ) ); } @@ -183,12 +188,14 @@ public void execute( private ActionListener> wrapTimeout( EsqlConfiguration configuration, ThreadPool pool, + CancellableTask task, ActionListener> l ) { if (configuration.timeout() != null) { return ListenerTimeouts.wrapWithTimeout(pool, configuration.timeout(), esqlExecutor, l, (x) -> { - String timeoutMessage = "ESQL query timed out after {}"; - l.onFailure(new ElasticsearchTimeoutException(timeoutMessage, configuration.timeout())); + LOGGER.debug("cancelling ESQL task {} on failure", task); + transportService.getTaskManager().cancelTaskAndDescendants(task, "timeout", false, ActionListener.noop()); + l.onFailure(new ElasticsearchTimeoutException("ESQL query timed out after {}", configuration.timeout())); }); } return l; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 2ab3feef9fa51..0926f244812de 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -76,6 +76,22 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { Setting.Property.NodeScope ); + public static final Setting QUERY_DEFAULT_TIMEOUT = Setting.intSetting( + "esql.query.default_timeout", + -1, + -1, + Integer.MAX_VALUE, + Setting.Property.NodeScope + ); + + public static final Setting QUERY_MAX_TIMEOUT = Setting.intSetting( + "esql.query.max_timeout", + -1, + -1, + Integer.MAX_VALUE, + Setting.Property.NodeScope + ); + @Override public Collection createComponents( Client client, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index b9aed6b3b9b38..7851782b29b10 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -40,8 +40,6 @@ public class TransportEsqlQueryAction extends HandledTransportAction { - public static TimeValue DEFAULT_TIMEOUT = new TimeValue(4, TimeUnit.MINUTES); - public static TimeValue MAX_TIMEOUT = new TimeValue(4, TimeUnit.MINUTES); private final PlanExecutor planExecutor; private final ComputeService computeService; private final ExchangeService exchangeService; @@ -88,8 +86,6 @@ protected void doExecute(Task task, EsqlQueryRequest request, ActionListener listener) { - TimeValue timeout = timeout(request); - EsqlConfiguration configuration = new EsqlConfiguration( request.zoneId() != null ? request.zoneId() : ZoneOffset.UTC, request.locale() != null ? request.locale() : Locale.US, @@ -98,7 +94,7 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener clusterService.getClusterName().value(), request.pragmas(), EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.get(settings), - timeout + timeout(request) ); String sessionId = sessionID(task); @@ -119,15 +115,18 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener } private TimeValue timeout(EsqlQueryRequest request) { - TimeValue rTimeout = request.timeout(); - if (rTimeout == null) { - return DEFAULT_TIMEOUT; + Integer defaultTimeoutMillis = EsqlPlugin.QUERY_DEFAULT_TIMEOUT.get(settings); + Integer maxTimeoutMillis = EsqlPlugin.QUERY_MAX_TIMEOUT.get(settings); + TimeValue defaultTimeout = defaultTimeoutMillis < 0 ? null : new TimeValue(defaultTimeoutMillis, TimeUnit.MILLISECONDS); + TimeValue maxTimeout = maxTimeoutMillis < 0 ? null : new TimeValue(maxTimeoutMillis, TimeUnit.MILLISECONDS); + TimeValue requestTimeout = request.timeout(); + if (requestTimeout == null) { + return defaultTimeout; } - TimeValue timeout = rTimeout.compareTo(MAX_TIMEOUT) > 0 ? MAX_TIMEOUT : rTimeout; - if (timeout.duration() < 0) { - return MAX_TIMEOUT; + if (requestTimeout.millis() < 0) { + return maxTimeout; } - return timeout; + return maxTimeoutMillis >= 0 && requestTimeout.millis() > maxTimeoutMillis ? maxTimeout : requestTimeout; } /** From 2f1856d74043f4a9cd3e3cbbfc1f5f2345f5890a Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Thu, 14 Sep 2023 12:58:57 +0200 Subject: [PATCH 04/11] Fix settings --- .../java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 0926f244812de..09cab48b06a52 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -128,7 +128,7 @@ public Collection createComponents( */ @Override public List> getSettings() { - return List.of(QUERY_RESULT_TRUNCATION_MAX_SIZE); + return List.of(QUERY_RESULT_TRUNCATION_MAX_SIZE, QUERY_DEFAULT_TIMEOUT, QUERY_MAX_TIMEOUT); } @Override From 2728c85e2102eac3bf94cb60f760df0bcfcc3458 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Mon, 18 Sep 2023 16:02:59 +0200 Subject: [PATCH 05/11] Add timeout tests --- .../xpack/esql/qa/single_node/RestEsqlIT.java | 44 ------- .../xpack/esql/action/EsqlTimeoutIT.java | 122 ++++++++++++++++++ 2 files changed, 122 insertions(+), 44 deletions(-) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlTimeoutIT.java diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 915c6145e3ce2..efb7192bbc3e8 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -52,50 +52,6 @@ public void testBasicEsql() throws IOException { assertEquals(List.of(List.of(499.5d)), result.get("values")); } - public void testTimeout() throws IOException { - StringBuilder b = new StringBuilder(); - for (int i = 0; i < 10000; i++) { - b.append(String.format(Locale.ROOT, """ - {"create":{"_index":"%s"}} - {"test":"value%s"} - """, testIndexName(), randomAlphaOfLength(20) + " " + randomAlphaOfLength(20) + " " + randomAlphaOfLength(20))); - } - Request bulk = new Request("POST", "/_bulk"); - bulk.addParameter("refresh", "true"); - bulk.addParameter("filter_path", "errors"); - bulk.setJsonEntity(b.toString()); - Response response = client().performRequest(bulk); - Assert.assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); - - // let's make it a bit expensive, so that the timeout will most likely trigger - RequestObjectBuilder builder = new RequestObjectBuilder().query(fromIndex() + """ - | grok test "%{NOTSPACE:a} %{NOTSPACE:b} %{NOTSPACE:c}" - | grok test "%{NOTSPACE:c} %{NOTSPACE:d} %{NOTSPACE:e}" - | grok test "%{NOTSPACE:f} %{NOTSPACE:g} %{NOTSPACE:h}" - | grok test "%{NOTSPACE:i} %{NOTSPACE:j} %{NOTSPACE:k}" - | grok test "%{WORD:l} %{WORD:m} %{WORD:n}" - | grok test "%{WORD:o} %{WORD:p} %{WORD:q}" - | grok test "%{USERNAME:r} %{USERNAME:s} %{USERNAME:t}" - | grok test "%{USERNAME:u} %{USERNAME:v} %{USERNAME:w}" - | grok test "%{DATA:x} %{DATA:y} %{DATA:z}" - | sort test - | limit 9999 - | sort b desc - | limit 9998 - | sort c - | limit 9997 - | eval s = length(a) + length(b) * length(c) - | stats max(s) - """); - if (Build.current().isSnapshot()) { - builder.pragmas(Settings.builder().put("data_partitioning", "shard").build()); - } - builder.timeout(0); - builder.build(); - ResponseException re = expectThrows(ResponseException.class, () -> runEsql(builder)); - assertThat(EntityUtils.toString(re.getResponse().getEntity()), containsString("ESQL query timed out after 0s")); - } - public void testInvalidPragma() throws IOException { assumeTrue("pragma only enabled on snapshot builds", Build.current().isSnapshot()); RequestObjectBuilder builder = new RequestObjectBuilder().query("row a = 1, b = 2"); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlTimeoutIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlTimeoutIT.java new file mode 100644 index 0000000000000..8d939ceb48f82 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlTimeoutIT.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.ScriptPlugin; +import org.elasticsearch.script.LongFieldScript; +import org.elasticsearch.script.ScriptContext; +import org.elasticsearch.script.ScriptEngine; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class EsqlTimeoutIT extends AbstractEsqlIntegTestCase { + + @Before + public void setupIndex() throws IOException { + + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("the_field"); + { + mapping.field("type", "long"); + mapping.startObject("script").field("source", "").field("lang", "sleep").endObject(); + } + mapping.endObject(); + } + mapping.endObject(); + client().admin().indices().prepareCreate("test").setSettings(Map.of("number_of_shards", 1)).setMapping(mapping.endObject()).get(); + + client().prepareBulk() + .add(client().prepareIndex("test").setId("0").setSource("foo", 0)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + + } + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), SleepFieldPlugin.class); + } + + public void testTimeout() { + ElasticsearchTimeoutException re = expectThrows(ElasticsearchTimeoutException.class, () -> { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("from test"); + request.timeout(new TimeValue(500, TimeUnit.MILLISECONDS)); + client().execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); + }); + assertThat(re.getMessage(), containsString("ESQL query timed out after 500ms")); + } + + public void testNoTimeout() { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("from test"); + request.timeout(new TimeValue(1500, TimeUnit.MILLISECONDS)); + EsqlQueryResponse a = client().execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); + assertThat(a.columns().size(), equalTo(2)); + } + + public static class SleepFieldPlugin extends Plugin implements ScriptPlugin { + @Override + public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { + return new ScriptEngine() { + @Override + public String getType() { + return "sleep"; + } + + @Override + @SuppressWarnings("unchecked") + public FactoryType compile( + String name, + String code, + ScriptContext context, + Map params + ) { + return (FactoryType) (LongFieldScript.Factory) ( + fieldName, + params1, + searchLookup, + onScriptError) -> ctx -> new LongFieldScript(fieldName, params1, searchLookup, onScriptError, ctx) { + @Override + public void execute() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + emit(34); + } + }; + } + + @Override + public Set> getSupportedContexts() { + return Set.of(LongFieldScript.CONTEXT); + } + }; + } + } +} From 7b71306329259feee863c9b2d19c27372d04d68c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 18 Sep 2023 11:53:35 -0700 Subject: [PATCH 06/11] Move timeout to TransportEsqlQueryAction --- .../xpack/esql/plugin/ComputeService.java | 79 +++++++------------ .../esql/plugin/TransportEsqlQueryAction.java | 21 ++++- 2 files changed, 47 insertions(+), 53 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 38aa5a2b2ba8b..5c14a19afd6f2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.plugin; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.search.SearchRequest; @@ -17,7 +16,6 @@ import org.elasticsearch.action.search.SearchShardsResponse; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; -import org.elasticsearch.action.support.ListenerTimeouts; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.action.support.SubscribableListener; @@ -141,64 +139,41 @@ public void execute( LOGGER.debug("Sending data node plan\n{}\n with filter [{}]", dataNodePlan, requestFilter); String[] originalIndices = PlannerUtils.planOriginalIndices(physicalPlan); - computeTargetNodes( rootTask, requestFilter, concreteIndices, originalIndices, - wrapTimeout( - configuration, - transportService.getThreadPool(), - rootTask, - listener.delegateFailureAndWrap((delegate, targetNodes) -> { - final ExchangeSourceHandler exchangeSource = exchangeService.createSourceHandler( + listener.delegateFailureAndWrap((delegate, targetNodes) -> { + final ExchangeSourceHandler exchangeSource = exchangeService.createSourceHandler( + sessionId, + queryPragmas.exchangeBufferSize(), + ESQL_THREAD_POOL_NAME + ); + try ( + Releasable ignored = exchangeSource::decRef; + RefCountingListener requestRefs = new RefCountingListener(delegate.map(unused -> collectedPages)) + ) { + final AtomicBoolean cancelled = new AtomicBoolean(); + // wait until the source handler is completed + exchangeSource.addCompletionListener(requestRefs.acquire()); + // run compute on the coordinator + var computeContext = new ComputeContext(sessionId, List.of(), configuration, exchangeSource, null); + runCompute(rootTask, computeContext, coordinatorPlan, cancelOnFailure(rootTask, cancelled, requestRefs.acquire())); + // run compute on remote nodes + // TODO: This is wrong, we need to be able to cancel + runComputeOnRemoteNodes( sessionId, - queryPragmas.exchangeBufferSize(), - ESQL_THREAD_POOL_NAME + rootTask, + configuration, + dataNodePlan, + exchangeSource, + targetNodes, + () -> cancelOnFailure(rootTask, cancelled, requestRefs.acquire()).map(unused -> null) ); - try ( - Releasable ignored = exchangeSource::decRef; - RefCountingListener requestRefs = new RefCountingListener(delegate.map(unused -> collectedPages)) - ) { - final AtomicBoolean cancelled = new AtomicBoolean(); - // wait until the source handler is completed - exchangeSource.addCompletionListener(requestRefs.acquire()); - // run compute on the coordinator - var computeContext = new ComputeContext(sessionId, List.of(), configuration, exchangeSource, null); - runCompute(rootTask, computeContext, coordinatorPlan, cancelOnFailure(rootTask, cancelled, requestRefs.acquire())); - // run compute on remote nodes - // TODO: This is wrong, we need to be able to cancel - runComputeOnRemoteNodes( - sessionId, - rootTask, - configuration, - dataNodePlan, - exchangeSource, - targetNodes, - () -> cancelOnFailure(rootTask, cancelled, requestRefs.acquire()).map(unused -> null) - ); - } - }) - ) + } + }) ); - - } - - private ActionListener> wrapTimeout( - EsqlConfiguration configuration, - ThreadPool pool, - CancellableTask task, - ActionListener> l - ) { - if (configuration.timeout() != null) { - return ListenerTimeouts.wrapWithTimeout(pool, configuration.timeout(), esqlExecutor, l, (x) -> { - LOGGER.debug("cancelling ESQL task {} on failure", task); - transportService.getTaskManager().cancelTaskAndDescendants(task, "timeout", false, ActionListener.noop()); - l.onFailure(new ElasticsearchTimeoutException("ESQL query timed out after {}", configuration.timeout())); - }); - } - return l; } private void runComputeOnRemoteNodes( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index d4ee92522d514..72160b89cd639 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -7,10 +7,12 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.ListenerTimeouts; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -21,6 +23,7 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.ColumnInfo; @@ -47,6 +50,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction listener) { // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can - requestExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, l))); + final ActionListener wrappedListener; + final TimeValue timeout = timeout(request); + if (timeout != null) { + final ThreadPool threadPool = transportService.getThreadPool(); + final var executor = threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME); + wrappedListener = ListenerTimeouts.wrapWithTimeout(threadPool, timeout, executor, listener, l -> { + logger.debug("cancelling ESQL task {} on timeout", task); + final TaskManager taskManager = transportService.getTaskManager(); + taskManager.cancelTaskAndDescendants((CancellableTask) task, "timeout", false, ActionListener.noop()); + listener.onFailure(new ElasticsearchTimeoutException("ESQL query timed out after {}", timeout)); + }); + } else { + wrappedListener = listener; + } + requestExecutor.execute(ActionRunnable.wrap(wrappedListener, l -> doExecuteForked(task, request, l))); } private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener listener) { From 30af9ddc7b81071530e70682425adceebee06c40 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 18 Sep 2023 12:45:29 -0700 Subject: [PATCH 07/11] Wait for outstanding tasks on timeout --- .../xpack/esql/plugin/TransportEsqlQueryAction.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 72160b89cd639..09e670bf5bdef 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -95,8 +95,9 @@ protected void doExecute(Task task, EsqlQueryRequest request, ActionListener { logger.debug("cancelling ESQL task {} on timeout", task); final TaskManager taskManager = transportService.getTaskManager(); - taskManager.cancelTaskAndDescendants((CancellableTask) task, "timeout", false, ActionListener.noop()); - listener.onFailure(new ElasticsearchTimeoutException("ESQL query timed out after {}", timeout)); + taskManager.cancelTaskAndDescendants((CancellableTask) task, "timeout", false, ActionListener.running(() -> { + listener.onFailure(new ElasticsearchTimeoutException("ESQL query timed out after {}", timeout)); + })); }); } else { wrappedListener = listener; From e40a2bbeaf91e90764838811370684d91f66c857 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 18 Sep 2023 12:47:33 -0700 Subject: [PATCH 08/11] Larger timeout for case without timeout --- .../java/org/elasticsearch/xpack/esql/action/EsqlTimeoutIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlTimeoutIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlTimeoutIT.java index 8d939ceb48f82..8bdd1fcd7c738 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlTimeoutIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlTimeoutIT.java @@ -73,7 +73,7 @@ public void testTimeout() { public void testNoTimeout() { EsqlQueryRequest request = new EsqlQueryRequest(); request.query("from test"); - request.timeout(new TimeValue(1500, TimeUnit.MILLISECONDS)); + request.timeout(new TimeValue(1, TimeUnit.MINUTES)); EsqlQueryResponse a = client().execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); assertThat(a.columns().size(), equalTo(2)); } From 94e1f906c6fba199bfb3da8ed2b7794d5651fb5f Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Tue, 19 Sep 2023 09:56:06 +0200 Subject: [PATCH 09/11] Remove max timeout --- .../xpack/esql/EsqlTestUtils.java | 3 +-- .../xpack/esql/plugin/EsqlPlugin.java | 10 +--------- .../esql/plugin/TransportEsqlQueryAction.java | 18 +++++------------- .../xpack/esql/session/EsqlConfiguration.java | 15 ++------------- .../org/elasticsearch/xpack/esql/CsvTests.java | 3 +-- .../optimizer/PhysicalPlanOptimizerTests.java | 3 +-- .../xpack/esql/planner/EvalMapperTests.java | 10 +--------- .../planner/LocalExecutionPlannerTests.java | 3 +-- .../esql/plugin/DataNodeRequestTests.java | 3 +-- .../EsqlConfigurationSerializationTests.java | 7 ++----- 10 files changed, 16 insertions(+), 59 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 2725600c45100..dd86742785d15 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -87,8 +87,7 @@ public byte[] max(String field, DataType dataType) { null, null, new QueryPragmas(Settings.EMPTY), - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY), - null + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY) ); private EsqlTestUtils() {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 09cab48b06a52..83914b75f1d9b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -84,14 +84,6 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { Setting.Property.NodeScope ); - public static final Setting QUERY_MAX_TIMEOUT = Setting.intSetting( - "esql.query.max_timeout", - -1, - -1, - Integer.MAX_VALUE, - Setting.Property.NodeScope - ); - @Override public Collection createComponents( Client client, @@ -128,7 +120,7 @@ public Collection createComponents( */ @Override public List> getSettings() { - return List.of(QUERY_RESULT_TRUNCATION_MAX_SIZE, QUERY_DEFAULT_TIMEOUT, QUERY_MAX_TIMEOUT); + return List.of(QUERY_RESULT_TRUNCATION_MAX_SIZE, QUERY_DEFAULT_TIMEOUT); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 09e670bf5bdef..8a2c2b58231f1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -113,8 +113,7 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener null, clusterService.getClusterName().value(), request.pragmas(), - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.get(settings), - timeout(request) + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.get(settings) ); String sessionId = sessionID(task); @@ -135,18 +134,11 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener } private TimeValue timeout(EsqlQueryRequest request) { - Integer defaultTimeoutMillis = EsqlPlugin.QUERY_DEFAULT_TIMEOUT.get(settings); - Integer maxTimeoutMillis = EsqlPlugin.QUERY_MAX_TIMEOUT.get(settings); - TimeValue defaultTimeout = defaultTimeoutMillis < 0 ? null : new TimeValue(defaultTimeoutMillis, TimeUnit.MILLISECONDS); - TimeValue maxTimeout = maxTimeoutMillis < 0 ? null : new TimeValue(maxTimeoutMillis, TimeUnit.MILLISECONDS); - TimeValue requestTimeout = request.timeout(); - if (requestTimeout == null) { - return defaultTimeout; - } - if (requestTimeout.millis() < 0) { - return maxTimeout; + if (request.timeout() != null) { + return request.timeout(); } - return maxTimeoutMillis >= 0 && requestTimeout.millis() > maxTimeoutMillis ? maxTimeout : requestTimeout; + Integer defaultTimeoutMillis = EsqlPlugin.QUERY_DEFAULT_TIMEOUT.get(settings); + return defaultTimeoutMillis < 0 ? null : new TimeValue(defaultTimeoutMillis, TimeUnit.MILLISECONDS); } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java index 1859b5646ea62..3d6f5ce18816f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java @@ -10,7 +10,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.ql.session.Configuration; @@ -27,30 +26,25 @@ public class EsqlConfiguration extends Configuration implements Writeable { private final Locale locale; - private final TimeValue timeout; - public EsqlConfiguration( ZoneId zi, Locale locale, String username, String clusterName, QueryPragmas pragmas, - int resultTruncationMaxSize, - TimeValue timeout + int resultTruncationMaxSize ) { super(zi, username, clusterName); this.locale = locale; this.pragmas = pragmas; this.resultTruncationMaxSize = resultTruncationMaxSize; - this.timeout = timeout; } public EsqlConfiguration(StreamInput in) throws IOException { super(in.readZoneId(), Instant.ofEpochSecond(in.readVLong(), in.readVInt()), in.readOptionalString(), in.readOptionalString()); - this.locale = Locale.forLanguageTag(in.readString()); + locale = Locale.forLanguageTag(in.readString()); this.pragmas = new QueryPragmas(in); this.resultTruncationMaxSize = in.readVInt(); - this.timeout = in.readOptionalTimeValue(); } @Override @@ -64,7 +58,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(locale.toLanguageTag()); pragmas.writeTo(out); out.writeVInt(resultTruncationMaxSize); - out.writeOptionalTimeValue(timeout); } public QueryPragmas pragmas() { @@ -79,10 +72,6 @@ public Locale locale() { return locale; } - public TimeValue timeout() { - return timeout; - } - @Override public boolean equals(Object o) { if (super.equals(o)) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 2127ba0b4c443..8a18d7b3a26ed 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -156,8 +156,7 @@ public class CsvTests extends ESTestCase { null, null, new QueryPragmas(Settings.builder().put("page_size", randomPageSize()).build()), - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY), - null + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY) ); private final FunctionRegistry functionRegistry = new EsqlFunctionRegistry(); private final EsqlParser parser = new EsqlParser(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 9ee99ea9c7304..746a34eaedce4 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -137,8 +137,7 @@ public static List readScriptSpec() { null, null, new QueryPragmas(settings), - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(settings), - null + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(settings) ) }; }).toList(); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java index 9eb408eebb3f6..7956892c34645 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java @@ -64,15 +64,7 @@ public class EvalMapperTests extends ESTestCase { private static final FieldAttribute LONG = field("long", DataTypes.LONG); private static final FieldAttribute DATE = field("date", DataTypes.DATETIME); - private static final EsqlConfiguration TEST_CONFIG = new EsqlConfiguration( - ZoneOffset.UTC, - Locale.US, - "test", - null, - null, - 10000000, - null - ); + private static final EsqlConfiguration TEST_CONFIG = new EsqlConfiguration(ZoneOffset.UTC, Locale.US, "test", null, null, 10000000); @ParametersFactory(argumentFormatting = "%1$s") public static List params() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 15a14bed5aa1f..b1965f19e44f5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -133,8 +133,7 @@ private EsqlConfiguration config() { "test_user", "test_cluser", pragmas, - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(null), - null + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(null) ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java index 166ba0a83213f..fae2a1caeab4c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java @@ -182,8 +182,7 @@ static PhysicalPlan mapAndMaybeOptimize(LogicalPlan logicalPlan) { null, null, new QueryPragmas(Settings.EMPTY), - EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY), - null + EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY) ); var physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)); FunctionRegistry functionRegistry = new EsqlFunctionRegistry(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlConfigurationSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlConfigurationSerializationTests.java index 2c11ad1107230..bc1146c492a73 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlConfigurationSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlConfigurationSerializationTests.java @@ -10,12 +10,10 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.lucene.DataPartitioning; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.io.IOException; -import java.util.concurrent.TimeUnit; public class EsqlConfigurationSerializationTests extends AbstractWireSerializingTestCase { @@ -37,7 +35,7 @@ public static EsqlConfiguration randomConfiguration() { var clusterName = randomAlphaOfLengthBetween(3, 10); var truncation = randomNonNegativeInt(); - return new EsqlConfiguration(zoneId, locale, username, clusterName, randomQueryPragmas(), truncation, null); + return new EsqlConfiguration(zoneId, locale, username, clusterName, randomQueryPragmas(), truncation); } @Override @@ -56,8 +54,7 @@ protected EsqlConfiguration mutateInstance(EsqlConfiguration in) throws IOExcept ordinal == 4 ? new QueryPragmas(Settings.builder().put(QueryPragmas.EXCHANGE_BUFFER_SIZE.getKey(), between(1, 10)).build()) : in.pragmas(), - ordinal == 5 ? in.resultTruncationMaxSize() + randomIntBetween(3, 10) : in.resultTruncationMaxSize(), - ordinal == 6 ? new TimeValue(randomIntBetween(0, 1000000), TimeUnit.MILLISECONDS) : in.timeout() + ordinal == 5 ? in.resultTruncationMaxSize() + randomIntBetween(3, 10) : in.resultTruncationMaxSize() ); } } From 6cf73f201bdf133be9d4d88211803a405b955286 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Tue, 19 Sep 2023 14:58:06 +0200 Subject: [PATCH 10/11] Use time units for timeout configuration --- .../xpack/esql/action/EsqlQueryRequest.java | 7 ++++--- .../org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java | 8 ++++---- .../xpack/esql/plugin/TransportEsqlQueryAction.java | 9 ++------- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java index 8fd5a5cd3b9d6..fb18270ff9f53 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -179,8 +178,10 @@ private static ObjectParser objectParser(Supplier request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD); - parser.declareInt((request, value) -> request.timeout(new TimeValue(value, TimeUnit.MILLISECONDS)), TIMEOUT_FIELD); - + parser.declareString( + (request, val) -> request.timeout(TimeValue.parseTimeValue(val, TIMEOUT_FIELD.getPreferredName())), + TIMEOUT_FIELD + ); return parser; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 83914b75f1d9b..07f6a24251b76 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -30,6 +30,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator; import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.indices.IndicesService; @@ -76,11 +77,10 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { Setting.Property.NodeScope ); - public static final Setting QUERY_DEFAULT_TIMEOUT = Setting.intSetting( + public static final Setting QUERY_DEFAULT_TIMEOUT = Setting.timeSetting( "esql.query.default_timeout", - -1, - -1, - Integer.MAX_VALUE, + new TimeValue(0), + new TimeValue(0), Setting.Property.NodeScope ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 8a2c2b58231f1..e6ef6320e505b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; public class TransportEsqlQueryAction extends HandledTransportAction { @@ -89,7 +88,7 @@ protected void doExecute(Task task, EsqlQueryRequest request, ActionListener wrappedListener; final TimeValue timeout = timeout(request); - if (timeout != null) { + if (timeout.millis() > 0) { final ThreadPool threadPool = transportService.getThreadPool(); final var executor = threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME); wrappedListener = ListenerTimeouts.wrapWithTimeout(threadPool, timeout, executor, listener, l -> { @@ -134,11 +133,7 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener } private TimeValue timeout(EsqlQueryRequest request) { - if (request.timeout() != null) { - return request.timeout(); - } - Integer defaultTimeoutMillis = EsqlPlugin.QUERY_DEFAULT_TIMEOUT.get(settings); - return defaultTimeoutMillis < 0 ? null : new TimeValue(defaultTimeoutMillis, TimeUnit.MILLISECONDS); + return request.timeout() != null ? request.timeout() : EsqlPlugin.QUERY_DEFAULT_TIMEOUT.get(settings); } /** From a938286322cdf70b6fcf282f55913049f14f4581 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Wed, 20 Sep 2023 13:17:42 +0200 Subject: [PATCH 11/11] Update docs/changelog/99526.yaml --- docs/changelog/99526.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/changelog/99526.yaml b/docs/changelog/99526.yaml index 6072aeb4125ac..e077022c09c19 100644 --- a/docs/changelog/99526.yaml +++ b/docs/changelog/99526.yaml @@ -2,4 +2,5 @@ pr: 99526 summary: Add timeout to ESQL area: ES|QL type: enhancement -issues: [] +issues: + - 99141