Skip to content

Commit

Permalink
Async search status
Browse files Browse the repository at this point in the history
Introduce async search status API

GET /_async_search/status/<id>

The API is restricted to the monitoring_user role.

For a running async search, the response is:

```js
{
  "id" : <id>,
  "is_running" : true,
  "start_time_in_millis" : 1583945890986,
  "expiration_time_in_millis" : 1584377890986,
  "_shards" : {
      "total" : 562,
      "successful" : 188,
      "skipped" : 0,
      "failed" : 0
  }
}
```

For a completed async search, the response is:

```js
{
  "id" : <id>
  "is_running" : false,
  "expiration_time_in_millis" : 1584377890986
}
```

----
Techincal details:
We first try to retrieve the status of the async search from tasks.
If this doesn't succeed, we retrieve it from an index: .async-search.
In case of retrieving from the index, we assume that the async search is
completed, and a shorter response for the status is returned.

Closes #57537
  • Loading branch information
mayya-sharipova committed Sep 28, 2020
1 parent 48db3f2 commit faafd18
Show file tree
Hide file tree
Showing 15 changed files with 710 additions and 5 deletions.
53 changes: 50 additions & 3 deletions docs/reference/search/async-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ set to `false`.
==== Get async search

The get async search API retrieves the results of a previously submitted
async search request given its id. If the {es} {security-features} are enabled.
async search request given its id. If the {es} {security-features} are enabled,
the access to the results of a specific async search is restricted to the user
that submitted it in the first place.

Expand All @@ -161,8 +161,8 @@ GET /_async_search/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsd
"timed_out" : false,
"num_reduce_phases" : 46, <4>
"_shards" : {
"total" : 562, <5>
"successful" : 188,
"total" : 562,
"successful" : 188, <5>
"skipped" : 0,
"failed" : 0
},
Expand Down Expand Up @@ -222,6 +222,53 @@ override such value and extend the validity of the request. When this period
expires, the search, if still running, is cancelled. If the search is
completed, its saved results are deleted.


[[get-async-search-status]]
==== Get async search status
The get async search status API, without retrieving search results, shows
only the status of a previously submitted async search request given its `id`.
If the {es} {security-features} are enabled, the access to the get async
search status API is restricted to the
<<built-in-roles, monitoring_user role>>.

[source,console,id=get-async-search-status-example]
--------------------------------------------------
GET /_async_search/status/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=
--------------------------------------------------
// TEST[continued s/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=/\${body.id}/]

[source,console-result]
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"is_running" : true,
"start_time_in_millis" : 1583945890986,
"expiration_time_in_millis" : 1584377890986,
"_shards" : {
"total" : 562,
"successful" : 188, <1>
"skipped" : 0,
"failed" : 0
}
}
--------------------------------------------------
// TEST[skip: a sample output of a status of a running async search]

<1> Indicates how many shards have executed the query so far.

For an async search that has been completed, the status response has a shorter form
that includes only `id`, `is_running` and `expiration_time_in_millis` fields.
[source,console-result]
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"is_running" : false,
"expiration_time_in_millis" : 1584377890986
}
--------------------------------------------------
// TEST[skip: a sample output of a status of a completed async search]


[[delete-async-search]]
==== Delete async search

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;

import java.util.ArrayList;
Expand Down Expand Up @@ -183,10 +184,15 @@ public void testRestartAfterCompletion() throws Exception {
}
ensureTaskCompletion(initial.getId());
restartTaskNode(initial.getId(), indexName);

AsyncSearchResponse response = getAsyncSearch(initial.getId());
assertNotNull(response.getSearchResponse());
assertFalse(response.isRunning());
assertFalse(response.isPartial());

AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
assertFalse(statusResponse.isRunning());

deleteAsyncSearch(response.getId());
ensureTaskRemoval(response.getId());
}
Expand Down Expand Up @@ -227,6 +233,10 @@ public void testCleanupOnFailure() throws Exception {
assertTrue(response.isPartial());
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
assertThat(response.getSearchResponse().getShardFailures().length, equalTo(numShards));

AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
assertFalse(statusResponse.isRunning());

deleteAsyncSearch(initial.getId());
ensureTaskRemoval(initial.getId());
}
Expand All @@ -243,6 +253,10 @@ public void testInvalidId() throws Exception {
}
assertFalse(response.isRunning());
}

ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncStatus("invalid"));
assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(exc.getMessage(), containsString("invalid id"));
}

public void testNoIndex() throws Exception {
Expand Down Expand Up @@ -284,6 +298,11 @@ public void testCancellation() throws Exception {
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));

AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
assertTrue(statusResponse.isRunning());
assertThat(statusResponse.getTotalShards(), equalTo(numShards));
assertThat(statusResponse.getSuccessfulShards(), equalTo(0));

deleteAsyncSearch(response.getId());
ensureTaskRemoval(response.getId());
}
Expand Down Expand Up @@ -318,6 +337,12 @@ public void testUpdateRunningKeepAlive() throws Exception {
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));

AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
assertThat(statusResponse.getExpirationTime(), greaterThan(expirationTime));
assertThat(statusResponse.getStartTime(), lessThan(statusResponse.getExpirationTime()));
assertTrue(statusResponse.isRunning());
assertThat(statusResponse.getTotalShards(), equalTo(numShards));

response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
assertThat(response.getExpirationTime(), lessThan(expirationTime));
ensureTaskNotRunning(response.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
Expand Down Expand Up @@ -154,6 +157,10 @@ protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) thr
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id).setKeepAlive(keepAlive)).get();
}

