diff --git a/docs/reference/search/async-search.asciidoc b/docs/reference/search/async-search.asciidoc index 72241e2387848..91afbf334e188 100644 --- a/docs/reference/search/async-search.asciidoc +++ b/docs/reference/search/async-search.asciidoc @@ -138,7 +138,7 @@ set to `false`. ==== Get async search The get async search API retrieves the results of a previously submitted -async search request given its id. If the {es} {security-features} are enabled. +async search request given its id. If the {es} {security-features} are enabled, the access to the results of a specific async search is restricted to the user that submitted it in the first place. @@ -161,8 +161,8 @@ GET /_async_search/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsd "timed_out" : false, "num_reduce_phases" : 46, <4> "_shards" : { - "total" : 562, <5> - "successful" : 188, + "total" : 562, + "successful" : 188, <5> "skipped" : 0, "failed" : 0 }, @@ -222,6 +222,87 @@ override such value and extend the validity of the request. When this period expires, the search, if still running, is cancelled. If the search is completed, its saved results are deleted. + +[[get-async-search-status]] +==== Get async search status +The get async search status API, without retrieving search results, shows +only the status of a previously submitted async search request given its `id`. +If the {es} {security-features} are enabled, the access to the get async +search status API is restricted to the +<>. + +[source,console,id=get-async-search-status-example] +-------------------------------------------------- +GET /_async_search/status/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc= +-------------------------------------------------- +// TEST[continued s/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=/\${body.id}/] + +[source,console-result] +-------------------------------------------------- +{ + "id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=", + "is_running" : true, + "is_partial" : true, + "start_time_in_millis" : 1583945890986, + "expiration_time_in_millis" : 1584377890986, + "_shards" : { + "total" : 562, + "successful" : 188, <1> + "skipped" : 0, + "failed" : 0 + } +} +-------------------------------------------------- +// TEST[skip: a sample output of a status of a running async search] + +<1> Indicates how many shards have executed the query so far. + +For an async search that has been completed, the status response has +an additional `completion_status` field that shows the status +code of the completed async search. +[source,console-result] +-------------------------------------------------- +{ + "id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=", + "is_running" : false, + "is_partial" : false, + "start_time_in_millis" : 1583945890986, + "expiration_time_in_millis" : 1584377890986, + "_shards" : { + "total" : 562, + "successful" : 562, + "skipped" : 0, + "failed" : 0 + }, + "completion_status" : 200 <1> +} +-------------------------------------------------- +// TEST[skip: a sample output of a status of a completed async search] + +<1> Indicates that the async search was successfully completed + + +[source,console-result] +-------------------------------------------------- +{ + "id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=", + "is_running" : false, + "is_partial" : true, + "start_time_in_millis" : 1583945890986, + "expiration_time_in_millis" : 1584377890986, + "_shards" : { + "total" : 562, + "successful" : 450, + "skipped" : 0, + "failed" : 112 + }, + "completion_status" : 503 <1> +} +-------------------------------------------------- +// TEST[skip: a sample output of a status of a completed async search] + +<1> Indicates that the async search was completed with an error + [[delete-async-search]] ==== Delete async search diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java index fa82aa0ac634a..18aa1578a6619 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java @@ -45,6 +45,23 @@ public int length() { return array.length(); } + /** + * Returns the size of the expected results, excluding potential null values. + * @return the number of non-null elements + */ + public int nonNullLength() { + if (nonNullList != null) { + return nonNullList.size(); + } + int count = 0; + for (int i = 0; i < array.length(); i++) { + if (array.get(i) != null) { + count++; + } + } + return count; + } + /** * Sets the element at position {@code i} to the given value. * diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index cb317e72ae3fc..628e472e17cf2 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; +import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; import java.util.ArrayList; @@ -41,6 +42,7 @@ import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; + @SuiteScopeTestCase public class AsyncSearchActionIT extends AsyncSearchIntegTestCase { private static String indexName; @@ -187,10 +189,19 @@ public void testRestartAfterCompletion() throws Exception { } ensureTaskCompletion(initial.getId()); restartTaskNode(initial.getId(), indexName); + AsyncSearchResponse response = getAsyncSearch(initial.getId()); assertNotNull(response.getSearchResponse()); assertFalse(response.isRunning()); assertFalse(response.isPartial()); + + AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId()); + assertFalse(statusResponse.isRunning()); + assertFalse(statusResponse.isPartial()); + assertEquals(numShards, statusResponse.getTotalShards()); + assertEquals(numShards, statusResponse.getSuccessfulShards()); + assertEquals(RestStatus.OK, statusResponse.getCompletionStatus()); + deleteAsyncSearch(response.getId()); ensureTaskRemoval(response.getId()); } @@ -231,6 +242,15 @@ public void testCleanupOnFailure() throws Exception { assertTrue(response.isPartial()); assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards)); assertThat(response.getSearchResponse().getShardFailures().length, equalTo(numShards)); + + AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId()); + assertFalse(statusResponse.isRunning()); + assertTrue(statusResponse.isPartial()); + assertEquals(numShards, statusResponse.getTotalShards()); + assertEquals(0, statusResponse.getSuccessfulShards()); + assertEquals(numShards, statusResponse.getFailedShards()); + assertThat(statusResponse.getCompletionStatus().getStatus(), greaterThanOrEqualTo(400)); + deleteAsyncSearch(initial.getId()); ensureTaskRemoval(initial.getId()); } @@ -246,6 +266,9 @@ public void testInvalidId() throws Exception { } assertFalse(response.isRunning()); } + + ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncStatus("invalid")); + assertThat(exc.getMessage(), containsString("invalid id")); } public void testNoIndex() throws Exception { @@ -287,6 +310,13 @@ public void testCancellation() throws Exception { assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0)); assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); + AsyncStatusResponse statusResponse = getAsyncStatus(response.getId()); + assertTrue(statusResponse.isRunning()); + assertEquals(numShards, statusResponse.getTotalShards()); + assertEquals(0, statusResponse.getSuccessfulShards()); + assertEquals(0, statusResponse.getSkippedShards()); + assertEquals(0, statusResponse.getFailedShards()); + deleteAsyncSearch(response.getId()); ensureTaskRemoval(response.getId()); } @@ -321,6 +351,17 @@ public void testUpdateRunningKeepAlive() throws Exception { assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0)); assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); + AsyncStatusResponse statusResponse = getAsyncStatus(response.getId()); + assertTrue(statusResponse.isRunning()); + assertTrue(statusResponse.isPartial()); + assertThat(statusResponse.getExpirationTime(), greaterThan(expirationTime)); + assertThat(statusResponse.getStartTime(), lessThan(statusResponse.getExpirationTime())); + assertEquals(numShards, statusResponse.getTotalShards()); + assertEquals(0, statusResponse.getSuccessfulShards()); + assertEquals(0, statusResponse.getFailedShards()); + assertEquals(0, statusResponse.getSkippedShards()); + assertEquals(null, statusResponse.getCompletionStatus()); + response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1)); assertThat(response.getExpirationTime(), lessThan(expirationTime)); ensureTaskNotRunning(response.getId()); diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index 5bff8c39989d8..2d6f42d81e362 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -36,10 +36,13 @@ import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction; import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; +import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; +import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; +import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; @@ -167,6 +170,10 @@ protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) thr return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id).setKeepAlive(keepAlive)).get(); } + protected AsyncStatusResponse getAsyncStatus(String id) throws ExecutionException, InterruptedException { + return client().execute(GetAsyncStatusAction.INSTANCE, new GetAsyncStatusRequest(id)).get(); + } + protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException { return client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(id)).get(); } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java index f21e3628c6878..a15def25b012b 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java @@ -19,6 +19,7 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; +import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; import java.util.Arrays; @@ -34,7 +35,8 @@ public final class AsyncSearch extends Plugin implements ActionPlugin { public List> getActions() { return Arrays.asList( new ActionHandler<>(SubmitAsyncSearchAction.INSTANCE, TransportSubmitAsyncSearchAction.class), - new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class) + new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class), + new ActionHandler<>(GetAsyncStatusAction.INSTANCE, TransportGetAsyncStatusAction.class) ); } @@ -46,6 +48,7 @@ public List getRestHandlers(Settings settings, RestController restC return Arrays.asList( new RestSubmitAsyncSearchAction(), new RestGetAsyncSearchAction(), + new RestGetAsyncStatusAction(), new RestDeleteAsyncSearchAction() ); } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 7bd4553776169..27ae530257ea7 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -31,6 +31,7 @@ import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.AsyncTask; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; +import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; import java.util.ArrayList; import java.util.HashMap; @@ -347,6 +348,15 @@ private synchronized void checkCancellation() { } } + /** + * Returns the status of {@link AsyncSearchTask} + */ + public AsyncStatusResponse getStatusResponse() { + MutableSearchResponse mutableSearchResponse = searchResponse.get(); + assert mutableSearchResponse != null; + return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis); + } + class Listener extends SearchProgressActionListener { @Override protected void onQueryResult(int shardIndex) { diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java index adf43b54736ee..d34917fa479b9 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java @@ -7,6 +7,7 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse.Clusters; import org.elasticsearch.action.search.ShardSearchFailure; @@ -17,6 +18,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; +import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; import java.util.ArrayList; import java.util.List; @@ -32,6 +34,7 @@ * run concurrently to 1 and ensures that we pause the search progress when an {@link AsyncSearchResponse} is built. */ class MutableSearchResponse { + private static final TotalHits EMPTY_TOTAL_HITS = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO); private final int totalShards; private final int skippedShards; private final Clusters clusters; @@ -77,7 +80,7 @@ class MutableSearchResponse { this.queryFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards-skippedShards); this.isPartial = true; this.threadContext = threadContext; - this.totalHits = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO); + this.totalHits = EMPTY_TOTAL_HITS; } /** @@ -184,6 +187,58 @@ synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task, failure, isPartial, frozen == false, task.getStartTime(), expirationTime); } + + /** + * Creates an {@link AsyncStatusResponse} -- status of an async response. + * Response is created based on the current state of the mutable response or based on {@code finalResponse} if it is available. + * @param asyncExecutionId – id of async search request + * @param startTime – start time of task + * @param expirationTime – expiration time of async search request + * @return response representing the status of async search + */ + synchronized AsyncStatusResponse toStatusResponse(String asyncExecutionId, long startTime, long expirationTime) { + if (finalResponse != null) { + return new AsyncStatusResponse( + asyncExecutionId, + false, + false, + startTime, + expirationTime, + finalResponse.getTotalShards(), + finalResponse.getSuccessfulShards(), + finalResponse.getSkippedShards(), + finalResponse.getShardFailures() != null ? finalResponse.getShardFailures().length : 0, + finalResponse.status() + ); + } + if (failure != null) { + return new AsyncStatusResponse( + asyncExecutionId, + false, + true, + startTime, + expirationTime, + totalShards, + successfulShards, + skippedShards, + queryFailures == null ? 0 : queryFailures.nonNullLength(), + ExceptionsHelper.status(ExceptionsHelper.unwrapCause(failure)) + ); + } + return new AsyncStatusResponse( + asyncExecutionId, + true, + true, + startTime, + expirationTime, + totalShards, + successfulShards, + skippedShards, + queryFailures == null ? 0 : queryFailures.nonNullLength(), + null // for a still running search, completion status is null + ); + } + synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task, long expirationTime, ElasticsearchException reduceException) { diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestGetAsyncStatusAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestGetAsyncStatusAction.java new file mode 100644 index 0000000000000..ec89c3282ef69 --- /dev/null +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestGetAsyncStatusAction.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.search; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestStatusToXContentListener; +import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest; +import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction; + +import java.util.List; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public class RestGetAsyncStatusAction extends BaseRestHandler { + @Override + public List routes() { + return unmodifiableList(asList(new Route(GET, "/_async_search/status/{id}"))); + } + + + @Override + public String getName() { + return "async_search_status_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + GetAsyncStatusRequest statusRequest = new GetAsyncStatusRequest(request.param("id")); + return channel -> client.execute(GetAsyncStatusAction.INSTANCE, statusRequest, new RestStatusToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java new file mode 100644 index 0000000000000..292519aa649cb --- /dev/null +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java @@ -0,0 +1,111 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.search; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTask; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; +import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest; +import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; +import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; +import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction; + +import java.util.Objects; + +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; + +public class TransportGetAsyncStatusAction extends HandledTransportAction { + private final TransportService transportService; + private final ClusterService clusterService; + private final AsyncTaskIndexService store; + + @Inject + public TransportGetAsyncStatusAction(TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + NamedWriteableRegistry registry, + Client client, + ThreadPool threadPool) { + super(GetAsyncStatusAction.NAME, transportService, actionFilters, GetAsyncStatusRequest::new); + this.transportService = transportService; + this.clusterService = clusterService; + this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, + threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); + } + + @Override + protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListener listener) { + AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); + DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId()); + if (node == null || Objects.equals(node, clusterService.localNode())) { + retrieveStatus(request, listener); + } else { + TransportRequestOptions.Builder builder = TransportRequestOptions.builder(); + transportService.sendRequest(node, GetAsyncStatusAction.NAME, request, builder.build(), + new ActionListenerResponseHandler<>(listener, AsyncStatusResponse::new, ThreadPool.Names.SAME)); + } + } + + private void retrieveStatus(GetAsyncStatusRequest request, ActionListener listener) { + long nowInMillis = System.currentTimeMillis(); + AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); + try { + AsyncTask task = (AsyncTask) taskManager.getTask(searchId.getTaskId().getId()); + if ((task instanceof AsyncSearchTask) && (task.getExecutionId().equals(searchId))) { + AsyncStatusResponse response = ((AsyncSearchTask) task).getStatusResponse(); + sendFinalResponse(request, response, nowInMillis, listener); + } else { + getStatusResponseFromIndex(searchId, request, nowInMillis, listener); + } + } catch (Exception exc) { + listener.onFailure(exc); + } + } + + /** + * Get a status response from index + */ + private void getStatusResponseFromIndex(AsyncExecutionId searchId, + GetAsyncStatusRequest request, long nowInMillis, ActionListener listener) { + store.getStatusResponse(searchId, AsyncStatusResponse::getStatusFromAsyncSearchResponseWithExpirationTime, + new ActionListener() { + @Override + public void onResponse(AsyncStatusResponse asyncStatusResponse) { + sendFinalResponse(request, asyncStatusResponse, nowInMillis, listener); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + ); + } + + private static void sendFinalResponse(GetAsyncStatusRequest request, + AsyncStatusResponse response, long nowInMillis, ActionListener listener) { + if (response.getExpirationTime() < nowInMillis) { // check if the result has expired + listener.onFailure(new ResourceNotFoundException(request.getId())); + } else { + listener.onResponse(response); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 9551e9b23f597..08f665751d6c2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -196,6 +196,7 @@ import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; +import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotFeatureSetUsage; @@ -486,6 +487,7 @@ public List> getClientActions() { // Async Search SubmitAsyncSearchAction.INSTANCE, GetAsyncSearchAction.INSTANCE, + GetAsyncStatusAction.INSTANCE, DeleteAsyncResultAction.INSTANCE, // Point in time OpenPointInTimeAction.INSTANCE, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index f22a918eac8a2..0b3b1ffaea9ad 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -38,6 +38,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; +import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer; @@ -49,6 +50,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiFunction; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; @@ -309,6 +311,37 @@ public void getResponse(AsyncExecutionId asyncExecutionId, )); } + + /** + * Gets the status response of the async search from the index + * @param asyncExecutionId – id of the async search + * @param statusProducer – a producer of the status from the stored async search response and expirationTime + * @param listener – listener to report result to + */ + public void getStatusResponse( + AsyncExecutionId asyncExecutionId, + BiFunction statusProducer, ActionListener listener) { + GetRequest internalGet = new GetRequest(index) + .preference(asyncExecutionId.getEncoded()) + .id(asyncExecutionId.getDocId()); + client.get(internalGet, ActionListener.wrap( + get -> { + if (get.isExists() == false) { + listener.onFailure(new ResourceNotFoundException(asyncExecutionId.getEncoded())); + return; + } + String encoded = (String) get.getSource().get(RESULT_FIELD); + if (encoded != null) { + Long expirationTime = (Long) get.getSource().get(EXPIRATION_TIME_FIELD); + listener.onResponse(statusProducer.apply(decodeResponse(encoded), expirationTime)); + } else { + listener.onResponse(null); + } + }, + listener::onFailure + )); + } + /** * Ensures that the current user can read the specified response without actually reading it */ diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/GetAsyncStatusRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/GetAsyncStatusRequest.java new file mode 100644 index 0000000000000..16400781ee13f --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/GetAsyncStatusRequest.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +/** + * A request class to get a status update of the async search request + */ +public class GetAsyncStatusRequest extends ActionRequest { + private final String id; + + /** + * Creates a new request + * @param id The id of the search progress request. + */ + public GetAsyncStatusRequest(String id) { + this.id = id; + } + + public GetAsyncStatusRequest(StreamInput in) throws IOException { + super(in); + this.id = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + /** + * Returns the id of the async search. + */ + public String getId() { + return id; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetAsyncStatusRequest request = (GetAsyncStatusRequest) o; + return Objects.equals(id, request.id); + } + + @Override + public int hashCode() { + return Objects.hash(id); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncStatusResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncStatusResponse.java new file mode 100644 index 0000000000000..dd372809b6a45 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncStatusResponse.java @@ -0,0 +1,220 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.search.action; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.StatusToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestActions; + +import java.io.IOException; + +import static org.elasticsearch.rest.RestStatus.OK; + +/** + * A response of an async search request. + */ +public class AsyncStatusResponse extends ActionResponse implements StatusToXContentObject { + private final String id; + private final boolean isRunning; + private final boolean isPartial; + private final long startTimeMillis; + private final long expirationTimeMillis; + private final int totalShards; + private final int successfulShards; + private final int skippedShards; + private final int failedShards; + private final RestStatus completionStatus; + + public AsyncStatusResponse(String id, + boolean isRunning, + boolean isPartial, + long startTimeMillis, + long expirationTimeMillis, + int totalShards, + int successfulShards, + int skippedShards, + int failedShards, + RestStatus completionStatus) { + this.id = id; + this.isRunning = isRunning; + this.isPartial = isPartial; + this.startTimeMillis = startTimeMillis; + this.expirationTimeMillis = expirationTimeMillis; + this.totalShards = totalShards; + this.successfulShards = successfulShards; + this.skippedShards = skippedShards; + this.failedShards = failedShards; + this.completionStatus = completionStatus; + } + + public static AsyncStatusResponse getStatusFromAsyncSearchResponseWithExpirationTime(AsyncSearchResponse asyncSearchResponse, + long expirationTimeMillis) { + int totalShards = 0; + int successfulShards = 0; + int skippedShards = 0; + int failedShards = 0; + RestStatus completionStatus = null; + SearchResponse searchResponse = asyncSearchResponse.getSearchResponse(); + if (searchResponse != null) { + totalShards = searchResponse.getTotalShards(); + successfulShards = searchResponse.getSuccessfulShards(); + skippedShards = searchResponse.getSkippedShards(); + failedShards = searchResponse.getFailedShards(); + } + if (asyncSearchResponse.isRunning() == false) { + if (searchResponse != null) { + completionStatus = searchResponse.status(); + } else { + Exception failure = asyncSearchResponse.getFailure(); + if (failure != null) { + completionStatus = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(failure)); + } + } + } + return new AsyncStatusResponse( + asyncSearchResponse.getId(), + asyncSearchResponse.isRunning(), + asyncSearchResponse.isPartial(), + asyncSearchResponse.getStartTime(), + expirationTimeMillis, + totalShards, + successfulShards, + skippedShards, + failedShards, + completionStatus + ); + } + + public AsyncStatusResponse(StreamInput in) throws IOException { + this.id = in.readString(); + this.isRunning = in.readBoolean(); + this.isPartial = in.readBoolean(); + this.startTimeMillis = in.readLong(); + this.expirationTimeMillis = in.readLong(); + this.totalShards = in.readVInt(); + this.successfulShards = in.readVInt(); + this.skippedShards = in.readVInt(); + this.failedShards = in.readVInt(); + this.completionStatus = (this.isRunning == false) ? RestStatus.readFrom(in) : null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + out.writeBoolean(isRunning); + out.writeBoolean(isPartial); + out.writeLong(startTimeMillis); + out.writeLong(expirationTimeMillis); + out.writeVInt(totalShards); + out.writeVInt(successfulShards); + out.writeVInt(skippedShards); + out.writeVInt(failedShards); + if (isRunning == false) { + RestStatus.writeTo(out, completionStatus); + } + } + + @Override + public RestStatus status() { + return OK; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("id", id); + builder.field("is_running", isRunning); + builder.field("is_partial", isPartial); + builder.timeField("start_time_in_millis", "start_time", startTimeMillis); + builder.timeField("expiration_time_in_millis", "expiration_time", expirationTimeMillis); + RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, skippedShards, failedShards, null); + if (isRunning == false) { // completion status information is only available for a completed search + builder.field("completion_status", completionStatus.getStatus()); + } + builder.endObject(); + return builder; + } + + /** + * Returns the id of the async search status request. + */ + public String getId() { + return id; + } + + /** + * Returns {@code true} if the search is still running in the cluster, + * or {@code false} if the search has been completed. + */ + public boolean isRunning() { + return isRunning; + } + + /** + * Returns {@code true} if the search results are partial. + * This could be either because async search hasn't finished yet, + * or if it finished and some shards have failed. + */ + public boolean isPartial() { + return isPartial; + } + + /** + * Returns a timestamp when the search tasks started, in milliseconds since epoch. + */ + public long getStartTime() { + return startTimeMillis; + } + + /** + * Returns a timestamp when the search will be expired, in milliseconds since epoch. + */ + public long getExpirationTime() { + return expirationTimeMillis; + } + + /** + * Returns the total number of shards the search is executed on. + */ + public int getTotalShards() { + return totalShards; + } + + /** + * Returns the number of successful shards the search was executed on. + */ + public int getSuccessfulShards() { + return successfulShards; + } + + /** + * Returns the number of skipped shards due to pre-filtering. + */ + public int getSkippedShards() { + return skippedShards; + } + + /** + * Returns the number of failed shards the search was executed on. + */ + public int getFailedShards() { + return failedShards; + } + + /** + * For a completed async search returns the completion status. + * For a still running async search returns {@code null}. + */ + public RestStatus getCompletionStatus() { + return completionStatus; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncStatusAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncStatusAction.java new file mode 100644 index 0000000000000..056fee076d25f --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncStatusAction.java @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.search.action; + +import org.elasticsearch.action.ActionType; + +public class GetAsyncStatusAction extends ActionType { + public static final GetAsyncStatusAction INSTANCE = new GetAsyncStatusAction(); + public static final String NAME = "cluster:monitor/async_search/status"; + + private GetAsyncStatusAction() { + super(NAME, AsyncStatusResponse::new); + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/async_search.status.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/async_search.status.json new file mode 100644 index 0000000000000..54d56282513cb --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/async_search.status.json @@ -0,0 +1,25 @@ +{ + "async_search.status":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html", + "description": "Retrieves the status of a previously submitted async search request given its ID." + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_async_search/status/{id}", + "methods":[ + "GET" + ], + "parts":{ + "id":{ + "type":"string", + "description":"The async search ID" + } + } + } + ] + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/10_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/10_basic.yml index 54c076d480b02..70c8edc5226c6 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/10_basic.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/10_basic.yml @@ -109,6 +109,14 @@ - match: { response.hits.hits.0._source.max: 1 } - match: { response.aggregations.max.value: 3.0 } + - do: + async_search.status: + id: "$id" + - match: { id: $id } + - match: { is_running: false } + - match: { is_partial: false } + - match: { completion_status: 200 } + # test with typed_keys: - do: async_search.get: @@ -132,6 +140,11 @@ async_search.get: id: "$id" + - do: + catch: missing + async_search.status: + id: "$id" + - do: catch: missing async_search.delete: