Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce eql search status API (#68065) #68913

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/reference/eql/eql-search-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,9 @@ This search ID is only provided if one of the following conditions is met:
parameter is `true`.

You can use this ID with the <<get-async-eql-search-api,get async EQL search
API>> to get the current status and available results for the search.
API>> to get the current status and available results for the search or
<<get-async-eql-status-api,get async EQL status API>> to get only
the current status.
--

`is_partial`::
Expand Down
25 changes: 25 additions & 0 deletions docs/reference/eql/eql.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,28 @@ complete.
// TESTRESPONSE[s/"took": 2000/"took": $body.took/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]

Another more lightweight way to check the progress of an async search is to use
the <<get-async-eql-status-api,get async EQL status API>> with the search ID.

[source,console]
----
GET /_eql/search/status/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=
----
// TEST[skip: no access to search ID]

[source,console-result]
----
{
"id": "FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=",
"is_running": false,
"is_partial": false,
"expiration_time_in_millis" : 1611690295000,
"completion_status": 200
}
----
// TESTRESPONSE[s/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=/$body.id/]
// TESTRESPONSE[s/"expiration_time_in_millis": 1611690295000/"expiration_time_in_millis": $body.expiration_time_in_millis/]

[discrete]
[[eql-search-store-async-eql-search]]
=== Change the search retention period
Expand Down Expand Up @@ -660,6 +682,9 @@ GET /_eql/search/FjlmbndxNmJjU0RPdExBTGg0elNOOEEaQk9xSjJBQzBRMldZa1VVQ2pPa01YUTo
Saved synchronous searches are still subject to the `keep_alive` parameter's
retention period. When this period ends, the search and its results are deleted.

You can also check only the status of the saved synchronous search without
results by using <<get-async-eql-status-api,get async EQL status API>>.

You can also manually delete saved synchronous searches using the
<<delete-async-eql-search-api,delete async EQL search API>>.

Expand Down
120 changes: 120 additions & 0 deletions docs/reference/eql/get-async-eql-status-api.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
[role="xpack"]
[testenv="basic"]

[[get-async-eql-status-api]]
=== Get async EQL status API
++++
<titleabbrev>Get async EQL search status</titleabbrev>
++++
Returns the current status for an <<eql-search-async,async EQL search>> or
a <<eql-search-store-sync-eql-search,stored synchronous EQL search>>
without returning results. This is a more lightweight API than
<<get-async-eql-search-api,get async EQL search API>> as it doesn't return
search results, and reports only the status.

If the {es} {security-features} are enabled, the access to the get async
eql status API is restricted to the <<built-in-roles, monitoring_user role>>.

[source,console]
----
GET /_eql/search/status/FkpMRkJGS1gzVDRlM3g4ZzMyRGlLbkEaTXlJZHdNT09TU2VTZVBoNDM3cFZMUToxMDM=
----
// TEST[skip: no access to search ID]

[[get-async-eql-status-api-request]]
==== {api-request-title}

`GET /_eql/search/status/<search_id>`


[[get-async-eql-status-api-path-params]]
==== {api-path-parms-title}

`<search_id>`::
(Required, string)
Identifier for the search.
+
A search ID is provided in the <<eql-search-api,EQL search API>>'s response for
an <<eql-search-async,async search>>. A search ID is also provided if the
request's <<eql-search-api-keep-on-completion,`keep_on_completion`>> parameter
is `true`.

[role="child_attributes"]
[[get-async-eql-status-api-response-body]]
==== {api-response-body-title}

`id`::
(string)
Identifier for the search.

`is_running`::
(boolean)
If `true`, the search request is still executing.
If `false`, the search is completed.

`is_partial`::
(boolean)
If `true`, the response does not contain complete search results.
This could be because either the search is still running
(`is_running` status is `false`), or because it is already completed
(`is_running` status is `true`) and results are partial due to
failures or timeouts.

`start_time_in_millis`::
(Long)
For a running search shows a timestamp when the eql search
started, in milliseconds since the Unix epoch.

`expiration_time_in_millis`::
(long)
Shows a timestamp when the eql search will be expired, in milliseconds
since the Unix epoch. When this time is reached, the search and its results
are deleted, even if the search is still ongoing.

`completion_status`::
(Integer)
For a completed search shows the http status code of the completed
search.


[[eql-status-api-example]]
==== {api-examples-title}

[source,console]
----
GET /_eql/search/status/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=?keep_alive=5d
----
// TEST[skip: no access to search ID]

If the search is still running, the status response has the following form:

[source,console-result]
--------------------------------------------------
{
"id" : "FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=",
"is_running" : true,
"is_partial" : true,
"start_time_in_millis" : 1611690235000,
"expiration_time_in_millis" : 1611690295000

}
--------------------------------------------------
// TEST[skip: no access to search ID]

If the search is completed the status response doesn't have
`start_time_in_millis`, but has an additional `completion_status`
field that shows the status code of the completed eql search:

[source,console-result]
--------------------------------------------------
{
"id" : "FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=",
"is_running" : false,
"is_partial" : false,
"expiration_time_in_millis" : 1611690295000,
"completion_status" : 200 <1>
}
--------------------------------------------------
// TEST[skip: no access to search ID]

<1> Indicates that the eql search was successfully completed
3 changes: 3 additions & 0 deletions docs/reference/search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ For an overview of EQL and related tutorials, see <<eql>>.

* <<eql-search-api>>
* <<get-async-eql-search-api>>
* <<get-async-eql-status-api>>
* <<delete-async-eql-search-api>>


Expand All @@ -70,6 +71,8 @@ include::eql/eql-search-api.asciidoc[]

include::eql/get-async-eql-search-api.asciidoc[]

include::eql/get-async-eql-status-api.asciidoc[]

include::eql/delete-async-eql-search-api.asciidoc[]

include::search/count.asciidoc[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,16 @@ private synchronized void checkCancellation() {
}

/**
* Returns the status of {@link AsyncSearchTask}
* Returns the status from {@link AsyncSearchTask}
*/
public AsyncStatusResponse getStatusResponse() {
MutableSearchResponse mutableSearchResponse = searchResponse.get();
public static AsyncStatusResponse getStatusResponse(AsyncSearchTask asyncTask) {
MutableSearchResponse mutableSearchResponse = asyncTask.searchResponse.get();
assert mutableSearchResponse != null;
return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis);
return mutableSearchResponse.toStatusResponse(
asyncTask.searchId.getEncoded(),
asyncTask.getStartTime(),
asyncTask.expirationTimeMillis
);
}

class Listener extends SearchProgressActionListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/
package org.elasticsearch.xpack.search;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -21,7 +20,6 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTask;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
Expand Down Expand Up @@ -55,56 +53,19 @@ public TransportGetAsyncStatusAction(TransportService transportService,
protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListener<AsyncStatusResponse> listener) {
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
if (node == null || Objects.equals(node, clusterService.localNode())) {
retrieveStatus(request, listener);
DiscoveryNode localNode = clusterService.state().getNodes().getLocalNode();
if (node == null || Objects.equals(node, localNode)) {
store.retrieveStatus(
request,
taskManager,
AsyncSearchTask.class,
AsyncSearchTask::getStatusResponse,
AsyncStatusResponse::getStatusFromStoredSearch,
listener
);
} else {
transportService.sendRequest(node, GetAsyncStatusAction.NAME, request,
new ActionListenerResponseHandler<>(listener, AsyncStatusResponse::new, ThreadPool.Names.SAME));
}
}

private void retrieveStatus(GetAsyncStatusRequest request, ActionListener<AsyncStatusResponse> listener) {
long nowInMillis = System.currentTimeMillis();
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
try {
AsyncTask task = (AsyncTask) taskManager.getTask(searchId.getTaskId().getId());
if ((task instanceof AsyncSearchTask) && (task.getExecutionId().equals(searchId))) {
AsyncStatusResponse response = ((AsyncSearchTask) task).getStatusResponse();
sendFinalResponse(request, response, nowInMillis, listener);
} else {
getStatusResponseFromIndex(searchId, request, nowInMillis, listener);
}
} catch (Exception exc) {
listener.onFailure(exc);
}
}

/**
* Get a status response from index
*/
private void getStatusResponseFromIndex(AsyncExecutionId searchId,
GetAsyncStatusRequest request, long nowInMillis, ActionListener<AsyncStatusResponse> listener) {
store.getStatusResponse(searchId, AsyncStatusResponse::getStatusFromAsyncSearchResponseWithExpirationTime,
new ActionListener<AsyncStatusResponse>() {
@Override
public void onResponse(AsyncStatusResponse asyncStatusResponse) {
sendFinalResponse(request, asyncStatusResponse, nowInMillis, listener);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
}

private static void sendFinalResponse(GetAsyncStatusRequest request,
AsyncStatusResponse response, long nowInMillis, ActionListener<AsyncStatusResponse> listener) {
if (response.getExpirationTime() < nowInMillis) { // check if the result has expired
listener.onFailure(new ResourceNotFoundException(request.getId()));
} else {
listener.onResponse(response);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.search;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.io.IOException;
import java.util.Date;
import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId;

public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<AsyncStatusResponse> {

@Override
protected AsyncStatusResponse createTestInstance() {
String id = randomSearchId();
boolean isRunning = randomBoolean();
boolean isPartial = isRunning ? randomBoolean() : false;
long startTimeMillis = (new Date(randomLongBetween(0, 3000000000000L))).getTime();
long expirationTimeMillis = startTimeMillis + 3600000L;
int totalShards = randomIntBetween(10, 150);
int successfulShards = randomIntBetween(0, totalShards - 5);
int skippedShards = randomIntBetween(0, 5);
int failedShards = totalShards - successfulShards - skippedShards;
RestStatus completionStatus = isRunning ? null : randomBoolean() ? RestStatus.OK : RestStatus.SERVICE_UNAVAILABLE;
return new AsyncStatusResponse(
id,
isRunning,
isPartial,
startTimeMillis,
expirationTimeMillis,
totalShards,
successfulShards,
skippedShards,
failedShards,
completionStatus
);
}

@Override
protected Writeable.Reader<AsyncStatusResponse> instanceReader() {
return AsyncStatusResponse::new;
}

@Override
protected AsyncStatusResponse mutateInstance(AsyncStatusResponse instance) {
// return a response with the opposite running status
boolean isRunning = instance.isRunning() == false;
boolean isPartial = isRunning ? randomBoolean() : false;
RestStatus completionStatus = isRunning ? null : randomBoolean() ? RestStatus.OK : RestStatus.SERVICE_UNAVAILABLE;
return new AsyncStatusResponse(
instance.getId(),
isRunning,
isPartial,
instance.getStartTime(),
instance.getExpirationTime(),
instance.getTotalShards(),
instance.getSuccessfulShards(),
instance.getSkippedShards(),
instance.getFailedShards(),
completionStatus
);
}

public void testToXContent() throws IOException {
AsyncStatusResponse response = createTestInstance();
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
String expectedJson = "{\n" +
" \"id\" : \"" + response.getId() + "\",\n" +
" \"is_running\" : " + response.isRunning() + ",\n" +
" \"is_partial\" : " + response.isPartial() + ",\n" +
" \"start_time_in_millis\" : " + response.getStartTime() + ",\n" +
" \"expiration_time_in_millis\" : " + response.getExpirationTime() + ",\n" +
" \"_shards\" : {\n" +
" \"total\" : " + response.getTotalShards() + ",\n" +
" \"successful\" : " + response.getSuccessfulShards() + ",\n" +
" \"skipped\" : " + response.getSkippedShards() + ",\n" +
" \"failed\" : " + response.getFailedShards() + "\n";
if (response.getCompletionStatus() == null) {
expectedJson = expectedJson +
" }\n" +
"}";
} else {
expectedJson = expectedJson +
" },\n" +
" \"completion_status\" : " + response.getCompletionStatus().getStatus() + "\n" +
"}";
}
builder.prettyPrint();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
assertEquals(expectedJson, Strings.toString(builder));
}
}
}
Loading