Skip to content

Commit

Permalink
Async search status (#64554)
Browse files Browse the repository at this point in the history
Introduce async search status API

GET /_async_search/status/<id>

The API is restricted to the monitoring_user role.

For a running async search, the response is:

```js
{
  "id" : <id>,
  "is_running" : true,
  "is_partial" : true,
  "start_time_in_millis" : 1583945890986,
  "expiration_time_in_millis" : 1584377890986,
  "_shards" : {
      "total" : 562,
      "successful" : 188,
      "skipped" : 0,
      "failed" : 0
  }
}
```

For a completed async search, an additional
`completion_status` fields is added.

```js
{
  "id" : <id>,
  "is_running" : false,
  "is_partial" : false,
  "start_time_in_millis" : 1583945890986,
  "expiration_time_in_millis" : 1584377890986,
  "_shards" : {
      "total" : 562,
      "successful" : 562,
      "skipped" : 0,
      "failed" : 0
  },
 "completion_status" : 200
}
```

Closes #57537
Backport for #62947
  • Loading branch information
mayya-sharipova authored Nov 3, 2020
1 parent 21c1b40 commit ee08069
Show file tree
Hide file tree
Showing 16 changed files with 743 additions and 5 deletions.
87 changes: 84 additions & 3 deletions docs/reference/search/async-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ set to `false`.
==== Get async search

The get async search API retrieves the results of a previously submitted
async search request given its id. If the {es} {security-features} are enabled.
async search request given its id. If the {es} {security-features} are enabled,
the access to the results of a specific async search is restricted to the user
that submitted it in the first place.

Expand All @@ -161,8 +161,8 @@ GET /_async_search/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsd
"timed_out" : false,
"num_reduce_phases" : 46, <4>
"_shards" : {
"total" : 562, <5>
"successful" : 188,
"total" : 562,
"successful" : 188, <5>
"skipped" : 0,
"failed" : 0
},
Expand Down Expand Up @@ -222,6 +222,87 @@ override such value and extend the validity of the request. When this period
expires, the search, if still running, is cancelled. If the search is
completed, its saved results are deleted.


[[get-async-search-status]]
==== Get async search status
The get async search status API, without retrieving search results, shows
only the status of a previously submitted async search request given its `id`.
If the {es} {security-features} are enabled, the access to the get async
search status API is restricted to the
<<built-in-roles, monitoring_user role>>.

[source,console,id=get-async-search-status-example]
--------------------------------------------------
GET /_async_search/status/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=
--------------------------------------------------
// TEST[continued s/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=/\${body.id}/]

[source,console-result]
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"is_running" : true,
"is_partial" : true,
"start_time_in_millis" : 1583945890986,
"expiration_time_in_millis" : 1584377890986,
"_shards" : {
"total" : 562,
"successful" : 188, <1>
"skipped" : 0,
"failed" : 0
}
}
--------------------------------------------------
// TEST[skip: a sample output of a status of a running async search]

<1> Indicates how many shards have executed the query so far.

For an async search that has been completed, the status response has
an additional `completion_status` field that shows the status
code of the completed async search.
[source,console-result]
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"is_running" : false,
"is_partial" : false,
"start_time_in_millis" : 1583945890986,
"expiration_time_in_millis" : 1584377890986,
"_shards" : {
"total" : 562,
"successful" : 562,
"skipped" : 0,
"failed" : 0
},
"completion_status" : 200 <1>
}
--------------------------------------------------
// TEST[skip: a sample output of a status of a completed async search]

<1> Indicates that the async search was successfully completed


[source,console-result]
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"is_running" : false,
"is_partial" : true,
"start_time_in_millis" : 1583945890986,
"expiration_time_in_millis" : 1584377890986,
"_shards" : {
"total" : 562,
"successful" : 450,
"skipped" : 0,
"failed" : 112
},
"completion_status" : 503 <1>
}
--------------------------------------------------
// TEST[skip: a sample output of a status of a completed async search]

<1> Indicates that the async search was completed with an error

[[delete-async-search]]
==== Delete async search

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ public int length() {
return array.length();
}

/**
* Returns the size of the expected results, excluding potential null values.
* @return the number of non-null elements
*/
public int nonNullLength() {
if (nonNullList != null) {
return nonNullList.size();
}
int count = 0;
for (int i = 0; i < array.length(); i++) {
if (array.get(i) != null) {
count++;
}
}
return count;
}

