From f9cdcebbb4b7ef56aebde3fe8c45feff4d895fc0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 25 Aug 2020 15:08:50 -0400 Subject: [PATCH] Support point in time in async_search --- .../mustache/RestSearchTemplateAction.java | 3 +- .../AbstractBaseReindexRestHandler.java | 5 +- .../AbstractBulkByQueryRestHandler.java | 5 +- .../reindex/RestDeleteByQueryAction.java | 5 +- .../index/reindex/RestReindexAction.java | 3 +- .../reindex/RestUpdateByQueryAction.java | 5 +- .../index/reindex/RestReindexActionTests.java | 9 +- .../action/search/SearchRequest.java | 6 - .../rest/action/search/RestSearchAction.java | 31 ++- .../action/search/SearchRequestTests.java | 22 --- .../xpack/search/AsyncSearchSecurityIT.java | 182 +++++++++++++++++- .../xpack/search/AsyncSearchActionIT.java | 19 +- .../search/RestSubmitAsyncSearchAction.java | 2 +- .../search/AsyncSearchIntegTestCase.java | 30 ++- .../action/RestOpenPointInTimeAction.java | 4 +- .../rollup/rest/RestRollupSearchAction.java | 3 +- .../async_search/20-with-poin-in-time.yml | 78 ++++++++ 17 files changed, 343 insertions(+), 69 deletions(-) create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/20-with-poin-in-time.yml diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestSearchTemplateAction.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestSearchTemplateAction.java index dfc0f3d0228b2..6bb0abcfcc057 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestSearchTemplateAction.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/RestSearchTemplateAction.java @@ -64,7 +64,8 @@ public String getName() { public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { // Creates the search request with all required params SearchRequest searchRequest = new SearchRequest(); - RestSearchAction.parseSearchRequest(searchRequest, request, null, size -> searchRequest.source().size(size)); + RestSearchAction.parseSearchRequest( + searchRequest, request, null, client.getNamedWriteableRegistry(), size -> searchRequest.source().size(size)); // Creates the search template request SearchTemplateRequest searchTemplateRequest; diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java index 3423bdccee3b4..ed70d13f017fd 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; @@ -49,7 +50,7 @@ protected AbstractBaseReindexRestHandler(A action) { protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient client, boolean includeCreated, boolean includeUpdated) throws IOException { // Build the internal request - Request internal = setCommonOptions(request, buildRequest(request)); + Request internal = setCommonOptions(request, buildRequest(request, client.getNamedWriteableRegistry())); // Executes the request and waits for completion if (request.paramAsBoolean("wait_for_completion", true)) { @@ -77,7 +78,7 @@ protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient c /** * Build the Request based on the RestRequest. */ - protected abstract Request buildRequest(RestRequest request) throws IOException; + protected abstract Request buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException; /** * Sets common options of {@link AbstractBulkByScrollRequest} requests. diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java index 1e4a619980b12..a2fba3e833dfd 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; @@ -43,7 +44,7 @@ protected AbstractBulkByQueryRestHandler(A action) { super(action); } - protected void parseInternalRequest(Request internal, RestRequest restRequest, + protected void parseInternalRequest(Request internal, RestRequest restRequest, NamedWriteableRegistry namedWriteableRegistry, Map> bodyConsumers) throws IOException { assert internal != null : "Request should not be null"; assert restRequest != null : "RestRequest should not be null"; @@ -51,7 +52,7 @@ protected void parseInternalRequest(Request internal, RestRequest restRequest, SearchRequest searchRequest = internal.getSearchRequest(); try (XContentParser parser = extractRequestSpecificFields(restRequest, bodyConsumers)) { - RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> failOnSizeSpecified()); + RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, namedWriteableRegistry, size -> failOnSizeSpecified()); } searchRequest.source().size(restRequest.paramAsInt("scroll_size", searchRequest.source().size())); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java index 2208af8aa11cd..60de875a95e8b 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.rest.RestRequest; import java.io.IOException; @@ -52,7 +53,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client } @Override - protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOException { + protected DeleteByQueryRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException { /* * Passing the search request through DeleteByQueryRequest first allows * it to set its own defaults which differ from SearchRequest's @@ -64,7 +65,7 @@ protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOExcept consumers.put("conflicts", o -> internal.setConflicts((String) o)); consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue())); - parseInternalRequest(internal, request, consumers); + parseInternalRequest(internal, request, namedWriteableRegistry, consumers); return internal; } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java index bd6798344d618..fb58c3e2b8daa 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.RestRequest; @@ -55,7 +56,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client } @Override - protected ReindexRequest buildRequest(RestRequest request) throws IOException { + protected ReindexRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException { if (request.hasParam("pipeline")) { throw new IllegalArgumentException("_reindex doesn't support [pipeline] as a query parameter. " + "Specify it in the [dest] object instead."); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java index f6d5b1647fef5..9395a7c097301 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.script.Script; @@ -53,7 +54,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client } @Override - protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOException { + protected UpdateByQueryRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException { /* * Passing the search request through UpdateByQueryRequest first allows * it to set its own defaults which differ from SearchRequest's @@ -66,7 +67,7 @@ protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOExcept consumers.put("script", o -> internal.setScript(Script.parse(o))); consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue())); - parseInternalRequest(internal, request, consumers); + parseInternalRequest(internal, request, namedWriteableRegistry, consumers); internal.setPipeline(request.param("pipeline")); return internal; diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java index cb8217b78ef35..ace1c9d4df9b9 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -29,6 +30,7 @@ import org.junit.Before; import java.io.IOException; +import java.util.Collections; import static java.util.Collections.singletonMap; @@ -59,7 +61,8 @@ public void testPipelineQueryParameterIsError() throws IOException { request.withContent(BytesReference.bytes(body), body.contentType()); } request.withParams(singletonMap("pipeline", "doesn't matter")); - Exception e = expectThrows(IllegalArgumentException.class, () -> action.buildRequest(request.build())); + Exception e = expectThrows(IllegalArgumentException.class, () -> + action.buildRequest(request.build(), new NamedWriteableRegistry(Collections.emptyList()))); assertEquals("_reindex doesn't support [pipeline] as a query parameter. Specify it in the [dest] object instead.", e.getMessage()); } @@ -68,14 +71,14 @@ public void testSetScrollTimeout() throws IOException { { FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry()); requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON); - ReindexRequest request = action.buildRequest(requestBuilder.build()); + ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList())); assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT, request.getScrollTime()); } { FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry()); requestBuilder.withParams(singletonMap("scroll", "10m")); requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON); - ReindexRequest request = action.buildRequest(requestBuilder.build()); + ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList())); assertEquals("10m", request.getScrollTime().toString()); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 70066394ce11a..8c81cb54f7901 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -279,12 +279,6 @@ public ActionRequestValidationException validate() { if (scroll) { validationException = addValidationError("using [point in time] is not allowed in a scroll context", validationException); } - if (routing() != null) { - validationException = addValidationError("[routing] cannot be used with point in time", validationException); - } - if (preference() != null) { - validationException = addValidationError("[preference] cannot be used with point in time", validationException); - } } return validationException; } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 4abfbe14ea146..91ba7af424b47 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -19,6 +19,8 @@ package org.elasticsearch.rest.action.search; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchContextId; import org.elasticsearch.action.search.SearchRequest; @@ -51,6 +53,7 @@ import java.util.Set; import java.util.function.IntConsumer; +import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -100,12 +103,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC * company. */ IntConsumer setSize = size -> searchRequest.source().size(size); - request.withContentOrSourceParamParserOrNull(parser -> { - parseSearchRequest(searchRequest, request, parser, setSize); - if (searchRequest.pointInTimeBuilder() != null) { - preparePointInTime(searchRequest, client.getNamedWriteableRegistry()); - } - }); + request.withContentOrSourceParamParserOrNull(parser -> + parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize)); return channel -> { RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel()); @@ -122,6 +121,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC */ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request, XContentParser requestContentParser, + NamedWriteableRegistry namedWriteableRegistry, IntConsumer setSize) throws IOException { if (searchRequest.source() == null) { @@ -175,6 +175,10 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips())); checkRestTotalHits(request, searchRequest); + + if (searchRequest.pointInTimeBuilder() != null) { + preparePointInTime(searchRequest, namedWriteableRegistry); + } } /** @@ -291,6 +295,21 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil static void preparePointInTime(SearchRequest request, NamedWriteableRegistry namedWriteableRegistry) { assert request.pointInTimeBuilder() != null; + ActionRequestValidationException validationException = null; + if (request.indices().length > 0) { + validationException = addValidationError("[indices] cannot be used with point in time", validationException); + } + if (request.indicesOptions() != SearchRequest.DEFAULT_INDICES_OPTIONS) { + validationException = addValidationError("[indicesOptions] cannot be used with point in time", validationException); + } + if (request.routing() != null) { + validationException = addValidationError("[routing] cannot be used with point in time", validationException); + } + if (request.preference() != null) { + validationException = addValidationError("[preference] cannot be used with point in time", validationException); + } + ExceptionsHelper.reThrowIfNotNull(validationException); + final IndicesOptions indicesOptions = request.indicesOptions(); final IndicesOptions stricterIndicesOptions = IndicesOptions.fromOptions( indicesOptions.ignoreUnavailable(), indicesOptions.allowNoIndices(), false, false, false, diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index d7f2bbe69bb05..fbd6b709eeeaa 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -175,28 +175,6 @@ public void testValidate() throws IOException { assertEquals(1, validationErrors.validationErrors().size()); assertEquals("using [point in time] is not allowed in a scroll context", validationErrors.validationErrors().get(0)); } - { - // Reader context with preference - SearchRequest searchRequest = new SearchRequest() - .source(new SearchSourceBuilder(). - pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder("id", TimeValue.timeValueMillis(between(1, 10))))) - .preference("test"); - ActionRequestValidationException validationErrors = searchRequest.validate(); - assertNotNull(validationErrors); - assertEquals(1, validationErrors.validationErrors().size()); - assertEquals("[preference] cannot be used with point in time", validationErrors.validationErrors().get(0)); - } - { - // Reader context with routing - SearchRequest searchRequest = new SearchRequest() - .source(new SearchSourceBuilder() - .pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder("id", TimeValue.timeValueMillis(between(1, 10))))) - .routing("test"); - ActionRequestValidationException validationErrors = searchRequest.validate(); - assertNotNull(validationErrors); - assertEquals(1, validationErrors.validationErrors().size()); - assertEquals("[routing] cannot be used with point in time", validationErrors.validationErrors().get(0)); - } } public void testCopyConstructor() throws IOException { diff --git a/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java b/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java index d4f75b1fd39b1..2d36bdac6c25c 100644 --- a/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java +++ b/x-pack/plugin/async-search/qa/security/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.hamcrest.CustomMatcher; +import org.hamcrest.Matcher; import org.junit.Before; import java.io.IOException; @@ -39,6 +40,7 @@ import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; +import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -75,14 +77,7 @@ public void indexDocuments() throws IOException { public void testWithDlsAndFls() throws Exception { Response submitResp = submitAsyncSearch("*", "*", TimeValue.timeValueSeconds(10), "user_dls"); assertOK(submitResp); - String id = extractResponseId(submitResp); - Response getResp = getAsyncSearch(id, "user_dls"); - AsyncSearchResponse searchResponse = AsyncSearchResponse.fromXContent(XContentHelper.createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, - new BytesArray(EntityUtils.toByteArray(getResp.getEntity())), - XContentType.JSON)); - SearchHit[] hits = searchResponse.getSearchResponse().getHits().getHits(); - + SearchHit[] hits = getSearchHits(extractResponseId(submitResp), "user_dls"); assertThat(hits, arrayContainingInAnyOrder( new CustomMatcher("\"index\" doc 1 matcher") { @Override @@ -151,6 +146,139 @@ private void testCase(String user, String other) throws Exception { assertThat(exc.getMessage(), containsString("unauthorized")); } + private SearchHit[] getSearchHits(String asyncId, String user) throws IOException { + final Response resp = getAsyncSearch(asyncId, user); + assertOK(resp); + AsyncSearchResponse searchResponse = AsyncSearchResponse.fromXContent(XContentHelper.createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + new BytesArray(EntityUtils.toByteArray(resp.getEntity())), + XContentType.JSON)); + return searchResponse.getSearchResponse().getHits().getHits(); + } + + public void testAuthorizationOfPointInTime() throws Exception { + String authorizedUser = randomFrom("user1", "user2"); + final Matcher hitMatcher = new CustomMatcher<>("hit") { + @Override + public boolean matches(Object actual) { + SearchHit hit = (SearchHit) actual; + return hit.getIndex().equals("index-" + authorizedUser) && hit.getId().equals("0"); + } + }; + final String pitId = openPointInTime(new String[]{"index-" + authorizedUser}, authorizedUser); + try { + Response submit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), authorizedUser); + assertOK(submit); + final Response resp = getAsyncSearch(extractResponseId(submit), authorizedUser); + assertOK(resp); + assertThat(getSearchHits(extractResponseId(resp), authorizedUser), arrayContainingInAnyOrder(hitMatcher)); + + String unauthorizedUser = randomValueOtherThan(authorizedUser, () -> randomFrom("user1", "user2")); + ResponseException exc = expectThrows(ResponseException.class, + () -> submitAsyncSearchWithPIT(pitId, "*:*", TimeValue.timeValueSeconds(10), unauthorizedUser)); + assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403)); + assertThat(exc.getMessage(), containsString("unauthorized")); + + } finally { + closePointInTime(pitId, authorizedUser); + } + } + + public void testRejectPointInTimeWithIndices() throws Exception { + String authorizedUser = randomFrom("user1", "user2"); + final String pitId = openPointInTime(new String[]{"index-" + authorizedUser}, authorizedUser); + try { + final Request request = new Request("POST", "/_async_search"); + setRunAsHeader(request, authorizedUser); + request.addParameter("wait_for_completion_timeout", "true"); + request.addParameter("keep_on_completion", "true"); + if (randomBoolean()) { + request.addParameter("index", "index-" + authorizedUser); + } else { + request.addParameter("index", "*"); + } + final XContentBuilder requestBody = JsonXContent.contentBuilder() + .startObject() + .startObject("pit") + .field("id", pitId) + .field("keep_alive", "1m") + .endObject() + .endObject(); + request.setJsonEntity(Strings.toString(requestBody)); + final ResponseException exc = expectThrows(ResponseException.class, () -> client().performRequest(request)); + assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(400)); + assertThat(exc.getMessage(), containsString("[indices] cannot be used with point in time")); + } finally { + closePointInTime(pitId, authorizedUser); + } + } + + public void testSharingPointInTime() throws Exception { + final Matcher hitMatcher = new CustomMatcher<>("index") { + @Override + public boolean matches(Object actual) { + SearchHit hit = (SearchHit) actual; + return hit.getIndex().equals("index") && hit.getId().equals("0"); + } + }; + String firstUser = randomFrom("user1", "user2"); + final String pitId = openPointInTime(new String[]{"index"}, firstUser); + try { + { + Response firstSubmit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), firstUser); + assertOK(firstSubmit); + final Response firstResp = getAsyncSearch(extractResponseId(firstSubmit), firstUser); + assertOK(firstResp); + final SearchHit[] firstHits = getSearchHits(extractResponseId(firstResp), firstUser); + assertThat(firstHits, arrayContainingInAnyOrder(hitMatcher)); + } + { + String secondUser = randomValueOtherThan(firstUser, () -> randomFrom("user1", "user2")); + Response secondSubmit = submitAsyncSearchWithPIT(pitId, "foo:bar", TimeValue.timeValueSeconds(10), secondUser); + assertOK(secondSubmit); + final Response secondResp = getAsyncSearch(extractResponseId(secondSubmit), secondUser); + assertOK(secondResp); + final SearchHit[] secondHits = getSearchHits(extractResponseId(secondResp), secondUser); + assertThat(secondHits, arrayContainingInAnyOrder(hitMatcher)); + } + } finally { + closePointInTime(pitId, firstUser); + } + } + + public void testWithDLSPointInTime() throws Exception { + final String pitId = openPointInTime(new String[]{"index"}, "user1"); + try { + Response userResp = submitAsyncSearchWithPIT(pitId, "*", TimeValue.timeValueSeconds(10), "user1"); + assertOK(userResp); + assertThat(getSearchHits(extractResponseId(userResp), "user1"), arrayWithSize(3)); + + Response dlsResp = submitAsyncSearchWithPIT(pitId, "*", TimeValue.timeValueSeconds(10), "user_dls"); + assertOK(dlsResp); + assertThat(getSearchHits(extractResponseId(dlsResp), "user_dls"), arrayContainingInAnyOrder( + new CustomMatcher("\"index\" doc 1 matcher") { + @Override + public boolean matches(Object actual) { + SearchHit hit = (SearchHit) actual; + return "index".equals(hit.getIndex()) && + "1".equals(hit.getId()) && + hit.getSourceAsMap().isEmpty(); + } + }, + new CustomMatcher("\"index\" doc 2 matcher") { + @Override + public boolean matches(Object actual) { + SearchHit hit = (SearchHit) actual; + return "index".equals(hit.getIndex()) && + "2".equals(hit.getId()) && + "boo".equals(hit.getSourceAsMap().get("baz")); + } + })); + } finally { + closePointInTime(pitId, "user1"); + } + } + static String extractResponseId(Response response) throws IOException { Map map = toMap(response); return (String) map.get("id"); @@ -219,4 +347,42 @@ static void setRunAsHeader(Request request, String user) { builder.addHeader(RUN_AS_USER_HEADER, user); request.setOptions(builder); } + + private String openPointInTime(String[] indexNames, String user) throws IOException { + final Request request = new Request("POST", "/_pit"); + request.addParameter("index", String.join(",", indexNames)); + setRunAsHeader(request, user); + request.addParameter("keep_alive", between(1, 5) + "m"); + final Response response = client().performRequest(request); + assertOK(response); + return (String) toMap(response).get("id"); + } + + static Response submitAsyncSearchWithPIT(String pit, String query, TimeValue waitForCompletion, String user) throws IOException { + final Request request = new Request("POST", "/_async_search"); + setRunAsHeader(request, user); + request.addParameter("wait_for_completion_timeout", waitForCompletion.toString()); + request.addParameter("q", query); + request.addParameter("keep_on_completion", "true"); + final XContentBuilder requestBody = JsonXContent.contentBuilder() + .startObject() + .startObject("pit") + .field("id", pit) + .field("keep_alive", "1m") + .endObject() + .endObject(); + request.setJsonEntity(Strings.toString(requestBody)); + return client().performRequest(request); + } + + private void closePointInTime(String pitId, String user) throws IOException { + final Request request = new Request("DELETE", "/_pit"); + setRunAsHeader(request, user); + final XContentBuilder requestBody = JsonXContent.contentBuilder() + .startObject() + .field("id", pitId) + .endObject(); + request.setJsonEntity(Strings.toString(requestBody)); + assertOK(client().performRequest(request)); + } } diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index fd8c17b4edc3f..c7845c5ac799d 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -234,16 +234,17 @@ public void testCleanupOnFailure() throws Exception { } public void testInvalidId() throws Exception { - SearchResponseIterator it = - assertBlockingIterator(indexName, numShards, new SearchSourceBuilder(), randomBoolean() ? 1 : 0, 2); - AsyncSearchResponse response = it.next(); - ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncSearch("invalid")); - assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class)); - assertThat(exc.getMessage(), containsString("invalid id")); - while (it.hasNext()) { - response = it.next(); + try (SearchResponseIterator it = + assertBlockingIterator(indexName, numShards, new SearchSourceBuilder(), randomBoolean() ? 1 : 0, 2)) { + AsyncSearchResponse response = it.next(); + ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncSearch("invalid")); + assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class)); + assertThat(exc.getMessage(), containsString("invalid id")); + while (it.hasNext()) { + response = it.next(); + } + assertFalse(response.isRunning()); } - assertFalse(response.isRunning()); } public void testNoIndex() throws Exception { diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java index 2b4eedc1c866b..36dfd12134ce4 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java @@ -49,7 +49,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli // pre_filter_shard_size and ccs_minimize_roundtrips get set to the search request although the REST spec don't list //them as supported. We rely on SubmitAsyncSearchRequest#validate to fail in case they are set. request.withContentOrSourceParamParserOrNull(parser -> - parseSearchRequest(submit.getSearchRequest(), request, parser, setSize)); + parseSearchRequest(submit.getSearchRequest(), request, parser, client.getNamedWriteableRegistry(), setSize)); if (request.hasParam("wait_for_completion_timeout")) { submit.setWaitForCompletionTimeout(request.paramAsTime("wait_for_completion_timeout", submit.getWaitForCompletionTimeout())); diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index 18c65b78495ad..f4849a1ad0b89 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -36,7 +36,11 @@ import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; import org.elasticsearch.xpack.ilm.IndexLifecycle; @@ -50,6 +54,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX; import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING; @@ -207,7 +212,22 @@ protected SearchResponseIterator assertBlockingIterator(String indexName, SearchSourceBuilder source, int numFailures, int progressStep) throws Exception { - SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, indexName); + final String pitId; + final SubmitAsyncSearchRequest request; + if (randomBoolean()) { + OpenPointInTimeRequest openPIT = new OpenPointInTimeRequest( + new String[]{indexName}, + OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS, + TimeValue.timeValueMinutes(between(1, 5)), + null, + null); + pitId = client().execute(OpenPointInTimeAction.INSTANCE, openPIT).actionGet().getSearchContextId(); + source.pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder(pitId, TimeValue.timeValueMinutes(1))); + request = new SubmitAsyncSearchRequest(source); + } else { + pitId = null; + request = new SubmitAsyncSearchRequest(source, indexName); + } request.setBatchedReduceSize(progressStep); request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)); BlockingQueryBuilder.QueryLatch queryLatch = BlockingQueryBuilder.acquireQueryLatch(numFailures); @@ -223,6 +243,7 @@ protected SearchResponseIterator assertBlockingIterator(String indexName, return new SearchResponseIterator() { private AsyncSearchResponse response = initial; private boolean isFirst = true; + private final AtomicBoolean closed = new AtomicBoolean(); @Override public boolean hasNext() { @@ -283,7 +304,12 @@ private AsyncSearchResponse doNext() throws Exception { @Override public void close() { - queryLatch.close(); + if (closed.compareAndSet(false, true)) { + if (pitId != null) { + client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet(); + } + queryLatch.close(); + } } }; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/RestOpenPointInTimeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/RestOpenPointInTimeAction.java index ba2c2721929bb..d7192341e6e24 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/RestOpenPointInTimeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/RestOpenPointInTimeAction.java @@ -28,7 +28,9 @@ public String getName() { @Override public List routes() { - return List.of(new Route(POST, "/{index}/_pit")); + return List.of( + new Route(POST, "/{index}/_pit"), + new Route(POST, "/_pit")); } @Override diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestRollupSearchAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestRollupSearchAction.java index 9a29b0f0c7ec6..a17e9f5332ff1 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestRollupSearchAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestRollupSearchAction.java @@ -37,7 +37,8 @@ public List routes() { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { SearchRequest searchRequest = new SearchRequest(); restRequest.withContentOrSourceParamParserOrNull(parser -> - RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> searchRequest.source().size(size))); + RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, + client.getNamedWriteableRegistry(), size -> searchRequest.source().size(size))); RestSearchAction.checkRestTotalHits(restRequest, searchRequest); return channel -> client.execute(RollupSearchAction.INSTANCE, searchRequest, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/20-with-poin-in-time.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/20-with-poin-in-time.yml new file mode 100644 index 0000000000000..e02ae478fc5f2 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/20-with-poin-in-time.yml @@ -0,0 +1,78 @@ +--- +"Async search with point in time": + - skip: + version: " - 7.99.99" + reason: "point in time is introduced in 8.0" + - do: + indices.create: + index: test-1 + body: + settings: + number_of_shards: "2" + + - do: + indices.create: + index: test-2 + body: + settings: + number_of_shards: "1" + + - do: + indices.create: + index: test-3 + body: + settings: + number_of_shards: "3" + + - do: + index: + index: test-2 + body: { max: 2 } + + - do: + index: + index: test-1 + body: { max: 1 } + + - do: + index: + index: test-3 + body: { max: 3 } + + - do: + indices.refresh: {} + + - do: + open_point_in_time: + index: test-* + keep_alive: 5m + - set: {id: point_in_time_id} + + - do: + async_search.submit: + batched_reduce_size: 2 + wait_for_completion_timeout: 10s + body: + query: + match_all: {} + aggs: + max: + max: + field: max + sort: max + pit: + id: "$point_in_time_id" + keep_alive: 1m + + - is_false: id + - match: { is_partial: false } + - length: { response.hits.hits: 3 } + - match: { response.hits.hits.0._source.max: 1 } + - match: { response.aggregations.max.value: 3.0 } + + - do: + close_point_in_time: + body: + id: "$point_in_time_id" + +