Skip to content

Commit

Permalink
ESQL: Add async ID and is_running headers to ESQL async query (#111840)
Browse files Browse the repository at this point in the history
Add headers to async ESQL queries to show the status and query ID without having to parse the body.

ESQL part of #109576
  • Loading branch information
ivancea authored Aug 21, 2024
1 parent d76e1af commit bf1ec5d
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 5 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/111840.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 111840
summary: "ESQL: Add async ID and `is_running` headers to ESQL async query"
area: ES|QL
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ public Map<String, Object> getTransientHeaders() {
}

/**
* Add the {@code value} for the specified {@code key} Any duplicate {@code value} is ignored.
* Add the {@code value} for the specified {@code key}. Any duplicate {@code value} is ignored.
*
* @param key the header name
* @param value the header value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
* A class that contains all information related to a submitted async execution.
*/
public final class AsyncExecutionId {
public static final String ASYNC_EXECUTION_ID_HEADER = "X-Elasticsearch-Async-Id";
public static final String ASYNC_EXECUTION_IS_RUNNING_HEADER = "X-Elasticsearch-Async-Is-Running";

private final String docId;
private final TaskId taskId;
private final String encoded;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,17 +903,24 @@ public static Map<String, Object> runEsqlAsync(
checkKeepOnCompletion(requestObject, json);
String id = (String) json.get("id");

var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);

if (id == null) {
// no id returned from an async call, must have completed immediately and without keep_on_completion
assertThat(requestObject.keepOnCompletion(), either(nullValue()).or(is(false)));
assertThat((boolean) json.get("is_running"), is(false));
if (supportsAsyncHeaders) {
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is("?0"));
}
assertWarnings(response, expectedWarnings, expectedWarningsRegex);
json.remove("is_running"); // remove this to not mess up later map assertions
return Collections.unmodifiableMap(json);
} else {
// async may not return results immediately, so may need an async get
assertThat(id, is(not(emptyOrNullString())));
if ((boolean) json.get("is_running") == false) {
boolean isRunning = (boolean) json.get("is_running");
if (isRunning == false) {
// must have completed immediately so keep_on_completion must be true
assertThat(requestObject.keepOnCompletion(), is(true));
assertWarnings(response, expectedWarnings, expectedWarningsRegex);
Expand All @@ -925,6 +932,12 @@ public static Map<String, Object> runEsqlAsync(
assertThat(json.get("columns"), is(equalTo(List.<Map<String, String>>of()))); // no partial results
assertThat(json.get("pages"), nullValue());
}

if (supportsAsyncHeaders) {
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), is(id));
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is(isRunning ? "?1" : "?0"));
}

// issue a second request to "async get" the results
Request getRequest = prepareAsyncGetRequest(id);
getRequest.setOptions(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ public enum Cap {
*/
COMBINE_DISJUNCTIVE_CIDRMATCHES,

/**
* Support sending HTTP headers about the status of an async query.
*/
ASYNC_QUERY_STATUS_HEADERS,

/**
* Consider the upper bound when computing the interval in BUCKET auto mode.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
implements
AsyncTaskManagementService.AsyncOperation<EsqlQueryRequest, EsqlQueryResponse, EsqlQueryTask> {

private final ThreadPool threadPool;
private final PlanExecutor planExecutor;
private final ComputeService computeService;
private final ExchangeService exchangeService;
Expand Down Expand Up @@ -82,6 +83,7 @@ public TransportEsqlQueryAction(
) {
// TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
super(EsqlQueryAction.NAME, transportService, actionFilters, EsqlQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
this.planExecutor = planExecutor;
this.clusterService = clusterService;
this.requestExecutor = threadPool.executor(ThreadPool.Names.SEARCH);
Expand Down Expand Up @@ -181,9 +183,11 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {
List<ColumnInfoImpl> columns = result.schema().stream().map(c -> new ColumnInfoImpl(c.name(), c.dataType().outputType())).toList();
EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0");
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
String id = asyncTask.getExecutionId().getEncoded();
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async());
String asyncExecutionId = asyncTask.getExecutionId().getEncoded();
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), asyncExecutionId, false, request.async());
}
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
}
Expand Down Expand Up @@ -231,12 +235,15 @@ public EsqlQueryTask createTask(

@Override
public EsqlQueryResponse initialResponse(EsqlQueryTask task) {
var asyncExecutionId = task.getExecutionId().getEncoded();
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?1");
return new EsqlQueryResponse(
List.of(),
List.of(),
null,
false,
task.getExecutionId().getEncoded(),
asyncExecutionId,
true, // is_running
true // isAsync
);
Expand Down

0 comments on commit bf1ec5d

Please sign in to comment.