Skip to content

Commit

Permalink
Add async_search.submit API to HLRC (#53592)
Browse files Browse the repository at this point in the history
This commit adds a new AsyncSearchClient to the High Level Rest Client which
initially supporst the submitAsyncSearch in its blocking and non-blocking
flavour. Also adding client side request and response objects and parsing code
to parse the xContent output of the client side AsyncSearchResponse together
with parsing roundtrip tests and a simple roundtrip integration test.

Relates to #49091
  • Loading branch information
Christoph Büscher authored Mar 19, 2020
1 parent 2e617c3 commit dea6b7d
Show file tree
Hide file tree
Showing 14 changed files with 1,020 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.asyncsearch.AsyncSearchResponse;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;

import java.io.IOException;

import static java.util.Collections.emptySet;

public class AsyncSearchClient {
private final RestHighLevelClient restHighLevelClient;

AsyncSearchClient(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
}

/**
* Submit a new async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options,
AsyncSearchResponse::fromXContent, emptySet());
}

/**
* Asynchronously submit a new async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
* <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-ilm-ilm-get-lifecycle-policy.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable submitAsyncSearchAsync(SubmitAsyncSearchRequest request, RequestOptions options,
ActionListener<AsyncSearchResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options,
AsyncSearchResponse::fromXContent, listener, emptySet());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.RequestConverters.Params;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;
import org.elasticsearch.rest.action.search.RestSearchAction;

import java.io.IOException;
import java.util.Locale;

import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;

final class AsyncSearchRequestConverters {

static Request submitAsyncSearch(SubmitAsyncSearchRequest asyncSearchRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder().addCommaSeparatedPathParts(
asyncSearchRequest.getIndices())
.addPathPartAsIs("_async_search").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new RequestConverters.Params();
// add all typical search params and search request source as body
addSearchRequestParams(params, asyncSearchRequest);
if (asyncSearchRequest.getSearchSource() != null) {
request.setEntity(RequestConverters.createEntity(asyncSearchRequest.getSearchSource(), REQUEST_BODY_CONTENT_TYPE));
}
// set async search submit specific parameters
if (asyncSearchRequest.isCleanOnCompletion() != null) {
params.putParam("clean_on_completion", asyncSearchRequest.isCleanOnCompletion().toString());
}
if (asyncSearchRequest.getKeepAlive() != null) {
params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep());
}
if (asyncSearchRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", asyncSearchRequest.getWaitForCompletion().getStringRep());
}
request.addParameters(params.asMap());
return request;
}

static void addSearchRequestParams(Params params, SubmitAsyncSearchRequest request) {
params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true");
params.withRouting(request.getRouting());
params.withPreference(request.getPreference());
params.withIndicesOptions(request.getIndicesOptions());
params.withSearchType(request.getSearchType().name().toLowerCase(Locale.ROOT));
params.withMaxConcurrentShardRequests(request.getMaxConcurrentShardRequests());
if (request.getRequestCache() != null) {
params.withRequestCache(request.getRequestCache());
}
if (request.getAllowPartialSearchResults() != null) {
params.withAllowPartialResults(request.getAllowPartialSearchResults());
}
params.withBatchedReduceSize(request.getBatchedReduceSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -403,22 +403,22 @@ static Request search(SearchRequest searchRequest, String searchEndpoint) throws
return request;
}

private static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true");
params.withRouting(searchRequest.routing());
params.withPreference(searchRequest.preference());
params.withIndicesOptions(searchRequest.indicesOptions());
params.putParam("search_type", searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
params.putParam("max_concurrent_shard_requests", Integer.toString(searchRequest.getMaxConcurrentShardRequests()));
params.withMaxConcurrentShardRequests(searchRequest.getMaxConcurrentShardRequests());
if (searchRequest.requestCache() != null) {
params.putParam("request_cache", Boolean.toString(searchRequest.requestCache()));
params.withRequestCache(searchRequest.requestCache());
}
if (searchRequest.allowPartialSearchResults() != null) {
params.putParam("allow_partial_search_results", Boolean.toString(searchRequest.allowPartialSearchResults()));
params.withAllowPartialResults(searchRequest.allowPartialSearchResults());
}
params.putParam("batched_reduce_size", Integer.toString(searchRequest.getBatchedReduceSize()));
params.withBatchedReduceSize(searchRequest.getBatchedReduceSize());
if (searchRequest.scroll() != null) {
params.putParam("scroll", searchRequest.scroll().keepAlive());
}
Expand Down Expand Up @@ -860,6 +860,26 @@ Params withPreference(String preference) {
return putParam("preference", preference);
}

Params withSearchType(String searchType) {
return putParam("search_type", searchType);
}

Params withMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
return putParam("max_concurrent_shard_requests", Integer.toString(maxConcurrentShardRequests));
}

Params withBatchedReduceSize(int batchedReduceSize) {
return putParam("batched_reduce_size", Integer.toString(batchedReduceSize));
}

Params withRequestCache(boolean requestCache) {
return putParam("request_cache", Boolean.toString(requestCache));
}

Params withAllowPartialResults(boolean allowPartialSearchResults) {
return putParam("allow_partial_search_results", Boolean.toString(allowPartialSearchResults));
}

Params withRealtime(boolean realtime) {
if (realtime == false) {
return putParam("realtime", Boolean.FALSE.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ public class RestHighLevelClient implements Closeable {
private final TransformClient transformClient = new TransformClient(this);
private final EnrichClient enrichClient = new EnrichClient(this);
private final EqlClient eqlClient = new EqlClient(this);
private final AsyncSearchClient asyncSearchClient = new AsyncSearchClient(this);

/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
Expand Down Expand Up @@ -428,13 +429,23 @@ public final XPackClient xpack() {
* A wrapper for the {@link RestHighLevelClient} that provides methods for
* accessing the Elastic Index Lifecycle APIs.
* <p>
* See the <a href="http://FILL-ME-IN-WE-HAVE-NO-DOCS-YET.com"> X-Pack APIs
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/index-lifecycle-management-api.html"> X-Pack APIs
* on elastic.co</a> for more information.
*/
public IndexLifecycleClient indexLifecycle() {
return ilmClient;
}

/**
* A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Elastic Index Async Search APIs.
* <p>
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> X-Pack APIs on elastic.co</a>
* for more information.
*/
public AsyncSearchClient asyncSearch() {
return asyncSearchClient;
}

/**
* Provides methods for accessing the Elastic Licensed Migration APIs that
* are shipped with the default distribution of Elasticsearch. All of
Expand Down
Loading

0 comments on commit dea6b7d

Please sign in to comment.