/**
* Sets the element at position {@code i} to the given value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;

import java.util.ArrayList;
Expand All @@ -41,6 +42,7 @@
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;


@SuiteScopeTestCase
public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
private static String indexName;
Expand Down Expand Up @@ -187,10 +189,19 @@ public void testRestartAfterCompletion() throws Exception {
}
ensureTaskCompletion(initial.getId());
restartTaskNode(initial.getId(), indexName);

AsyncSearchResponse response = getAsyncSearch(initial.getId());
assertNotNull(response.getSearchResponse());
assertFalse(response.isRunning());
assertFalse(response.isPartial());

AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
assertFalse(statusResponse.isRunning());
assertFalse(statusResponse.isPartial());
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(numShards, statusResponse.getSuccessfulShards());
assertEquals(RestStatus.OK, statusResponse.getCompletionStatus());

deleteAsyncSearch(response.getId());
ensureTaskRemoval(response.getId());
}
Expand Down Expand Up @@ -231,6 +242,15 @@ public void testCleanupOnFailure() throws Exception {
assertTrue(response.isPartial());
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
assertThat(response.getSearchResponse().getShardFailures().length, equalTo(numShards));

AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
assertFalse(statusResponse.isRunning());
assertTrue(statusResponse.isPartial());
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(0, statusResponse.getSuccessfulShards());
assertEquals(numShards, statusResponse.getFailedShards());
assertThat(statusResponse.getCompletionStatus().getStatus(), greaterThanOrEqualTo(400));

deleteAsyncSearch(initial.getId());
ensureTaskRemoval(initial.getId());
}
Expand All @@ -246,6 +266,9 @@ public void testInvalidId() throws Exception {
}
assertFalse(response.isRunning());
}

ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncStatus("invalid"));
assertThat(exc.getMessage(), containsString("invalid id"));
}

public void testNoIndex() throws Exception {
Expand Down Expand Up @@ -287,6 +310,13 @@ public void testCancellation() throws Exception {
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));

AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
assertTrue(statusResponse.isRunning());
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(0, statusResponse.getSuccessfulShards());
assertEquals(0, statusResponse.getSkippedShards());
assertEquals(0, statusResponse.getFailedShards());

deleteAsyncSearch(response.getId());
ensureTaskRemoval(response.getId());
}
Expand Down Expand Up @@ -321,6 +351,17 @@ public void testUpdateRunningKeepAlive() throws Exception {
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));

AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
assertTrue(statusResponse.isRunning());
assertTrue(statusResponse.isPartial());
assertThat(statusResponse.getExpirationTime(), greaterThan(expirationTime));
assertThat(statusResponse.getStartTime(), lessThan(statusResponse.getExpirationTime()));
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(0, statusResponse.getSuccessfulShards());
assertEquals(0, statusResponse.getFailedShards());
assertEquals(0, statusResponse.getSkippedShards());
assertEquals(null, statusResponse.getCompletionStatus());

response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
assertThat(response.getExpirationTime(), lessThan(expirationTime));
ensureTaskNotRunning(response.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
Expand Down Expand Up @@ -167,6 +170,10 @@ protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) thr
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id).setKeepAlive(keepAlive)).get();
}

protected AsyncStatusResponse getAsyncStatus(String id) throws ExecutionException, InterruptedException {
return client().execute(GetAsyncStatusAction.INSTANCE, new GetAsyncStatusRequest(id)).get();
}

protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException {
return client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(id)).get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;

import java.util.Arrays;
Expand All @@ -34,7 +35,8 @@ public final class AsyncSearch extends Plugin implements ActionPlugin {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(SubmitAsyncSearchAction.INSTANCE, TransportSubmitAsyncSearchAction.class),
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class)
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class),
new ActionHandler<>(GetAsyncStatusAction.INSTANCE, TransportGetAsyncStatusAction.class)
);
}

Expand All @@ -46,6 +48,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
return Arrays.asList(
new RestSubmitAsyncSearchAction(),
new RestGetAsyncSearchAction(),
new RestGetAsyncStatusAction(),
new RestDeleteAsyncSearchAction()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTask;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -347,6 +348,15 @@ private synchronized void checkCancellation() {
}
}

/**
* Returns the status of {@link AsyncSearchTask}
*/
public AsyncStatusResponse getStatusResponse() {
MutableSearchResponse mutableSearchResponse = searchResponse.get();
assert mutableSearchResponse != null;
return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis);
}

class Listener extends SearchProgressActionListener {
@Override
protected void onQueryResult(int shardIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.action.search.ShardSearchFailure;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -32,6 +34,7 @@
* run concurrently to 1 and ensures that we pause the search progress when an {@link AsyncSearchResponse} is built.
*/
class MutableSearchResponse {
private static final TotalHits EMPTY_TOTAL_HITS = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
private final int totalShards;
private final int skippedShards;
private final Clusters clusters;
Expand Down Expand Up @@ -77,7 +80,7 @@ class MutableSearchResponse {
this.queryFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards-skippedShards);
this.isPartial = true;
this.threadContext = threadContext;
this.totalHits = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
this.totalHits = EMPTY_TOTAL_HITS;
}

/**
Expand Down Expand Up @@ -184,6 +187,58 @@ synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
failure, isPartial, frozen == false, task.getStartTime(), expirationTime);
}


/**
* Creates an {@link AsyncStatusResponse} -- status of an async response.
* Response is created based on the current state of the mutable response or based on {@code finalResponse} if it is available.
* @param asyncExecutionId – id of async search request
* @param startTime – start time of task
* @param expirationTime – expiration time of async search request
* @return response representing the status of async search
*/
synchronized AsyncStatusResponse toStatusResponse(String asyncExecutionId, long startTime, long expirationTime) {
if (finalResponse != null) {
return new AsyncStatusResponse(
asyncExecutionId,
false,
false,
startTime,
expirationTime,
finalResponse.getTotalShards(),
finalResponse.getSuccessfulShards(),
finalResponse.getSkippedShards(),
finalResponse.getShardFailures() != null ? finalResponse.getShardFailures().length : 0,
finalResponse.status()
);
}
if (failure != null) {
return new AsyncStatusResponse(
asyncExecutionId,
false,
true,
startTime,
expirationTime,
totalShards,
successfulShards,
skippedShards,
queryFailures == null ? 0 : queryFailures.nonNullLength(),
ExceptionsHelper.status(ExceptionsHelper.unwrapCause(failure))
);
}
return new AsyncStatusResponse(
asyncExecutionId,
true,
true,
startTime,
expirationTime,
totalShards,
successfulShards,
skippedShards,
queryFailures == null ? 0 : queryFailures.nonNullLength(),
null // for a still running search, completion status is null
);
}

synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
long expirationTime,
ElasticsearchException reduceException) {
Expand Down
Loading

0 comments on commit ee08069

Please sign in to comment.