Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout to ESQL #99526

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/99526.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 99526
summary: Add timeout to ESQL
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,50 @@ public void testBasicEsql() throws IOException {
assertEquals(List.of(List.of(499.5d)), result.get("values"));
}

public void testTimeout() throws IOException {
StringBuilder b = new StringBuilder();
for (int i = 0; i < 10000; i++) {
b.append(String.format(Locale.ROOT, """
{"create":{"_index":"%s"}}
{"test":"value%s"}
""", testIndexName(), randomAlphaOfLength(20) + " " + randomAlphaOfLength(20) + " " + randomAlphaOfLength(20)));
}
Request bulk = new Request("POST", "/_bulk");
bulk.addParameter("refresh", "true");
bulk.addParameter("filter_path", "errors");
bulk.setJsonEntity(b.toString());
Response response = client().performRequest(bulk);
Assert.assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8));

// let's make it a bit expensive, so that the timeout will most likely trigger
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should create a test plugin to avoid the flakiness here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was considering to add a sleep(x) function only in test, to simulate slow queries. Not sure it's what you mean, if you think there is a better way please let me know

RequestObjectBuilder builder = new RequestObjectBuilder().query(fromIndex() + """
| grok test "%{NOTSPACE:a} %{NOTSPACE:b} %{NOTSPACE:c}"
| grok test "%{NOTSPACE:c} %{NOTSPACE:d} %{NOTSPACE:e}"
| grok test "%{NOTSPACE:f} %{NOTSPACE:g} %{NOTSPACE:h}"
| grok test "%{NOTSPACE:i} %{NOTSPACE:j} %{NOTSPACE:k}"
| grok test "%{WORD:l} %{WORD:m} %{WORD:n}"
| grok test "%{WORD:o} %{WORD:p} %{WORD:q}"
| grok test "%{USERNAME:r} %{USERNAME:s} %{USERNAME:t}"
| grok test "%{USERNAME:u} %{USERNAME:v} %{USERNAME:w}"
| grok test "%{DATA:x} %{DATA:y} %{DATA:z}"
| sort test
| limit 9999
| sort b desc
| limit 9998
| sort c
| limit 9997
| eval s = length(a) + length(b) * length(c)
| stats max(s)
""");
if (Build.current().isSnapshot()) {
builder.pragmas(Settings.builder().put("data_partitioning", "shard").build());
}
builder.timeout(0);
builder.build();
ResponseException re = expectThrows(ResponseException.class, () -> runEsql(builder));
assertThat(EntityUtils.toString(re.getResponse().getEntity()), containsString("ESQL query timed out after 0s"));
}

