diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchClient.java index ab47b1c5e792a..bbf9c15e20cdf 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchClient.java @@ -21,7 +21,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.asyncsearch.AsyncSearchResponse; +import org.elasticsearch.client.asyncsearch.DeleteAsyncSearchRequest; +import org.elasticsearch.client.asyncsearch.GetAsyncSearchRequest; import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest; +import org.elasticsearch.client.core.AcknowledgedResponse; import java.io.IOException; @@ -42,7 +45,7 @@ public class AsyncSearchClient { * @return the response * @throws IOException in case there is a problem sending the request or parsing back the response */ - public AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request, RequestOptions options) throws IOException { + public AsyncSearchResponse submit(SubmitAsyncSearchRequest request, RequestOptions options) throws IOException { return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options, AsyncSearchResponse::fromXContent, emptySet()); } @@ -57,10 +60,61 @@ public AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request, R * @param listener the listener to be notified upon request completion * @return cancellable that may be used to cancel the request */ - public Cancellable submitAsyncSearchAsync(SubmitAsyncSearchRequest request, RequestOptions options, + public Cancellable submitAsync(SubmitAsyncSearchRequest request, RequestOptions options, ActionListener listener) { return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options, AsyncSearchResponse::fromXContent, listener, emptySet()); } + /** + * Get an async search request. + * See the docs for more. + * + */ + public AsyncSearchResponse get(GetAsyncSearchRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::getAsyncSearch, options, + AsyncSearchResponse::fromXContent, emptySet()); + } + + /** + * Asynchronously get an async search request. + * See the docs for more. + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable getAsync(GetAsyncSearchRequest request, RequestOptions options, + ActionListener listener) { + return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::getAsyncSearch, options, + AsyncSearchResponse::fromXContent, listener, emptySet()); + } + + /** + * Delete an async search request. + * See the docs for more. + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public AcknowledgedResponse delete(DeleteAsyncSearchRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::deleteAsyncSearch, options, + AcknowledgedResponse::fromXContent, emptySet()); + } + + /** + * Asynchronously delete an async search request. + * See the docs for more. + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable deleteAsync(DeleteAsyncSearchRequest request, RequestOptions options, + ActionListener listener) { + return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::deleteAsyncSearch, options, + AcknowledgedResponse::fromXContent, listener, emptySet()); + } + } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchRequestConverters.java index 2d91bd926ef4d..8a63589a55c51 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchRequestConverters.java @@ -19,8 +19,12 @@ package org.elasticsearch.client; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.elasticsearch.client.RequestConverters.Params; +import org.elasticsearch.client.asyncsearch.DeleteAsyncSearchRequest; +import org.elasticsearch.client.asyncsearch.GetAsyncSearchRequest; import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest; import org.elasticsearch.rest.action.search.RestSearchAction; @@ -71,4 +75,29 @@ static void addSearchRequestParams(Params params, SubmitAsyncSearchRequest reque } params.withBatchedReduceSize(request.getBatchedReduceSize()); } + + static Request getAsyncSearch(GetAsyncSearchRequest asyncSearchRequest) throws IOException { + String endpoint = new RequestConverters.EndpointBuilder() + .addPathPartAsIs("_async_search") + .addPathPart(asyncSearchRequest.getId()) + .build(); + Request request = new Request(HttpGet.METHOD_NAME, endpoint); + Params params = new RequestConverters.Params(); + if (asyncSearchRequest.getKeepAlive() != null) { + params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep()); + } + if (asyncSearchRequest.getWaitForCompletion() != null) { + params.putParam("wait_for_completion", asyncSearchRequest.getWaitForCompletion().getStringRep()); + } + request.addParameters(params.asMap()); + return request; + } + + static Request deleteAsyncSearch(DeleteAsyncSearchRequest deleteAsyncSearchRequest) throws IOException { + String endpoint = new RequestConverters.EndpointBuilder() + .addPathPartAsIs("_async_search") + .addPathPart(deleteAsyncSearchRequest.getId()) + .build(); + return new Request(HttpDelete.METHOD_NAME, endpoint); + } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/DeleteAsyncSearchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/DeleteAsyncSearchRequest.java new file mode 100644 index 0000000000000..3b37293212da0 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/DeleteAsyncSearchRequest.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.elasticsearch.client.asyncsearch; + +import org.elasticsearch.client.Validatable; + +import java.util.Objects; + +public class DeleteAsyncSearchRequest implements Validatable { + + private final String id; + + public DeleteAsyncSearchRequest(String id) { + this.id = id; +} + + public String getId() { + return this.id; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DeleteAsyncSearchRequest request = (DeleteAsyncSearchRequest) o; + return Objects.equals(getId(), request.getId()); + } + + @Override + public int hashCode() { + return Objects.hash(getId()); + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequest.java new file mode 100644 index 0000000000000..11ad059349481 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequest.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.elasticsearch.client.asyncsearch; + +import org.elasticsearch.client.Validatable; +import org.elasticsearch.client.ValidationException; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.Objects; +import java.util.Optional; + +public class GetAsyncSearchRequest implements Validatable { + + private TimeValue waitForCompletion; + private TimeValue keepAlive; + + public static final long MIN_KEEPALIVE = TimeValue.timeValueMinutes(1).millis(); + + private final String id; + + public GetAsyncSearchRequest(String id) { + this.id = id; + } + + public String getId() { + return this.id; + } + + public TimeValue getWaitForCompletion() { + return waitForCompletion; + } + + public void setWaitForCompletion(TimeValue waitForCompletion) { + this.waitForCompletion = waitForCompletion; + } + + public TimeValue getKeepAlive() { + return keepAlive; + } + + public void setKeepAlive(TimeValue keepAlive) { + this.keepAlive = keepAlive; + } + + @Override + public Optional validate() { + final ValidationException validationException = new ValidationException(); + if (keepAlive != null && keepAlive.getMillis() < MIN_KEEPALIVE) { + validationException.addValidationError("keep_alive must be greater than 1 minute, got: " + keepAlive.toString()); + } + if (validationException.validationErrors().isEmpty()) { + return Optional.empty(); + } + return Optional.of(validationException); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GetAsyncSearchRequest request = (GetAsyncSearchRequest) o; + return Objects.equals(getId(), request.getId()) + && Objects.equals(getKeepAlive(), request.getKeepAlive()) + && Objects.equals(getWaitForCompletion(), request.getWaitForCompletion()); + } + + @Override + public int hashCode() { + return Objects.hash(getId(), getKeepAlive(), getWaitForCompletion()); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/AsyncSearchRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/AsyncSearchRequestConvertersTests.java index df8fc65c751e6..a3e2c0cea7d9c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/AsyncSearchRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/AsyncSearchRequestConvertersTests.java @@ -19,8 +19,12 @@ package org.elasticsearch.client; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.client.asyncsearch.DeleteAsyncSearchRequest; +import org.elasticsearch.client.asyncsearch.GetAsyncSearchRequest; import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; @@ -112,4 +116,35 @@ private static void setRandomSearchParams(SubmitAsyncSearchRequest request, Map< expectedParams.put("max_concurrent_shard_requests", Integer.toString(request.getMaxConcurrentShardRequests())); } + public void testGetAsyncSearch() throws Exception { + String id = randomAlphaOfLengthBetween(5, 10); + Map expectedParams = new HashMap<>(); + GetAsyncSearchRequest submitRequest = new GetAsyncSearchRequest(id); + if (randomBoolean()) { + TimeValue keepAlive = TimeValue.parseTimeValue(randomTimeValue(), "test"); + submitRequest.setKeepAlive(keepAlive); + expectedParams.put("keep_alive", keepAlive.getStringRep()); + } + if (randomBoolean()) { + TimeValue waitForCompletion = TimeValue.parseTimeValue(randomTimeValue(), "test"); + submitRequest.setWaitForCompletion(waitForCompletion); + expectedParams.put("wait_for_completion", waitForCompletion.getStringRep()); + } + + Request request = AsyncSearchRequestConverters.getAsyncSearch(submitRequest); + String endpoint = "/_async_search/" + id; + assertEquals(HttpGet.METHOD_NAME, request.getMethod()); + assertEquals(endpoint.toString(), request.getEndpoint()); + assertEquals(expectedParams, request.getParameters()); + } + + public void testDeleteAsyncSearch() throws Exception { + String id = randomAlphaOfLengthBetween(5, 10); + DeleteAsyncSearchRequest deleteRequest = new DeleteAsyncSearchRequest(id); + + Request request = AsyncSearchRequestConverters.deleteAsyncSearch(deleteRequest); + assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); + assertEquals("/_async_search/" + id, request.getEndpoint()); + assertTrue(request.getParameters().isEmpty()); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/AsyncSearchIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/AsyncSearchIT.java index a1c608b3c6ef4..38e7351e58836 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/AsyncSearchIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/AsyncSearchIT.java @@ -21,37 +21,50 @@ import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; -import java.util.concurrent.TimeUnit; public class AsyncSearchIT extends ESRestHighLevelClientTestCase { - public void testSubmitAsyncSearchRequest() throws IOException { + public void testAsyncSearch() throws IOException { String index = "test-index"; createIndex(index, Settings.EMPTY); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()); - SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(sourceBuilder, index); - // 15 sec should be enough to make sure we always complete right away - request.setWaitForCompletion(new TimeValue(15, TimeUnit.SECONDS)); - AsyncSearchResponse response = highLevelClient().asyncSearch().submitAsyncSearch(request, RequestOptions.DEFAULT); - assertTrue(response.getVersion() >= 0); - assertFalse(response.isPartial()); - assertTrue(response.getStartTime() > 0); - assertTrue(response.getExpirationTime() > 0); - assertNotNull(response.getSearchResponse()); - if (response.isRunning() == false) { - assertNull(response.getId()); - assertFalse(response.isPartial()); + SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(sourceBuilder, index); + submitRequest.setCleanOnCompletion(false); + AsyncSearchResponse submitResponse = highLevelClient().asyncSearch().submit(submitRequest, RequestOptions.DEFAULT); + assertNotNull(submitResponse.getId()); + assertFalse(submitResponse.isPartial()); + assertTrue(submitResponse.getStartTime() > 0); + assertTrue(submitResponse.getExpirationTime() > 0); + assertNotNull(submitResponse.getSearchResponse()); + if (submitResponse.isRunning() == false) { + assertFalse(submitResponse.isPartial()); } else { - assertTrue(response.isPartial()); - assertNotNull(response.getId()); + assertTrue(submitResponse.isPartial()); + } + + GetAsyncSearchRequest getRequest = new GetAsyncSearchRequest(submitResponse.getId()); + AsyncSearchResponse getResponse = highLevelClient().asyncSearch().get(getRequest, RequestOptions.DEFAULT); + while (getResponse.isRunning()) { + getResponse = highLevelClient().asyncSearch().get(getRequest, RequestOptions.DEFAULT); } - } + assertFalse(getResponse.isRunning()); + assertFalse(getResponse.isPartial()); + assertTrue(getResponse.getStartTime() > 0); + assertTrue(getResponse.getExpirationTime() > 0); + assertNotNull(getResponse.getSearchResponse()); + + DeleteAsyncSearchRequest deleteRequest = new DeleteAsyncSearchRequest(submitResponse.getId()); + AcknowledgedResponse deleteAsyncSearchResponse = highLevelClient().asyncSearch().delete(deleteRequest, + RequestOptions.DEFAULT); + assertNotNull(deleteAsyncSearchResponse); + assertNotNull(deleteAsyncSearchResponse.isAcknowledged()); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequestTests.java new file mode 100644 index 0000000000000..b6861b218cd28 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequestTests.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.asyncsearch; + +import org.elasticsearch.client.ValidationException; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.TimeUnit; + +public class GetAsyncSearchRequestTests extends ESTestCase { + + public void testValidation() { + GetAsyncSearchRequest getAsyncSearchRequest = new GetAsyncSearchRequest(randomAlphaOfLength(10)); + getAsyncSearchRequest.setKeepAlive(new TimeValue(0)); + assertTrue(getAsyncSearchRequest.validate().isPresent()); + ValidationException validationException = getAsyncSearchRequest.validate().get(); + assertEquals(1, validationException.validationErrors().size()); + assertEquals("Validation Failed: 1: keep_alive must be greater than 1 minute, got: 0s;", validationException.getMessage()); + + getAsyncSearchRequest.setKeepAlive(new TimeValue(1, TimeUnit.MINUTES)); + assertFalse(getAsyncSearchRequest.validate().isPresent()); + } +} diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java index b9badfe844340..0dfae5f5446db 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java @@ -38,9 +38,9 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; +import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer; -import org.elasticsearch.xpack.core.security.SecurityContext; import java.io.IOException; import java.nio.ByteBuffer;