Skip to content

Commit

Permalink
Add new x-pack endpoints to asynchronously track the progress of a se…
Browse files Browse the repository at this point in the history
…arch
  • Loading branch information
jimczi committed Dec 6, 2019
1 parent 0b9a9b4 commit 750a81d
Show file tree
Hide file tree
Showing 43 changed files with 2,579 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
* @see org.elasticsearch.client.Client#search(SearchRequest)
* @see SearchResponse
*/
public final class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {
public class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public SearchRequestBuilder setVersion(boolean version) {
sourceBuilder().version(version);
return this;
}

/**
* Should each {@link org.elasticsearch.search.SearchHit} be returned with the
* sequence number and primary term of the last modification of the document.
Expand Down
38 changes: 0 additions & 38 deletions server/src/main/java/org/elasticsearch/client/node/NodeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchProgressActionListener;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
Expand Down Expand Up @@ -108,38 +102,6 @@ > Task executeLocally(ActionType<Response> action, Request request, TaskListener
listener::onResponse, listener::onFailure);
}

/**
* Execute a {@link SearchRequest} locally and track the progress of the request through
* a {@link SearchProgressActionListener}.
*/
public SearchTask executeSearchLocally(SearchRequest request, SearchProgressActionListener listener) {
// we cannot track the progress if remote cluster requests are splitted.
request.setCcsMinimizeRoundtrips(false);
TransportSearchAction action = (TransportSearchAction) actions.get(SearchAction.INSTANCE);
SearchTask task = (SearchTask) taskManager.register("transport", action.actionName, request);
task.setProgressListener(listener);
action.execute(task, request, new ActionListener<>() {
@Override
public void onResponse(SearchResponse response) {
try {
taskManager.unregister(task);
} finally {
listener.onResponse(response);
}
}

@Override
public void onFailure(Exception e) {
try {
taskManager.unregister(task);
} finally {
listener.onFailure(e);
}
}
});
return task;
}

/**
* The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by
* {@link #executeLocally(ActionType, ActionRequest, TaskListener)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
searchRequest.routing(request.param("routing"));
searchRequest.preference(request.param("preference"));
searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", true));
searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));

checkRestTotalHits(request, searchRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESSingleNodeTestCase;

import java.util.ArrayList;
Expand All @@ -37,6 +39,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -189,7 +192,14 @@ public void onFailure(Exception e) {
throw new AssertionError();
}
};
client.executeSearchLocally(request, listener);
client.executeLocally(SearchAction.INSTANCE, new SearchRequest(request) {
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
SearchTask task = (SearchTask) super.createTask(id, type, action, parentTaskId, headers);
task.setProgressListener(listener);
return task;
}
}, listener);
latch.await();

assertThat(shardsListener.get(), equalTo(expectedShards));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ private String formatStatusCodeMessage(ClientYamlTestResponse restTestResponse,
private static Map<String, Tuple<String, org.hamcrest.Matcher<Integer>>> catches = new HashMap<>();

static {
catches.put("not_modified", tuple("304", equalTo(304)));
catches.put("bad_request", tuple("400", equalTo(400)));
catches.put("unauthorized", tuple("401", equalTo(401)));
catches.put("forbidden", tuple("403", equalTo(403)));
Expand Down
30 changes: 30 additions & 0 deletions x-pack/plugin/async-search/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

evaluationDependsOn(xpackModule('core'))

apply plugin: 'elasticsearch.esplugin'
esplugin {
name 'x-pack-async-search'
description 'A module which allows to track the progress of a search asynchronously.'
classname 'org.elasticsearch.xpack.search.AsyncSearch'
extendedPlugins = ['x-pack-core']
}
archivesBaseName = 'x-pack-async-search'

compileJava.options.compilerArgs << "-Xlint:-rawtypes"
compileTestJava.options.compilerArgs << "-Xlint:-rawtypes"


dependencies {
compileOnly project(":server")

compileOnly project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: xpackModule('ilm'))
}

integTest.enabled = false
18 changes: 18 additions & 0 deletions x-pack/plugin/async-search/qa/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import org.elasticsearch.gradle.test.RestIntegTestTask

apply plugin: 'elasticsearch.build'
test.enabled = false

dependencies {
compile project(':test:framework')
}

subprojects {
project.tasks.withType(RestIntegTestTask) {
final File xPackResources = new File(xpackProject('plugin').projectDir, 'src/test/resources')
project.copyRestSpec.from(xPackResources) {
include 'rest-api-spec/api/**'
}
}

}
24 changes: 24 additions & 0 deletions x-pack/plugin/async-search/qa/rest/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import org.elasticsearch.gradle.test.RestIntegTestTask

apply plugin: 'elasticsearch.testclusters'
apply plugin: 'elasticsearch.standalone-test'

dependencies {
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: xpackModule('async-search'), configuration: 'runtime')
}

task restTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
}

testClusters.restTest {
testDistribution = 'DEFAULT'
setting 'xpack.ml.enabled', 'false'
setting 'xpack.monitoring.enabled', 'false'
setting 'xpack.security.enabled', 'true'
user username: 'async-search-user', password: 'async-search-password'
}

check.dependsOn restTest
test.enabled = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.search;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;

import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;

public class AsyncSearchRestIT extends ESClientYamlSuiteTestCase {

public AsyncSearchRestIT(final ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return ESClientYamlSuiteTestCase.createParameters();
}

@Override
protected Settings restClientSettings() {
final String userAuthHeaderValue = basicAuthHeaderValue("async-search-user",
new SecureString("async-search-password".toCharArray()));
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", userAuthHeaderValue).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
---
"Async search":
- do:
indices.create:
index: test-1

- do:
indices.create:
index: test-2

- do:
indices.create:
index: test-3

- do:
index:
index: test-2
body: { max: 2 }

- do:
index:
index: test-1
body: { max: 1 }

- do:
index:
index: test-3
body: { max: 3 }

- do:
indices.refresh: {}

- do:
async_search.submit:
index: test-*
batched_reduce_size: 2
body:
query:
match_all: {}
aggs:
1:
max:
field: max
sort: max

- set: { id: id }

- do:
async_search.get:
id: "$id"
wait_for_completion: 10s

- set: { version: version }
- match: { version: 4 }
- is_false: partial_response
- length: { response.hits.hits: 3 }
- match: { response.hits.hits.0._source.max: 1 }
- match: { response.aggregations.1.value: 3.0 }

- do:
catch: not_modified
async_search.get:
id: "$id"
last_version: "$version"

- is_false: partial_response
- is_false: response

- do:
async_search.delete:
id: "$id"

- match: { acknowledged: true }

- do:
catch: missing
async_search.delete:
id: "$id"
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.search;

import org.elasticsearch.test.rest.ESRestTestCase;

public class AsyncSearchRestTestCase extends ESRestTestCase {
@Override
protected boolean preserveClusterUponCompletion() {
return true;
}
}
Loading

0 comments on commit 750a81d

Please sign in to comment.