public void testInvalidPragma() throws IOException {
assumeTrue("pragma only enabled on snapshot builds", Build.current().isSnapshot());
RequestObjectBuilder builder = new RequestObjectBuilder().query("row a = 1, b = 2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public RequestObjectBuilder timeZone(ZoneId zoneId) throws IOException {
return this;
}

public RequestObjectBuilder timeout(int timeout) throws IOException {
builder.field("timeout", timeout);
return this;
}

public RequestObjectBuilder pragmas(Settings pragmas) throws IOException {
builder.startObject("pragma");
pragmas.toXContent(builder, ToXContent.EMPTY_PARAMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public byte[] max(String field, DataType dataType) {
null,
null,
new QueryPragmas(Settings.EMPTY),
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY)
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY),
null
);

private EsqlTestUtils() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.CancellableTask;
Expand All @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -65,13 +67,16 @@ public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesR
private static final ParseField PARAMS_FIELD = new ParseField("params");
private static final ParseField LOCALE_FIELD = new ParseField("locale");

private static final ParseField TIMEOUT_FIELD = new ParseField("timeout");

private static final ObjectParser<EsqlQueryRequest, Void> PARSER = objectParser(EsqlQueryRequest::new);

private String query;
private boolean columnar;
private ZoneId zoneId;
private Locale locale;
private QueryBuilder filter;
private TimeValue timeout;
private QueryPragmas pragmas = new QueryPragmas(Settings.EMPTY);
private List<TypedParamValue> params = List.of();

Expand Down Expand Up @@ -149,6 +154,14 @@ public void params(List<TypedParamValue> params) {
this.params = params;
}

public void timeout(TimeValue val) {
this.timeout = val;
}

public TimeValue timeout() {
return timeout;
}

public static EsqlQueryRequest fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
Expand All @@ -166,6 +179,7 @@ private static ObjectParser<EsqlQueryRequest, Void> objectParser(Supplier<EsqlQu
);
parser.declareField(EsqlQueryRequest::params, EsqlQueryRequest::parseParams, PARAMS_FIELD, VALUE_ARRAY);
parser.declareString((request, localeTag) -> request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD);
parser.declareInt((request, value) -> request.timeout(new TimeValue(value, TimeUnit.MILLISECONDS)), TIMEOUT_FIELD);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we take the timeout as a time value directly like the _search API rather than an int?


return parser;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.search.SearchRequest;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.action.support.ListenerTimeouts;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -139,12 +141,13 @@ public void execute(
LOGGER.info("Sending data node plan\n{}\n with filter [{}]", dataNodePlan, requestFilter);

String[] originalIndices = PlannerUtils.planOriginalIndices(physicalPlan);

computeTargetNodes(
rootTask,
requestFilter,
concreteIndices,
originalIndices,
listener.delegateFailureAndWrap((delegate, targetNodes) -> {
wrapTimeout(configuration, transportService.getThreadPool(), listener.delegateFailureAndWrap((delegate, targetNodes) -> {
final ExchangeSourceHandler exchangeSource = exchangeService.createSourceHandler(
sessionId,
queryPragmas.exchangeBufferSize(),
Expand Down Expand Up @@ -172,8 +175,23 @@ public void execute(
() -> cancelOnFailure(rootTask, cancelled, requestRefs.acquire()).map(unused -> null)
);
}
})
}))
);

}

private ActionListener<List<TargetNode>> wrapTimeout(
EsqlConfiguration configuration,
ThreadPool pool,
ActionListener<List<TargetNode>> l
) {
if (configuration.timeout() != null) {
return ListenerTimeouts.wrapWithTimeout(pool, configuration.timeout(), esqlExecutor, l, (x) -> {
String timeoutMessage = "ESQL query timed out after {}";
l.onFailure(new ElasticsearchTimeoutException(timeoutMessage, configuration.timeout()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to cancel the ESQL request when it's timed out.

});
}
return l;
}

private void runComputeOnRemoteNodes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
Expand All @@ -35,9 +36,12 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRequest, EsqlQueryResponse> {

public static TimeValue DEFAULT_TIMEOUT = new TimeValue(4, TimeUnit.MINUTES);
public static TimeValue MAX_TIMEOUT = new TimeValue(4, TimeUnit.MINUTES);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default and max values are supposed to be lower than other timeouts that can happen in the infrastructure, eg. the a http timeouts on cloud proxies.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't have MAX_TIMEOUT since ES can be deployed to an environment without the proxy timeout.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can move it to a Setting and disable it by default, so that we can enable it when we know we have a proxy timeout.

private final PlanExecutor planExecutor;
private final ComputeService computeService;
private final ExchangeService exchangeService;
Expand Down Expand Up @@ -84,16 +88,20 @@ protected void doExecute(Task task, EsqlQueryRequest request, ActionListener<Esq
}

private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
TimeValue timeout = timeout(request);

EsqlConfiguration configuration = new EsqlConfiguration(
request.zoneId() != null ? request.zoneId() : ZoneOffset.UTC,
request.locale() != null ? request.locale() : Locale.US,
// TODO: plug-in security
null,
clusterService.getClusterName().value(),
request.pragmas(),
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.get(settings)
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.get(settings),
timeout
);
String sessionId = sessionID(task);

planExecutor.esql(
request,
sessionId,
Expand All @@ -110,6 +118,18 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener
);
}

private TimeValue timeout(EsqlQueryRequest request) {
TimeValue rTimeout = request.timeout();
if (rTimeout == null) {
return DEFAULT_TIMEOUT;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if we can have the default timeout at the request level here. Once we have the default, it will be very hard to remove as it's considered a breaking change. I think we need to discuss with the team. Let's remove it from this PR, then re-introduce it in a follow-up after the discussion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a default timeout could be a useful feature, but I'll move it to a Setting and I'll set it to -1 (ie. no timeout) by default.
Let's discuss it with the team though, maybe there are better options.

}
TimeValue timeout = rTimeout.compareTo(MAX_TIMEOUT) > 0 ? MAX_TIMEOUT : rTimeout;
if (timeout.duration() < 0) {
return MAX_TIMEOUT;
}
return timeout;
}

/**
* Returns the ID for this compute session. The ID is unique within the cluster, and is used
* to identify the compute-session across nodes. The ID is just the TaskID of the task that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.ql.session.Configuration;

Expand All @@ -26,25 +27,30 @@ public class EsqlConfiguration extends Configuration implements Writeable {

private final Locale locale;

private final TimeValue timeout;

public EsqlConfiguration(
ZoneId zi,
Locale locale,
String username,
String clusterName,
QueryPragmas pragmas,
int resultTruncationMaxSize
int resultTruncationMaxSize,
TimeValue timeout
) {
super(zi, username, clusterName);
this.locale = locale;
this.pragmas = pragmas;
this.resultTruncationMaxSize = resultTruncationMaxSize;
this.timeout = timeout;
}

public EsqlConfiguration(StreamInput in) throws IOException {
super(in.readZoneId(), Instant.ofEpochSecond(in.readVLong(), in.readVInt()), in.readOptionalString(), in.readOptionalString());
locale = Locale.forLanguageTag(in.readString());
this.locale = Locale.forLanguageTag(in.readString());
this.pragmas = new QueryPragmas(in);
this.resultTruncationMaxSize = in.readVInt();
this.timeout = in.readOptionalTimeValue();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need a TransportVersion bump since we are in tech preview and serverless is not out yet?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to bump the version.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Serverless is "out" in the sense that it's running in a production environment and serialization errors during upgrades may trip assertions and block a future release. Please don't skip this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are disabling ES|QL in Serverless now to simplify the developement process.

}

@Override
Expand All @@ -58,6 +64,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(locale.toLanguageTag());
pragmas.writeTo(out);
out.writeVInt(resultTruncationMaxSize);
out.writeOptionalTimeValue(timeout);
}

public QueryPragmas pragmas() {
Expand All @@ -72,6 +79,10 @@ public Locale locale() {
return locale;
}

public TimeValue timeout() {
return timeout;
}

@Override
public boolean equals(Object o) {
if (super.equals(o)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public class CsvTests extends ESTestCase {
null,
null,
new QueryPragmas(Settings.builder().put("page_size", randomPageSize()).build()),
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY)
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY),
null
);
private final FunctionRegistry functionRegistry = new EsqlFunctionRegistry();
private final EsqlParser parser = new EsqlParser();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public static List<Object[]> readScriptSpec() {
null,
null,
new QueryPragmas(settings),
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(settings)
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(settings),
null
) };
}).toList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,15 @@ public class EvalMapperTests extends ESTestCase {
private static final FieldAttribute LONG = field("long", DataTypes.LONG);
private static final FieldAttribute DATE = field("date", DataTypes.DATETIME);

private static final EsqlConfiguration TEST_CONFIG = new EsqlConfiguration(ZoneOffset.UTC, Locale.US, "test", null, null, 10000000);
private static final EsqlConfiguration TEST_CONFIG = new EsqlConfiguration(
ZoneOffset.UTC,
Locale.US,
"test",
null,
null,
10000000,
null
);

@ParametersFactory(argumentFormatting = "%1$s")
public static List<Object[]> params() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ private EsqlConfiguration config() {
"test_user",
"test_cluser",
pragmas,
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(null)
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(null),
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ static PhysicalPlan mapAndMaybeOptimize(LogicalPlan logicalPlan) {
null,
null,
new QueryPragmas(Settings.EMPTY),
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY)
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY),
null
);
var physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration));
FunctionRegistry functionRegistry = new EsqlFunctionRegistry();
Expand Down
Loading