protected AsyncStatusResponse getAsyncStatus(String id) throws ExecutionException, InterruptedException {
return client().execute(GetAsyncStatusAction.INSTANCE, new GetAsyncStatusRequest(id)).get();
}

protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException {
return client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(id)).get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;

import java.util.Arrays;
Expand All @@ -34,7 +35,8 @@ public final class AsyncSearch extends Plugin implements ActionPlugin {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(SubmitAsyncSearchAction.INSTANCE, TransportSubmitAsyncSearchAction.class),
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class)
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class),
new ActionHandler<>(GetAsyncStatusAction.INSTANCE, TransportGetAsyncStatusAction.class)
);
}

Expand All @@ -46,6 +48,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
return Arrays.asList(
new RestSubmitAsyncSearchAction(),
new RestGetAsyncSearchAction(),
new RestGetAsyncStatusAction(),
new RestDeleteAsyncSearchAction()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTask;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -347,6 +348,15 @@ private synchronized void checkCancellation() {
}
}

/**
* Returns the status of {@link AsyncSearchTask}
*/
public AsyncStatusResponse getStatusResponse() {
MutableSearchResponse mutableSearchResponse = searchResponse.get();
assert mutableSearchResponse != null;
return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis);
}

class Listener extends SearchProgressActionListener {
@Override
protected void onQueryResult(int shardIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.restoreResponseHeadersContext;
import static org.elasticsearch.xpack.core.search.action.AsyncStatusResponse.getCompletedSearchStatusResponse;

/**
* A mutable search response that allows to update and create partial response synchronously.
Expand All @@ -32,6 +34,7 @@
* run concurrently to 1 and ensures that we pause the search progress when an {@link AsyncSearchResponse} is built.
*/
class MutableSearchResponse {
private static final TotalHits EMPTY_TOTAL_HITS = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
private final int totalShards;
private final int skippedShards;
private final Clusters clusters;
Expand Down Expand Up @@ -77,7 +80,7 @@ class MutableSearchResponse {
this.queryFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards-skippedShards);
this.isPartial = true;
this.threadContext = threadContext;
this.totalHits = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
this.totalHits = EMPTY_TOTAL_HITS;
}

/**
Expand Down Expand Up @@ -184,6 +187,32 @@ synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
failure, isPartial, frozen == false, task.getStartTime(), expirationTime);
}


/**
* Creates an {@link AsyncStatusResponse} -- status of an async response.
* Response is created based on the current state of the mutable response or based on {@code finalResponse} if it is available.
* @param asyncExecutionId – id of async search request
* @param startTime – start time of task
* @param expirationTime – expiration time of async search request
* @return response representing the status of async search
*/
AsyncStatusResponse toStatusResponse(String asyncExecutionId, long startTime, long expirationTime) {
if (frozen == false) {
return new AsyncStatusResponse(
asyncExecutionId,
startTime,
expirationTime,
finalResponse != null ? finalResponse.getTotalShards() : totalShards,
finalResponse != null ? finalResponse.getSuccessfulShards() : successfulShards,
finalResponse != null ? finalResponse.getSkippedShards() : skippedShards,
finalResponse != null ? (finalResponse.getShardFailures() == null ? finalResponse.getShardFailures().length : 0) :
(queryFailures != null ? queryFailures.length() : 0)
);
} else {
return getCompletedSearchStatusResponse(asyncExecutionId, expirationTime);
}
}

synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
long expirationTime,
ElasticsearchException reduceException) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.search;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestStatusToXContentListener;
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;

import java.util.List;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.rest.RestRequest.Method.GET;

public class RestGetAsyncStatusAction extends BaseRestHandler {
@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(GET, "/_async_search/status/{id}")));
}


@Override
public String getName() {
return "async_search_status_action";
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
GetAsyncStatusRequest statusRequest = new GetAsyncStatusRequest(request.param("id"));
return channel -> client.execute(GetAsyncStatusAction.INSTANCE, statusRequest, new RestStatusToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.search;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncStatusService;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsyncStatusRequest, AsyncStatusResponse> {
private final TransportService transportService;
private final AsyncStatusService<AsyncSearchTask, AsyncStatusResponse> statusService;

@Inject
public TransportGetAsyncStatusAction(TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
NamedWriteableRegistry registry,
Client client,
ThreadPool threadPool) {
super(GetAsyncStatusAction.NAME, transportService, actionFilters, GetAsyncStatusRequest::new);
this.transportService = transportService;
AsyncTaskIndexService<AsyncStatusResponse> store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncStatusResponse::new, registry);
this.statusService = new AsyncStatusService<>(store, AsyncSearchTask.class, AsyncSearchTask::getStatusResponse,
AsyncStatusResponse::getCompletedSearchStatusResponse, transportService.getTaskManager(), clusterService);
}

@Override
protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListener<AsyncStatusResponse> listener) {
DiscoveryNode node = statusService.getNode(request.getId());
if (node == null || statusService.isLocalNode(node)) {
statusService.retrieveStatus(request, listener);
} else {
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
transportService.sendRequest(node, GetAsyncStatusAction.NAME, request, builder.build(),
new ActionListenerResponseHandler<>(listener, AsyncStatusResponse::new, ThreadPool.Names.SAME));
}
}
}
Loading

0 comments on commit faafd18

Please sign in to comment.