From 8bf1d14dc0b396d63938aaf253c305ecdec1aa04 Mon Sep 17 00:00:00 2001 From: Mayya Sharipova Date: Thu, 11 Feb 2021 09:30:13 -0500 Subject: [PATCH] Introduce eql search status API (#68065) Introduce eql search status API, that reports the status of eql stored or async search. GET _eql/search/status/ The API is restricted to the monitoring_user role. For a running eql search, a response has the following format: { "id" : , "is_running" : true, "is_partial" : true, "start_time_in_millis" : 1611690235000, "expiration_time_in_millis" : 1611690295000 } For a completed eql search, a response has the following format: { "id" : , "is_running" : false, "is_partial" : false, "expiration_time_in_millis" : 1611690295000, "completion_status" : 200 } Backport for #68065 Closes #66955 --- docs/reference/eql/eql-search-api.asciidoc | 4 +- docs/reference/eql/eql.asciidoc | 25 +++ .../eql/get-async-eql-status-api.asciidoc | 120 +++++++++++ docs/reference/search.asciidoc | 3 + .../xpack/search/AsyncSearchTask.java | 12 +- .../search/TransportGetAsyncStatusAction.java | 59 +----- .../search/AsyncStatusResponseTests.java | 105 ++++++++++ .../core/async/AsyncTaskIndexService.java | 69 ++++++- .../search/action/AsyncStatusResponse.java | 42 +++- .../search/action/SearchStatusResponse.java | 19 ++ .../rest-api-spec/test/eql/10_basic.yml | 50 +++++ .../xpack/eql/action/EqlSearchTask.java | 24 ++- .../xpack/eql/action/EqlStatusResponse.java | 191 ++++++++++++++++++ .../eql/plugin/EqlAsyncGetStatusAction.java | 19 ++ .../xpack/eql/plugin/EqlPlugin.java | 4 +- .../plugin/RestEqlGetAsyncStatusAction.java | 36 ++++ .../TransportEqlAsyncGetStatusAction.java | 75 +++++++ .../eql/action/EqlStatusResponseTests.java | 85 ++++++++ .../xpack/security/operator/Constants.java | 1 + .../rest-api-spec/api/eql.get_status.json | 31 +++ 20 files changed, 898 insertions(+), 76 deletions(-) create mode 100644 docs/reference/eql/get-async-eql-status-api.asciidoc create mode 100644 x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncStatusResponseTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SearchStatusResponse.java create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlStatusResponse.java create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncGetStatusAction.java create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlGetAsyncStatusAction.java create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetStatusAction.java create mode 100644 x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlStatusResponseTests.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/eql.get_status.json diff --git a/docs/reference/eql/eql-search-api.asciidoc b/docs/reference/eql/eql-search-api.asciidoc index 2be665b5da2e3..f9fe7cbabea7e 100644 --- a/docs/reference/eql/eql-search-api.asciidoc +++ b/docs/reference/eql/eql-search-api.asciidoc @@ -338,7 +338,9 @@ This search ID is only provided if one of the following conditions is met: parameter is `true`. You can use this ID with the <> to get the current status and available results for the search. +API>> to get the current status and available results for the search or +<> to get only +the current status. -- `is_partial`:: diff --git a/docs/reference/eql/eql.asciidoc b/docs/reference/eql/eql.asciidoc index bc9c0f88b6e9e..6f8b024576fa7 100644 --- a/docs/reference/eql/eql.asciidoc +++ b/docs/reference/eql/eql.asciidoc @@ -567,6 +567,28 @@ complete. // TESTRESPONSE[s/"took": 2000/"took": $body.took/] // TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/] +Another more lightweight way to check the progress of an async search is to use +the <> with the search ID. + +[source,console] +---- +GET /_eql/search/status/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE= +---- +// TEST[skip: no access to search ID] + +[source,console-result] +---- +{ + "id": "FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=", + "is_running": false, + "is_partial": false, + "expiration_time_in_millis" : 1611690295000, + "completion_status": 200 +} +---- +// TESTRESPONSE[s/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=/$body.id/] +// TESTRESPONSE[s/"expiration_time_in_millis": 1611690295000/"expiration_time_in_millis": $body.expiration_time_in_millis/] + [discrete] [[eql-search-store-async-eql-search]] === Change the search retention period @@ -660,6 +682,9 @@ GET /_eql/search/FjlmbndxNmJjU0RPdExBTGg0elNOOEEaQk9xSjJBQzBRMldZa1VVQ2pPa01YUTo Saved synchronous searches are still subject to the `keep_alive` parameter's retention period. When this period ends, the search and its results are deleted. +You can also check only the status of the saved synchronous search without +results by using <>. + You can also manually delete saved synchronous searches using the <>. diff --git a/docs/reference/eql/get-async-eql-status-api.asciidoc b/docs/reference/eql/get-async-eql-status-api.asciidoc new file mode 100644 index 0000000000000..9645b5d3bd3ca --- /dev/null +++ b/docs/reference/eql/get-async-eql-status-api.asciidoc @@ -0,0 +1,120 @@ +[role="xpack"] +[testenv="basic"] + +[[get-async-eql-status-api]] +=== Get async EQL status API +++++ +Get async EQL search status +++++ +Returns the current status for an <> or +a <> +without returning results. This is a more lightweight API than +<> as it doesn't return +search results, and reports only the status. + +If the {es} {security-features} are enabled, the access to the get async +eql status API is restricted to the <>. + +[source,console] +---- +GET /_eql/search/status/FkpMRkJGS1gzVDRlM3g4ZzMyRGlLbkEaTXlJZHdNT09TU2VTZVBoNDM3cFZMUToxMDM= +---- +// TEST[skip: no access to search ID] + +[[get-async-eql-status-api-request]] +==== {api-request-title} + +`GET /_eql/search/status/` + + +[[get-async-eql-status-api-path-params]] +==== {api-path-parms-title} + +``:: +(Required, string) +Identifier for the search. ++ +A search ID is provided in the <>'s response for +an <>. A search ID is also provided if the +request's <> parameter +is `true`. + +[role="child_attributes"] +[[get-async-eql-status-api-response-body]] +==== {api-response-body-title} + +`id`:: +(string) +Identifier for the search. + +`is_running`:: +(boolean) +If `true`, the search request is still executing. +If `false`, the search is completed. + +`is_partial`:: +(boolean) +If `true`, the response does not contain complete search results. +This could be because either the search is still running +(`is_running` status is `false`), or because it is already completed +(`is_running` status is `true`) and results are partial due to +failures or timeouts. + +`start_time_in_millis`:: +(Long) +For a running search shows a timestamp when the eql search +started, in milliseconds since the Unix epoch. + +`expiration_time_in_millis`:: +(long) +Shows a timestamp when the eql search will be expired, in milliseconds +since the Unix epoch. When this time is reached, the search and its results +are deleted, even if the search is still ongoing. + +`completion_status`:: +(Integer) +For a completed search shows the http status code of the completed +search. + + +[[eql-status-api-example]] +==== {api-examples-title} + +[source,console] +---- +GET /_eql/search/status/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=?keep_alive=5d +---- +// TEST[skip: no access to search ID] + +If the search is still running, the status response has the following form: + +[source,console-result] +-------------------------------------------------- +{ + "id" : "FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=", + "is_running" : true, + "is_partial" : true, + "start_time_in_millis" : 1611690235000, + "expiration_time_in_millis" : 1611690295000 + +} +-------------------------------------------------- +// TEST[skip: no access to search ID] + +If the search is completed the status response doesn't have +`start_time_in_millis`, but has an additional `completion_status` +field that shows the status code of the completed eql search: + +[source,console-result] +-------------------------------------------------- +{ + "id" : "FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=", + "is_running" : false, + "is_partial" : false, + "expiration_time_in_millis" : 1611690295000, + "completion_status" : 200 <1> +} +-------------------------------------------------- +// TEST[skip: no access to search ID] + +<1> Indicates that the eql search was successfully completed diff --git a/docs/reference/search.asciidoc b/docs/reference/search.asciidoc index c753dbd6e282e..6ac76b25cc77b 100644 --- a/docs/reference/search.asciidoc +++ b/docs/reference/search.asciidoc @@ -45,6 +45,7 @@ For an overview of EQL and related tutorials, see <>. * <> * <> +* <> * <> @@ -70,6 +71,8 @@ include::eql/eql-search-api.asciidoc[] include::eql/get-async-eql-search-api.asciidoc[] +include::eql/get-async-eql-status-api.asciidoc[] + include::eql/delete-async-eql-search-api.asciidoc[] include::search/count.asciidoc[] diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 81e967e6ce45b..ee4eb01057067 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -350,12 +350,16 @@ private synchronized void checkCancellation() { } /** - * Returns the status of {@link AsyncSearchTask} + * Returns the status from {@link AsyncSearchTask} */ - public AsyncStatusResponse getStatusResponse() { - MutableSearchResponse mutableSearchResponse = searchResponse.get(); + public static AsyncStatusResponse getStatusResponse(AsyncSearchTask asyncTask) { + MutableSearchResponse mutableSearchResponse = asyncTask.searchResponse.get(); assert mutableSearchResponse != null; - return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis); + return mutableSearchResponse.toStatusResponse( + asyncTask.searchId.getEncoded(), + asyncTask.getStartTime(), + asyncTask.expirationTimeMillis + ); } class Listener extends SearchProgressActionListener { diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java index 1e9a228932478..6f1dedb01f20a 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java @@ -6,7 +6,6 @@ */ package org.elasticsearch.xpack.search; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ActionFilters; @@ -21,7 +20,6 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.async.AsyncExecutionId; -import org.elasticsearch.xpack.core.async.AsyncTask; import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; @@ -55,56 +53,19 @@ public TransportGetAsyncStatusAction(TransportService transportService, protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListener listener) { AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId()); - if (node == null || Objects.equals(node, clusterService.localNode())) { - retrieveStatus(request, listener); + DiscoveryNode localNode = clusterService.state().getNodes().getLocalNode(); + if (node == null || Objects.equals(node, localNode)) { + store.retrieveStatus( + request, + taskManager, + AsyncSearchTask.class, + AsyncSearchTask::getStatusResponse, + AsyncStatusResponse::getStatusFromStoredSearch, + listener + ); } else { transportService.sendRequest(node, GetAsyncStatusAction.NAME, request, new ActionListenerResponseHandler<>(listener, AsyncStatusResponse::new, ThreadPool.Names.SAME)); } } - - private void retrieveStatus(GetAsyncStatusRequest request, ActionListener listener) { - long nowInMillis = System.currentTimeMillis(); - AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); - try { - AsyncTask task = (AsyncTask) taskManager.getTask(searchId.getTaskId().getId()); - if ((task instanceof AsyncSearchTask) && (task.getExecutionId().equals(searchId))) { - AsyncStatusResponse response = ((AsyncSearchTask) task).getStatusResponse(); - sendFinalResponse(request, response, nowInMillis, listener); - } else { - getStatusResponseFromIndex(searchId, request, nowInMillis, listener); - } - } catch (Exception exc) { - listener.onFailure(exc); - } - } - - /** - * Get a status response from index - */ - private void getStatusResponseFromIndex(AsyncExecutionId searchId, - GetAsyncStatusRequest request, long nowInMillis, ActionListener listener) { - store.getStatusResponse(searchId, AsyncStatusResponse::getStatusFromAsyncSearchResponseWithExpirationTime, - new ActionListener() { - @Override - public void onResponse(AsyncStatusResponse asyncStatusResponse) { - sendFinalResponse(request, asyncStatusResponse, nowInMillis, listener); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - } - ); - } - - private static void sendFinalResponse(GetAsyncStatusRequest request, - AsyncStatusResponse response, long nowInMillis, ActionListener listener) { - if (response.getExpirationTime() < nowInMillis) { // check if the result has expired - listener.onFailure(new ResourceNotFoundException(request.getId())); - } else { - listener.onResponse(response); - } - } } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncStatusResponseTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncStatusResponseTests.java new file mode 100644 index 0000000000000..acd669e8ac068 --- /dev/null +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncStatusResponseTests.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.search; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.Date; +import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId; + +public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected AsyncStatusResponse createTestInstance() { + String id = randomSearchId(); + boolean isRunning = randomBoolean(); + boolean isPartial = isRunning ? randomBoolean() : false; + long startTimeMillis = (new Date(randomLongBetween(0, 3000000000000L))).getTime(); + long expirationTimeMillis = startTimeMillis + 3600000L; + int totalShards = randomIntBetween(10, 150); + int successfulShards = randomIntBetween(0, totalShards - 5); + int skippedShards = randomIntBetween(0, 5); + int failedShards = totalShards - successfulShards - skippedShards; + RestStatus completionStatus = isRunning ? null : randomBoolean() ? RestStatus.OK : RestStatus.SERVICE_UNAVAILABLE; + return new AsyncStatusResponse( + id, + isRunning, + isPartial, + startTimeMillis, + expirationTimeMillis, + totalShards, + successfulShards, + skippedShards, + failedShards, + completionStatus + ); + } + + @Override + protected Writeable.Reader instanceReader() { + return AsyncStatusResponse::new; + } + + @Override + protected AsyncStatusResponse mutateInstance(AsyncStatusResponse instance) { + // return a response with the opposite running status + boolean isRunning = instance.isRunning() == false; + boolean isPartial = isRunning ? randomBoolean() : false; + RestStatus completionStatus = isRunning ? null : randomBoolean() ? RestStatus.OK : RestStatus.SERVICE_UNAVAILABLE; + return new AsyncStatusResponse( + instance.getId(), + isRunning, + isPartial, + instance.getStartTime(), + instance.getExpirationTime(), + instance.getTotalShards(), + instance.getSuccessfulShards(), + instance.getSkippedShards(), + instance.getFailedShards(), + completionStatus + ); + } + + public void testToXContent() throws IOException { + AsyncStatusResponse response = createTestInstance(); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + String expectedJson = "{\n" + + " \"id\" : \"" + response.getId() + "\",\n" + + " \"is_running\" : " + response.isRunning() + ",\n" + + " \"is_partial\" : " + response.isPartial() + ",\n" + + " \"start_time_in_millis\" : " + response.getStartTime() + ",\n" + + " \"expiration_time_in_millis\" : " + response.getExpirationTime() + ",\n" + + " \"_shards\" : {\n" + + " \"total\" : " + response.getTotalShards() + ",\n" + + " \"successful\" : " + response.getSuccessfulShards() + ",\n" + + " \"skipped\" : " + response.getSkippedShards() + ",\n" + + " \"failed\" : " + response.getFailedShards() + "\n"; + if (response.getCompletionStatus() == null) { + expectedJson = expectedJson + + " }\n" + + "}"; + } else { + expectedJson = expectedJson + + " },\n" + + " \"completion_status\" : " + response.getCompletionStatus().getStatus() + "\n" + + "}"; + } + builder.prettyPrint(); + response.toXContent(builder, ToXContent.EMPTY_PARAMS); + assertEquals(expectedJson, Strings.toString(builder)); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index fa7bd7fe847d4..6d9d12bc9dab8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -24,6 +24,7 @@ import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; @@ -40,7 +41,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; -import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; +import org.elasticsearch.xpack.core.search.action.SearchStatusResponse; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer; @@ -52,7 +53,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.BiFunction; +import java.util.function.Function; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; @@ -352,18 +353,55 @@ public void getResponse(AsyncExecutionId asyncExecutionId, )); } + /** + * Retrieve the status of the async search or async or stored eql search. + * Retrieve from the task if the task is still available or from the index. + */ + public void retrieveStatus( + GetAsyncStatusRequest request, + TaskManager taskManager, + Class tClass, + Function statusProducerFromTask, + TriFunction statusProducerFromIndex, + ActionListener listener) { + AsyncExecutionId asyncExecutionId = AsyncExecutionId.decode(request.getId()); + try { + T asyncTask = getTask(taskManager, asyncExecutionId, tClass); + if (asyncTask != null) { // get status response from task + SR response = statusProducerFromTask.apply(asyncTask); + sendFinalStatusResponse(request, response, listener); + } else { // get status response from index + getStatusResponseFromIndex(asyncExecutionId, statusProducerFromIndex, + new ActionListener() { + @Override + public void onResponse(SR searchStatusResponse) { + sendFinalStatusResponse(request, searchStatusResponse, listener); + } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + ); + } + } catch (Exception exc) { + listener.onFailure(exc); + } + } /** - * Gets the status response of the async search from the index - * @param asyncExecutionId – id of the async search - * @param statusProducer – a producer of the status from the stored async search response and expirationTime + * Gets the status response of the stored search from the index + * @param asyncExecutionId – id of the stored search (async search or stored eql search) + * @param statusProducer – a producer of a status from the stored search, expirationTime and async search id * @param listener – listener to report result to */ - public void getStatusResponse(AsyncExecutionId asyncExecutionId, - BiFunction statusProducer, - ActionListener listener) { + private void getStatusResponseFromIndex( + AsyncExecutionId asyncExecutionId, + TriFunction statusProducer, + ActionListener listener) { + String asyncId = asyncExecutionId.getEncoded(); GetRequest internalGet = new GetRequest(index) - .preference(asyncExecutionId.getEncoded()) + .preference(asyncId) .id(asyncExecutionId.getDocId()); clientWithOrigin.get(internalGet, ActionListener.wrap( get -> { @@ -374,7 +412,7 @@ public void getStatusResponse(AsyncExecutionId asyncExecutionId, String encoded = (String) get.getSource().get(RESULT_FIELD); if (encoded != null) { Long expirationTime = (Long) get.getSource().get(EXPIRATION_TIME_FIELD); - listener.onResponse(statusProducer.apply(decodeResponse(encoded), expirationTime)); + listener.onResponse(statusProducer.apply(decodeResponse(encoded), expirationTime, asyncId)); } else { listener.onResponse(null); } @@ -383,6 +421,17 @@ public void getStatusResponse(AsyncExecutionId asyncExecutionId, )); } + private static void sendFinalStatusResponse( + GetAsyncStatusRequest request, + SR response, + ActionListener listener) { + if (response.getExpirationTime() < System.currentTimeMillis()) { // check if the result has expired + listener.onFailure(new ResourceNotFoundException(request.getId())); + } else { + listener.onResponse(response); + } + } + /** * Checks if the current user's authentication matches the original authentication stored * in the async search index. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncStatusResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncStatusResponse.java index af557788ef847..5f12fc49fae06 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncStatusResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncStatusResponse.java @@ -17,13 +17,14 @@ import org.elasticsearch.rest.action.RestActions; import java.io.IOException; +import java.util.Objects; import static org.elasticsearch.rest.RestStatus.OK; /** - * A response of an async search request. + * A response of an async search status request. */ -public class AsyncStatusResponse extends ActionResponse implements StatusToXContentObject { +public class AsyncStatusResponse extends ActionResponse implements SearchStatusResponse, StatusToXContentObject { private final String id; private final boolean isRunning; private final boolean isPartial; @@ -57,8 +58,15 @@ public AsyncStatusResponse(String id, this.completionStatus = completionStatus; } - public static AsyncStatusResponse getStatusFromAsyncSearchResponseWithExpirationTime(AsyncSearchResponse asyncSearchResponse, - long expirationTimeMillis) { + /** + * Get status from the stored async search response + * @param asyncSearchResponse stored async search response + * @param expirationTimeMillis – expiration time in milliseconds + * @param id – encoded async search id + * @return status response + */ + public static AsyncStatusResponse getStatusFromStoredSearch(AsyncSearchResponse asyncSearchResponse, + long expirationTimeMillis, String id) { int totalShards = 0; int successfulShards = 0; int skippedShards = 0; @@ -82,7 +90,7 @@ public static AsyncStatusResponse getStatusFromAsyncSearchResponseWithExpiration } } return new AsyncStatusResponse( - asyncSearchResponse.getId(), + id, asyncSearchResponse.isRunning(), asyncSearchResponse.isPartial(), asyncSearchResponse.getStartTime(), @@ -145,6 +153,29 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + AsyncStatusResponse other = (AsyncStatusResponse) obj; + return id.equals(other.id) + && isRunning == other.isRunning + && isPartial == other.isPartial + && startTimeMillis == other.startTimeMillis + && expirationTimeMillis == other.expirationTimeMillis + && totalShards == other.totalShards + && successfulShards == other.successfulShards + && skippedShards == other.skippedShards + && failedShards == other.failedShards + && Objects.equals(completionStatus, other.completionStatus); + } + + @Override + public int hashCode() { + return Objects.hash(id, isRunning, isPartial, startTimeMillis, expirationTimeMillis, totalShards, + successfulShards, skippedShards, failedShards, completionStatus); + } + /** * Returns the id of the async search status request. */ @@ -179,6 +210,7 @@ public long getStartTime() { /** * Returns a timestamp when the search will be expired, in milliseconds since epoch. */ + @Override public long getExpirationTime() { return expirationTimeMillis; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SearchStatusResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SearchStatusResponse.java new file mode 100644 index 0000000000000..ced2d6e1c3cd5 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SearchStatusResponse.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.search.action; + +/** + * An interface for status response of the stored or running async search + */ +public interface SearchStatusResponse { + + /** + * Returns a timestamp when the search will be expired, in milliseconds since epoch. + */ + long getExpirationTime(); +} diff --git a/x-pack/plugin/eql/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/eql/10_basic.yml b/x-pack/plugin/eql/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/eql/10_basic.yml index 897f2c93ddb37..8adf1acd78db7 100644 --- a/x-pack/plugin/eql/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/eql/10_basic.yml +++ b/x-pack/plugin/eql/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/eql/10_basic.yml @@ -158,3 +158,53 @@ setup: catch: missing eql.delete: id: $id + +--- +"EQL status API": + + - do: + eql.search: + index: eql_test + keep_on_completion: true + wait_for_completion_timeout: "0ms" + body: + query: 'process where user == "SYSTEM"' + - is_true: id + - set: { id: id } + + # status for eql search that may be completed or not + - do: + eql.get_status: + id: $id + - match: { id: $id } + - set: { is_running: run_status } + - match: { is_partial: $run_status } + - is_true: expiration_time_in_millis + + # wait for the certain completion + - do: + eql.get: + id: $id + wait_for_completion_timeout: "10s" + + # status for completed eql search + - do: + eql.get_status: + id: $id + - match: { id: $id } + - match: { is_running: false } + - match: { is_partial: false } + - is_false: start_time_in_millis + - is_true: expiration_time_in_millis + - match: { completion_status: 200 } + + - do: + eql.delete: + id: $id + - match: { acknowledged: true } + + # status for a deleted/missing eql search + - do: + catch: missing + eql.get_status: + id: $id diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java index 7ccf5a6301f09..21040b2898636 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java @@ -13,11 +13,8 @@ import org.elasticsearch.xpack.eql.async.StoredAsyncTask; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; public class EqlSearchTask extends StoredAsyncTask { - public volatile AtomicReference finalResponse = new AtomicReference<>(); - public EqlSearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers, Map originHeaders, AsyncExecutionId asyncExecutionId, TimeValue keepAlive) { super(id, type, action, description, parentTaskId, headers, originHeaders, asyncExecutionId, keepAlive); @@ -25,8 +22,23 @@ public EqlSearchTask(long id, String type, String action, String description, Ta @Override public EqlSearchResponse getCurrentResult() { - EqlSearchResponse response = finalResponse.get(); - return response != null ? response : new EqlSearchResponse(EqlSearchResponse.Hits.EMPTY, - System.currentTimeMillis() - getStartTime(), false, getExecutionId().getEncoded(), true, true); + // for eql searches we never store a search response in the task (neither partial, nor final) + // we kill the task on final response, so if the task is still present, it means the search is still running + return new EqlSearchResponse(EqlSearchResponse.Hits.EMPTY, System.currentTimeMillis() - getStartTime(), false, + getExecutionId().getEncoded(), true, true); + } + + /** + * Returns the status from {@link EqlSearchTask} + */ + public static EqlStatusResponse getStatusResponse(EqlSearchTask asyncTask) { + return new EqlStatusResponse( + asyncTask.getExecutionId().getEncoded(), + true, + true, + asyncTask.getStartTime(), + asyncTask.getExpirationTimeMillis(), + null + ); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlStatusResponse.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlStatusResponse.java new file mode 100644 index 0000000000000..f78a99baa2eb0 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlStatusResponse.java @@ -0,0 +1,191 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.eql.action; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.StatusToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.core.search.action.SearchStatusResponse; +import org.elasticsearch.xpack.eql.async.StoredAsyncResponse; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.rest.RestStatus.OK; + +/** + * A response for eql search status request + */ +public class EqlStatusResponse extends ActionResponse implements SearchStatusResponse, StatusToXContentObject { + private final String id; + private final boolean isRunning; + private final boolean isPartial; + private final Long startTimeMillis; + private final long expirationTimeMillis; + private final RestStatus completionStatus; + + public EqlStatusResponse(String id, + boolean isRunning, + boolean isPartial, + Long startTimeMillis, + long expirationTimeMillis, + RestStatus completionStatus) { + this.id = id; + this.isRunning = isRunning; + this.isPartial = isPartial; + this.startTimeMillis = startTimeMillis; + this.expirationTimeMillis = expirationTimeMillis; + this.completionStatus = completionStatus; + } + + /** + * Get status from the stored eql search response + * @param storedResponse + * @param expirationTimeMillis – expiration time in milliseconds + * @param id – encoded async search id + * @return a status response + */ + public static EqlStatusResponse getStatusFromStoredSearch(StoredAsyncResponse storedResponse, + long expirationTimeMillis, String id) { + EqlSearchResponse searchResponse = storedResponse.getResponse(); + if (searchResponse != null) { + assert searchResponse.isRunning() == false : "Stored eql search response must have a completed status!"; + return new EqlStatusResponse( + searchResponse.id(), + false, + searchResponse.isPartial(), + null, // we dont' store in the index start time for completed response + expirationTimeMillis, + RestStatus.OK + ); + } else { + Exception exc = storedResponse.getException(); + assert exc != null : "Stored eql response must either have a search response or an exception!"; + return new EqlStatusResponse( + id, + false, + false, + null, // we dont' store in the index start time for completed response + expirationTimeMillis, + ExceptionsHelper.status(exc) + ); + } + } + + public EqlStatusResponse(StreamInput in) throws IOException { + this.id = in.readString(); + this.isRunning = in.readBoolean(); + this.isPartial = in.readBoolean(); + this.startTimeMillis = in.readOptionalLong(); + this.expirationTimeMillis = in.readLong(); + this.completionStatus = (this.isRunning == false) ? RestStatus.readFrom(in) : null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + out.writeBoolean(isRunning); + out.writeBoolean(isPartial); + out.writeOptionalLong(startTimeMillis); + out.writeLong(expirationTimeMillis); + if (isRunning == false) { + RestStatus.writeTo(out, completionStatus); + } + } + + @Override + public RestStatus status() { + return OK; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("id", id); + builder.field("is_running", isRunning); + builder.field("is_partial", isPartial); + if (startTimeMillis != null) { // start time is available only for a running eql search + builder.timeField("start_time_in_millis", "start_time", startTimeMillis); + } + builder.timeField("expiration_time_in_millis", "expiration_time", expirationTimeMillis); + if (isRunning == false) { // completion status is available only for a completed eql search + builder.field("completion_status", completionStatus.getStatus()); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + EqlStatusResponse other = (EqlStatusResponse) obj; + return id.equals(other.id) + && isRunning == other.isRunning + && isPartial == other.isPartial + && Objects.equals(startTimeMillis, other.startTimeMillis) + && expirationTimeMillis == other.expirationTimeMillis + && Objects.equals(completionStatus, other.completionStatus); + } + + @Override + public int hashCode() { + return Objects.hash(id, isRunning, isPartial, startTimeMillis, expirationTimeMillis, completionStatus); + } + + /** + * Returns the id of the eql search status request. + */ + public String getId() { + return id; + } + + /** + * Returns {@code true} if the eql search is still running in the cluster, + * or {@code false} if the search has been completed. + */ + public boolean isRunning() { + return isRunning; + } + + /** + * Returns {@code true} if the eql search results are partial. + * This could be either because eql search hasn't finished yet, + * or if it finished and some shards have failed or timed out. + */ + public boolean isPartial() { + return isPartial; + } + + /** + * Returns a timestamp when the eql search task started, in milliseconds since epoch. + * For a completed eql search returns {@code null}, as we don't store start time for completed searches. + */ + public Long getStartTime() { + return startTimeMillis; + } + + /** + * Returns a timestamp when the eql search will be expired, in milliseconds since epoch. + */ + @Override + public long getExpirationTime() { + return expirationTimeMillis; + } + + /** + * For a completed eql search returns the completion status. + * For a still running eql search returns {@code null}. + */ + public RestStatus getCompletionStatus() { + return completionStatus; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncGetStatusAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncGetStatusAction.java new file mode 100644 index 0000000000000..a43f5fc5c067b --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlAsyncGetStatusAction.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.eql.plugin; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.xpack.eql.action.EqlStatusResponse; + +public class EqlAsyncGetStatusAction extends ActionType { + public static final EqlAsyncGetStatusAction INSTANCE = new EqlAsyncGetStatusAction(); + public static final String NAME = "cluster:monitor/eql/async/status"; + + private EqlAsyncGetStatusAction() { + super(NAME, EqlStatusResponse::new); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java index 9e56bc0c0db61..2a19bba0df0d8 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java @@ -94,7 +94,8 @@ public List> getSettings() { return org.elasticsearch.common.collect.List.of( new ActionHandler<>(EqlSearchAction.INSTANCE, TransportEqlSearchAction.class), new ActionHandler<>(EqlStatsAction.INSTANCE, TransportEqlStatsAction.class), - new ActionHandler<>(EqlAsyncGetResultAction.INSTANCE, TransportEqlAsyncGetResultAction.class) + new ActionHandler<>(EqlAsyncGetResultAction.INSTANCE, TransportEqlAsyncGetResultAction.class), + new ActionHandler<>(EqlAsyncGetStatusAction.INSTANCE, TransportEqlAsyncGetStatusAction.class) ); } @@ -111,6 +112,7 @@ public List getRestHandlers(Settings settings, new RestEqlSearchAction(), new RestEqlStatsAction(), new RestEqlGetAsyncResultAction(), + new RestEqlGetAsyncStatusAction(), new RestEqlDeleteAsyncResultAction() ); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlGetAsyncStatusAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlGetAsyncStatusAction.java new file mode 100644 index 0000000000000..3d6f7f6efed47 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlGetAsyncStatusAction.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.eql.plugin; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestStatusToXContentListener; +import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest; + +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public class RestEqlGetAsyncStatusAction extends BaseRestHandler { + @Override + public List routes() { + return singletonList(new Route(GET, "/_eql/search/status/{id}")); + } + + @Override + public String getName() { + return "eql_get_async_status"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + GetAsyncStatusRequest statusRequest = new GetAsyncStatusRequest(request.param("id")); + return channel -> client.execute(EqlAsyncGetStatusAction.INSTANCE, statusRequest, new RestStatusToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetStatusAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetStatusAction.java new file mode 100644 index 0000000000000..f7b46a7a83c00 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetStatusAction.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.eql.plugin; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; +import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest; +import org.elasticsearch.xpack.eql.action.EqlSearchResponse; +import org.elasticsearch.xpack.eql.action.EqlSearchTask; +import org.elasticsearch.xpack.eql.action.EqlStatusResponse; +import org.elasticsearch.xpack.eql.async.StoredAsyncResponse; + +import java.util.Objects; + +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; + + +public class TransportEqlAsyncGetStatusAction extends HandledTransportAction { + private final TransportService transportService; + private final ClusterService clusterService; + private final AsyncTaskIndexService> store; + + @Inject + public TransportEqlAsyncGetStatusAction(TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + NamedWriteableRegistry registry, + Client client, + ThreadPool threadPool) { + super(EqlAsyncGetStatusAction.NAME, transportService, actionFilters, GetAsyncStatusRequest::new); + this.transportService = transportService; + this.clusterService = clusterService; + Writeable.Reader> reader = in -> new StoredAsyncResponse<>(EqlSearchResponse::new, in); + this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, + threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry); + } + + @Override + protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListener listener) { + AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId()); + DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId()); + DiscoveryNode localNode = clusterService.state().getNodes().getLocalNode(); + if (node == null || Objects.equals(node, localNode)) { + store.retrieveStatus( + request, + taskManager, + EqlSearchTask.class, + EqlSearchTask::getStatusResponse, + EqlStatusResponse::getStatusFromStoredSearch, + listener + ); + } else { + transportService.sendRequest(node, EqlAsyncGetStatusAction.NAME, request, + new ActionListenerResponseHandler<>(listener, EqlStatusResponse::new, ThreadPool.Names.SAME)); + } + } +} diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlStatusResponseTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlStatusResponseTests.java new file mode 100644 index 0000000000000..2b8aecf8fade3 --- /dev/null +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlStatusResponseTests.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.eql.action; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.Date; + +import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId; + +public class EqlStatusResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected EqlStatusResponse createTestInstance() { + String id = randomSearchId(); + boolean isRunning = randomBoolean(); + boolean isPartial = isRunning ? randomBoolean() : false; + long randomDate = (new Date(randomLongBetween(0, 3000000000000L))).getTime(); + Long startTimeMillis = randomBoolean() ? null : randomDate; + long expirationTimeMillis = startTimeMillis == null ? randomDate : startTimeMillis + 3600000L; + RestStatus completionStatus = isRunning ? null : randomBoolean() ? RestStatus.OK : RestStatus.SERVICE_UNAVAILABLE; + return new EqlStatusResponse(id, isRunning, isPartial, startTimeMillis, expirationTimeMillis, completionStatus); + } + + @Override + protected Writeable.Reader instanceReader() { + return EqlStatusResponse::new; + } + + @Override + protected EqlStatusResponse mutateInstance(EqlStatusResponse instance) { + // return a response with the opposite running status + boolean isRunning = instance.isRunning() == false; + boolean isPartial = isRunning ? randomBoolean() : false; + RestStatus completionStatus = isRunning ? null : randomBoolean() ? RestStatus.OK : RestStatus.SERVICE_UNAVAILABLE; + return new EqlStatusResponse( + instance.getId(), + isRunning, + isPartial, + instance.getStartTime(), + instance.getExpirationTime(), + completionStatus + ); + } + + public void testToXContent() throws IOException { + EqlStatusResponse response = createTestInstance(); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + String expectedJson = "{\n" + + " \"id\" : \"" + response.getId() + "\",\n" + + " \"is_running\" : " + response.isRunning() + ",\n" + + " \"is_partial\" : " + response.isPartial() + ",\n"; + + if (response.getStartTime() != null) { + expectedJson = expectedJson + + " \"start_time_in_millis\" : " + response.getStartTime() + ",\n"; + } + expectedJson = expectedJson + + " \"expiration_time_in_millis\" : " + response.getExpirationTime(); + + if (response.getCompletionStatus() == null) { + expectedJson = expectedJson + "\n" + + "}"; + } else { + expectedJson = expectedJson + ",\n" + + " \"completion_status\" : " + response.getCompletionStatus().getStatus() + "\n" + + "}"; + } + builder.prettyPrint(); + response.toXContent(builder, ToXContent.EMPTY_PARAMS); + assertEquals(expectedJson, Strings.toString(builder)); + } + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 1be6ecfb6b7ae..6e9e0d5726b58 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -214,6 +214,7 @@ public class Constants { "cluster:monitor/ccr/stats", "cluster:monitor/data_frame/get", "cluster:monitor/data_frame/stats/get", + "cluster:monitor/eql/async/status", "cluster:monitor/health", "cluster:monitor/main", "cluster:monitor/nodes/hot_threads", diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.get_status.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.get_status.json new file mode 100644 index 0000000000000..be8a439893362 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.get_status.json @@ -0,0 +1,31 @@ +{ + "eql.get_status": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/current/eql-search-api.html", + "description": "Returns the status of a previously submitted async or stored Event Query Language (EQL) search" + }, + "stability": "stable", + "visibility": "public", + "headers": { + "accept": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/_eql/search/status/{id}", + "methods": [ + "GET" + ], + "parts": { + "id": { + "type": "string", + "description": "The async search ID" + } + } + } + ] + } + } +}