diff --git a/docs/reference/cluster/tasks.asciidoc b/docs/reference/cluster/tasks.asciidoc index ed73290883d23..b3457953f46e5 100644 --- a/docs/reference/cluster/tasks.asciidoc +++ b/docs/reference/cluster/tasks.asciidoc @@ -195,3 +195,71 @@ The following command will change the grouping to parent tasks: GET _tasks?group_by=parents -------------------------------------------------- // CONSOLE + +The grouping can be disabled by specifying `none` as a `group_by` parameter: + +[source,js] +-------------------------------------------------- +GET _tasks?group_by=none +-------------------------------------------------- +// CONSOLE + +[float] +=== Identifying running tasks + +The `X-Opaque-Id` header, when provided on the HTTP request header, is going to be returned as a header in the response as well as +in the `headers` field for in the task information. This allows to track certain calls, or associate certain tasks with +a the client that started them: + +[source,sh] +-------------------------------------------------- +curl -i -H "X-Opaque-Id: 123456" "http://localhost:9200/_tasks?group_by=parents" +-------------------------------------------------- +// NOTCONSOLE + +The result will look similar to the following: + +[source,js] +-------------------------------------------------- +HTTP/1.1 200 OK +X-Opaque-Id: 123456 <1> +content-type: application/json; charset=UTF-8 +content-length: 831 + +{ + "tasks" : { + "u5lcZHqcQhu-rUoFaqDphA:45" : { + "node" : "u5lcZHqcQhu-rUoFaqDphA", + "id" : 45, + "type" : "transport", + "action" : "cluster:monitor/tasks/lists", + "start_time_in_millis" : 1513823752749, + "running_time_in_nanos" : 293139, + "cancellable" : false, + "headers" : { + "X-Opaque-Id" : "123456" <2> + }, + "children" : [ + { + "node" : "u5lcZHqcQhu-rUoFaqDphA", + "id" : 46, + "type" : "direct", + "action" : "cluster:monitor/tasks/lists[n]", + "start_time_in_millis" : 1513823752750, + "running_time_in_nanos" : 92133, + "cancellable" : false, + "parent_task_id" : "u5lcZHqcQhu-rUoFaqDphA:45", + "headers" : { + "X-Opaque-Id" : "123456" <3> + } + } + ] + } + } +} +-------------------------------------------------- +// NOTCONSOLE + +<1> id as a part of the response header +<2> id for the tasks that was initiated by the REST request +<3> the child task of the task initiated by the REST request diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index a13bdea0ef2f4..db259de411165 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -81,6 +81,7 @@ import org.junit.Before; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Iterator; @@ -123,6 +124,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { private SearchRequest firstSearchRequest; private PlainActionFuture listener; private String scrollId; + private ThreadPool threadPool; private TaskManager taskManager; private BulkByScrollTask testTask; private WorkerBulkByScrollTaskState worker; @@ -141,7 +143,8 @@ public void setupForTest() { testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest); listener = new PlainActionFuture<>(); scrollId = null; - taskManager = new TaskManager(Settings.EMPTY); + threadPool = new TestThreadPool(getClass().getName()); + taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest); testTask.setWorker(testRequest.getRequestsPerSecond(), null); worker = testTask.getWorkerState(); @@ -159,8 +162,9 @@ private void setupClient(ThreadPool threadPool) { } @After - public void tearDownAndVerifyCommonStuff() { + public void tearDownAndVerifyCommonStuff() throws Exception { client.close(); + terminate(threadPool); } /** diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java index 62a2c34ea582c..3c2f5194fceda 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java @@ -32,6 +32,7 @@ import org.mockito.ArgumentCaptor; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.Consumer; @@ -53,7 +54,7 @@ public class TransportRethrottleActionTests extends ESTestCase { @Before public void createTask() { slices = between(2, 50); - task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID); + task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap()); task.setWorkerCount(slices); } @@ -101,7 +102,8 @@ public void testRethrottleSuccessfulResponse() { List sliceStatuses = new ArrayList<>(slices); for (int i = 0; i < slices; i++) { BulkByScrollTask.Status status = believeableInProgressStatus(i); - tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()))); + tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()), + Collections.emptyMap())); sliceStatuses.add(new BulkByScrollTask.StatusOrException(status)); } rethrottleTestCase(slices, @@ -121,7 +123,8 @@ public void testRethrottleWithSomeSucceeded() { List tasks = new ArrayList<>(); for (int i = succeeded; i < slices; i++) { BulkByScrollTask.Status status = believeableInProgressStatus(i); - tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()))); + tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()), + Collections.emptyMap())); sliceStatuses.add(new BulkByScrollTask.StatusOrException(status)); } rethrottleTestCase(slices - succeeded, diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index b2126b1b61185..efa296b6278af 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -73,7 +73,7 @@ protected Version getCurrentVersion() { } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings); + MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index ee65eb5ccbd0d..c78ae25e44a06 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -77,7 +77,7 @@ protected Version getCurrentVersion() { } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings); + MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml index 29790e2146190..067eba6e4b860 100644 --- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml +++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml @@ -212,3 +212,44 @@ field3: value - match: { hits.total: 1 } - match: { hits.hits.0._id: q3 } + +--- +"Create a task result record in the old cluster": + - do: + indices.create: + index: reindexed_index + body: + settings: + index: + number_of_replicas: 0 + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "reindexed_index", "_type": "doc"}}' + - '{"f1": "1"}' + - '{"index": {"_index": "reindexed_index", "_type": "doc"}}' + - '{"f1": "2"}' + - '{"index": {"_index": "reindexed_index", "_type": "doc"}}' + - '{"f1": "3"}' + - '{"index": {"_index": "reindexed_index", "_type": "doc"}}' + - '{"f1": "4"}' + - '{"index": {"_index": "reindexed_index", "_type": "doc"}}' + - '{"f1": "5"}' + + - do: + reindex: + wait_for_completion: false + body: + source: + index: reindexed_index + size: 1 + dest: + index: reindexed_index_copy + - match: {task: '/.+:\d+/'} + - set: {task: task} + + - do: + tasks.get: + wait_for_completion: true + task_id: $task diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml index af58a2f362c78..338b8728b6a82 100644 --- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml +++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml @@ -126,3 +126,42 @@ field3: value - match: { hits.total: 1 } - match: { hits.hits.0._id: q3 } + +--- +"Find a task result record from the old cluster": + - do: + search: + index: .tasks + body: + query: + match_all: {} + - match: { hits.total: 1 } + - match: { hits.hits.0._id: '/.+:\d+/' } + - set: {hits.hits.0._id: task_id} + + - do: + tasks.get: + wait_for_completion: true + task_id: $task_id + + - is_false: node_failures + - is_true: task + + - do: + headers: { "X-Opaque-Id": "Reindexing Again" } + reindex: + wait_for_completion: false + body: + source: + index: reindexed_index_copy + size: 1 + dest: + index: reindexed_index_another_copy + - match: { task: '/.+:\d+/' } + - set: { task: task_id } + + - do: + tasks.get: + wait_for_completion: true + task_id: $task_id + - match: { task.headers.X-Opaque-Id: "Reindexing Again" } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json index fbe355ee164b0..1110c3c111b99 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json @@ -34,7 +34,7 @@ "group_by": { "type" : "enum", "description": "Group tasks by nodes or parent/child relationships", - "options" : ["nodes", "parents"], + "options" : ["nodes", "parents", "none"], "default" : "nodes" } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yml index dd1c415876fa7..57bf5b629b76a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yml @@ -17,3 +17,19 @@ group_by: parents - is_true: tasks + +--- +"tasks_list headers": + - skip: + version: " - 6.99.99" + reason: task headers has been added in 7.0.0 + + - do: + headers: { "X-Opaque-Id": "That is me" } + tasks.list: + actions: "cluster:monitor/tasks/lists" + group_by: none + + - is_true: tasks + - match: { tasks.0.headers.X-Opaque-Id: "That is me" } + diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 28fd3458b902a..872c217f98091 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -312,6 +312,7 @@ import org.elasticsearch.rest.action.search.RestMultiSearchAction; import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.rest.action.search.RestSearchScrollAction; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.usage.UsageService; @@ -324,6 +325,7 @@ import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.unmodifiableMap; @@ -362,7 +364,10 @@ public ActionModule(boolean transportClient, Settings settings, IndexNameExpress actionFilters = setupActionFilters(actionPlugins); autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver); destructiveOperations = new DestructiveOperations(settings, clusterSettings); - Set headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet()); + Set headers = Stream.concat( + actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()), + Stream.of("X-Opaque-Id") + ).collect(Collectors.toSet()); UnaryOperator restWrapper = null; for (ActionPlugin plugin : actionPlugins) { UnaryOperator newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java index aca1be7adff4c..0bd1ff2945bd7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java @@ -56,7 +56,7 @@ * Transport action that can be used to cancel currently running cancellable tasks. *

* For a task to be cancellable it has to return an instance of - * {@link CancellableTask} from {@link TransportRequest#createTask(long, String, String, TaskId)} + * {@link CancellableTask} from {@link TransportRequest#createTask} */ public class TransportCancelTasksAction extends TransportTasksAction { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java index de5fcf9345d23..88d8ff4679917 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java @@ -186,6 +186,21 @@ public XContentBuilder toXContentGroupedByParents(XContentBuilder builder, Param return builder; } + /** + * Presents a flat list of tasks + */ + public XContentBuilder toXContentGroupedByNone(XContentBuilder builder, Params params) throws IOException { + toXContentCommon(builder, params); + builder.startArray("tasks"); + for (TaskInfo taskInfo : getTasks()) { + builder.startObject(); + taskInfo.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + return builder; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 59a0b5e198087..46d51ee0b40e4 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -77,7 +77,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement /** * Max length of the source document to include into toString() * - * @see ReplicationRequest#createTask(long, java.lang.String, java.lang.String, org.elasticsearch.tasks.TaskId) + * @see ReplicationRequest#createTask */ static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048; diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 07d2229a5366d..2c699bf6d9b3a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.Map; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -428,9 +429,9 @@ public boolean isSuggestOnly() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { // generating description in a lazy way since source can be quite big - return new SearchTask(id, type, action, null, parentTaskId) { + return new SearchTask(id, type, action, null, parentTaskId, headers) { @Override public String getDescription() { StringBuilder sb = new StringBuilder(); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java index be83ef6d5839e..68f6a6afce091 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java @@ -32,6 +32,7 @@ import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -117,8 +118,8 @@ public void readFrom(StreamInput in) throws IOException { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new SearchTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java index d0a1cdd456f47..699448909a2b5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java @@ -22,13 +22,15 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; +import java.util.Map; + /** * Task storing information about a currently running search request. */ public class SearchTask extends CancellableTask { - public SearchTask(long id, String type, String action, String description, TaskId parentTaskId) { - super(id, type, action, description, parentTaskId); + public SearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 07822bebaa9ac..81584a7bb6467 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -35,6 +35,7 @@ import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -207,8 +208,8 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new ReplicationTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ReplicationTask(id, type, action, getDescription(), parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java index 2e0baa057b223..1cf8b8bf0ff68 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java @@ -27,6 +27,7 @@ import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; import static java.util.Objects.requireNonNull; @@ -36,8 +37,8 @@ public class ReplicationTask extends Task { private volatile String phase = "starting"; - public ReplicationTask(long id, String type, String action, String description, TaskId parentTaskId) { - super(id, type, action, description, parentTaskId); + public ReplicationTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index a63d14d7f9d12..1a57b6a5d9500 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -81,6 +81,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -1228,8 +1229,8 @@ public TaskId getParentTask() { return request.getParentTask(); } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return request.createTask(id, type, action, parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return request.createTask(id, type, action, parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 0d4a87f3bfa24..dc0f7b015632e 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -180,7 +180,7 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings final TransportService transportService = new TransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), boundTransportAddress -> DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 0), - UUIDs.randomBase64UUID()), null); + UUIDs.randomBase64UUID()), null, Collections.emptySet()); modules.add((b -> { b.bind(BigArrays.class).toInstance(bigArrays); b.bind(PluginsService.class).toInstance(pluginsService); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 9d2c221fcb24d..fc4163ddd19b2 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Map; import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -408,8 +409,8 @@ protected Self doForSlice(Self request, TaskId slicingTask, int totalSlices) { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index d5e656489558c..276484b055253 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.CancellableTask; @@ -38,6 +37,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import static java.lang.Math.min; @@ -62,8 +62,8 @@ public class BulkByScrollTask extends CancellableTask { private volatile LeaderBulkByScrollTaskState leaderState; private volatile WorkerBulkByScrollTaskState workerState; - public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) { - super(id, type, action, description, parentTaskId); + public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index b1bd1c5b3138e..1e31eae7d417f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -271,8 +272,8 @@ public ResyncRequest(ShardId shardId, String allocationId) { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new ResyncTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ResyncTask(id, type, action, getDescription(), parentTaskId, headers); } @Override @@ -297,8 +298,8 @@ public static class ResyncTask extends Task { private volatile int resyncedOperations; private volatile int skippedOperations; - public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId) { - super(id, type, action, description, parentTaskId); + public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 21ce12c59f8fb..62fc271f99084 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -156,6 +156,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -424,8 +425,12 @@ protected Node(final Environment environment, Collection metaDataIndexUpgradeService, metaDataUpgrader); new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); + Set taskHeaders = Stream.concat( + pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()), + Stream.of("X-Opaque-Id") + ).collect(Collectors.toSet()); final TransportService transportService = newTransportService(settings, transport, threadPool, - networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); + networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService); final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService, SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); @@ -543,8 +548,8 @@ static void warnIfPreRelease(final Version version, final boolean isSnapshot, fi protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, - ClusterSettings clusterSettings) { - return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings); + ClusterSettings clusterSettings, Set taskHeaders) { + return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); } protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) { diff --git a/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java b/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java index 377da56f6018b..41f0ed86116ad 100644 --- a/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java @@ -84,6 +84,13 @@ default Collection getRestHeaders() { return Collections.emptyList(); } + /** + * Returns headers which should be copied from internal requests into tasks. + */ + default Collection getTaskHeaders() { + return Collections.emptyList(); + } + /** * Returns a function used to wrap each rest request before handling the request. * diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java index 6ef5d5a2de2bf..8e6447e0e4980 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java @@ -103,10 +103,21 @@ public RestResponse buildResponse(T response, XContentBuilder builder) throws Ex return new BytesRestResponse(RestStatus.OK, builder); } }; + } else if ("none".equals(groupBy)) { + return new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(T response, XContentBuilder builder) throws Exception { + builder.startObject(); + response.toXContentGroupedByNone(builder, channel.request()); + builder.endObject(); + return new BytesRestResponse(RestStatus.OK, builder); + } + }; + } else if ("parents".equals(groupBy)) { return new RestToXContentListener<>(channel); } else { - throw new IllegalArgumentException("[group_by] must be one of [nodes] or [parents] but was [" + groupBy + "]"); + throw new IllegalArgumentException("[group_by] must be one of [nodes], [parents] or [none] but was [" + groupBy + "]"); } } diff --git a/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java b/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java index ac71b84d54f34..6957eea4758d9 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java @@ -31,6 +31,7 @@ import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Map; /** * Shard level fetch base request. Holds all the info needed to execute a fetch. @@ -115,8 +116,8 @@ public void readFrom(StreamInput in) throws IOException { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new SearchTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java index d1fba0f761526..c551205f6b5db 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java @@ -29,6 +29,7 @@ import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Map; public class InternalScrollSearchRequest extends TransportRequest { @@ -76,8 +77,8 @@ public void readFrom(StreamInput in) throws IOException { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new SearchTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java index 76a13b7b02f24..a0fecc9bf8b3d 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Map; /** * Shard level search request that represents an actual search sent from the coordinating node to the nodes holding @@ -177,8 +178,8 @@ public boolean isProfile() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new SearchTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java index c893ed93046f0..dbb14fda71783 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java @@ -32,6 +32,7 @@ import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Map; import static org.elasticsearch.search.dfs.AggregatedDfs.readAggregatedDfs; @@ -91,8 +92,8 @@ public void readFrom(StreamInput in) throws IOException { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new SearchTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); } public String getDescription() { diff --git a/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java b/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java index 685e9bcf35251..1d43076305ccd 100644 --- a/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java +++ b/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.Nullable; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; /** @@ -30,8 +31,8 @@ public abstract class CancellableTask extends Task { private final AtomicReference reason = new AtomicReference<>(); - public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId) { - super(id, type, action, description, parentTaskId); + public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/tasks/Task.java b/server/src/main/java/org/elasticsearch/tasks/Task.java index e59970b84ee47..9fd9019cd213c 100644 --- a/server/src/main/java/org/elasticsearch/tasks/Task.java +++ b/server/src/main/java/org/elasticsearch/tasks/Task.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import java.io.IOException; +import java.util.Map; /** * Current task information @@ -43,6 +44,8 @@ public class Task { private final TaskId parentTask; + private final Map headers; + /** * The task's start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style). */ @@ -53,11 +56,12 @@ public class Task { */ private final long startTimeNanos; - public Task(long id, String type, String action, String description, TaskId parentTask) { - this(id, type, action, description, parentTask, System.currentTimeMillis(), System.nanoTime()); + public Task(long id, String type, String action, String description, TaskId parentTask, Map headers) { + this(id, type, action, description, parentTask, System.currentTimeMillis(), System.nanoTime(), headers); } - public Task(long id, String type, String action, String description, TaskId parentTask, long startTime, long startTimeNanos) { + public Task(long id, String type, String action, String description, TaskId parentTask, long startTime, long startTimeNanos, + Map headers) { this.id = id; this.type = type; this.action = action; @@ -65,6 +69,7 @@ public Task(long id, String type, String action, String description, TaskId pare this.parentTask = parentTask; this.startTime = startTime; this.startTimeNanos = startTimeNanos; + this.headers = headers; } /** @@ -92,7 +97,7 @@ public final TaskInfo taskInfo(String localNodeId, boolean detailed) { */ protected final TaskInfo taskInfo(String localNodeId, String description, Status status) { return new TaskInfo(new TaskId(localNodeId, getId()), getType(), getAction(), description, status, startTime, - System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask); + System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask, headers); } /** @@ -149,6 +154,14 @@ public Status getStatus() { public interface Status extends ToXContentObject, NamedWriteable {} + + /** + * Returns stored task header associated with the task + */ + public String getHeader(String header) { + return headers.get(header); + } + public TaskResult result(DiscoveryNode node, Exception error) throws IOException { return new TaskResult(taskInfo(node.getId(), true), error); } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java index a2364ac8e4047..86ba59ebcc804 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.tasks; +import java.util.Map; + /** * An interface for a request that can be used to register a task manager task */ @@ -47,8 +49,8 @@ default void setParentTask(String parentTaskNode, long parentTaskId) { * A request can override this method and return null to avoid being tracked by the task * manager. */ - default Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new Task(id, type, action, getDescription(), parentTaskId); + default Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new Task(id, type, action, getDescription(), parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java b/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java index d0fd66703e09e..2bd16a9addf6a 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java @@ -19,6 +19,7 @@ package org.elasticsearch.tasks; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -31,6 +32,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -65,8 +68,10 @@ public final class TaskInfo implements Writeable, ToXContentFragment { private final TaskId parentTaskId; + private final Map headers; + public TaskInfo(TaskId taskId, String type, String action, String description, Task.Status status, long startTime, - long runningTimeNanos, boolean cancellable, TaskId parentTaskId) { + long runningTimeNanos, boolean cancellable, TaskId parentTaskId, Map headers) { this.taskId = taskId; this.type = type; this.action = action; @@ -76,6 +81,7 @@ public TaskInfo(TaskId taskId, String type, String action, String description, T this.runningTimeNanos = runningTimeNanos; this.cancellable = cancellable; this.parentTaskId = parentTaskId; + this.headers = headers; } /** @@ -91,6 +97,11 @@ public TaskInfo(StreamInput in) throws IOException { runningTimeNanos = in.readLong(); cancellable = in.readBoolean(); parentTaskId = TaskId.readFromStream(in); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + headers = in.readMap(StreamInput::readString, StreamInput::readString); + } else { + headers = Collections.emptyMap(); + } } @Override @@ -104,6 +115,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(runningTimeNanos); out.writeBoolean(cancellable); parentTaskId.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); + } } public TaskId getTaskId() { @@ -162,6 +176,13 @@ public TaskId getParentTaskId() { return parentTaskId; } + /** + * Returns the task headers + */ + public Map getHeaders() { + return headers; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("node", taskId.getNodeId()); @@ -180,6 +201,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (parentTaskId.isSet()) { builder.field("parent_task_id", parentTaskId.toString()); } + builder.startObject("headers"); + for(Map.Entry attribute : headers.entrySet()) { + builder.field(attribute.getKey(), attribute.getValue()); + } + builder.endObject(); return builder; } @@ -195,10 +221,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws long runningTimeNanos = (Long) a[i++]; boolean cancellable = (Boolean) a[i++]; String parentTaskIdString = (String) a[i++]; - + @SuppressWarnings("unchecked") Map headers = (Map) a[i++]; + if (headers == null) { + // This might happen if we are reading an old version of task info + headers = Collections.emptyMap(); + } RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes); TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString); - return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId); + return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, + headers); }); static { // Note for the future: this has to be backwards and forwards compatible with all changes to the task storage format @@ -212,6 +243,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws PARSER.declareLong(constructorArg(), new ParseField("running_time_in_nanos")); PARSER.declareBoolean(constructorArg(), new ParseField("cancellable")); PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id")); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers")); } @Override @@ -234,11 +266,12 @@ public boolean equals(Object obj) { && Objects.equals(runningTimeNanos, other.runningTimeNanos) && Objects.equals(parentTaskId, other.parentTaskId) && Objects.equals(cancellable, other.cancellable) - && Objects.equals(status, other.status); + && Objects.equals(status, other.status) + && Objects.equals(headers, other.headers); } @Override public int hashCode() { - return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status); + return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status, headers); } } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index afeeeeedd1168..16212e066bbff 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -32,19 +32,26 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; /** * Task Manager service for keeping track of currently running tasks on the nodes @@ -52,6 +59,10 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplier { private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100); + /** Rest headers that are copied to the task */ + private final List taskHeaders; + private final ThreadPool threadPool; + private final ConcurrentMapLong tasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); private final ConcurrentMapLong cancellableTasks = ConcurrentCollections @@ -65,8 +76,13 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES; - public TaskManager(Settings settings) { + private final ByteSizeValue maxHeaderSize; + + public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { super(settings); + this.threadPool = threadPool; + this.taskHeaders = new ArrayList<>(taskHeaders); + this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); } public void setTaskResultsService(TaskResultsService taskResultsService) { @@ -80,7 +96,21 @@ public void setTaskResultsService(TaskResultsService taskResultsService) { * Returns the task manager tracked task or null if the task doesn't support the task manager */ public Task register(String type, String action, TaskAwareRequest request) { - Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask()); + Map headers = new HashMap<>(); + long headerSize = 0; + long maxSize = maxHeaderSize.getBytes(); + ThreadContext threadContext = threadPool.getThreadContext(); + for (String key : taskHeaders) { + String httpHeader = threadContext.getHeader(key); + if (httpHeader != null) { + headerSize += key.length() * 2 + httpHeader.length() * 2; + if (headerSize > maxSize) { + throw new IllegalArgumentException("Request exceeded the maximum size of task headers " + maxHeaderSize); + } + headers.put(key, httpHeader); + } + } + Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers); if (task == null) { return null; } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index 69549c611f1e6..f661095d6bd47 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -34,6 +34,7 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -48,6 +49,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Map; /** * Service that can store task results. @@ -60,6 +62,10 @@ public class TaskResultsService extends AbstractComponent { public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json"; + public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version"; + + public static final int TASK_RESULT_MAPPING_VERSION = 2; + private final Client client; private final ClusterService clusterService; @@ -109,7 +115,7 @@ public void onFailure(Exception e) { }); } else { IndexMetaData metaData = state.getMetaData().index(TASK_INDEX); - if (metaData.getMappings().containsKey(TASK_TYPE) == false) { + if (getTaskResultMappingVersion(metaData) < TASK_RESULT_MAPPING_VERSION) { // The index already exists but doesn't have our mapping client.admin().indices().preparePutMapping(TASK_INDEX).setType(TASK_TYPE) .setSource(taskResultIndexMapping(), XContentType.JSON) @@ -131,6 +137,17 @@ public void onFailure(Exception e) { } } + private int getTaskResultMappingVersion(IndexMetaData metaData) { + MappingMetaData mappingMetaData = metaData.getMappings().get(TASK_TYPE); + if (mappingMetaData == null) { + return 0; + } + @SuppressWarnings("unchecked") Map meta = (Map) mappingMetaData.sourceAsMap().get("_meta"); + if (meta == null || meta.containsKey(TASK_RESULT_MAPPING_VERSION_META_FIELD) == false) { + return 1; // The mapping was created before meta field was introduced + } + return (int) meta.get(TASK_RESULT_MAPPING_VERSION_META_FIELD); + } private void doStoreResult(TaskResult taskResult, ActionListener listener) { IndexRequestBuilder index = client.prepareIndex(TASK_INDEX, TASK_TYPE, taskResult.getTask().getTaskId().toString()); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index a59ffcaa872d2..7687844231ccd 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -61,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -149,7 +150,8 @@ public void close() throws IOException { * updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}. */ public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor, - Function localNodeFactory, @Nullable ClusterSettings clusterSettings) { + Function localNodeFactory, @Nullable ClusterSettings clusterSettings, + Set taskHeaders) { super(settings); this.transport = transport; this.threadPool = threadPool; @@ -158,7 +160,7 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa setTracerLogInclude(TRACE_LOG_INCLUDE_SETTING.get(settings)); setTracerLogExclude(TRACE_LOG_EXCLUDE_SETTING.get(settings)); tracerLog = Loggers.getLogger(logger, ".tracer"); - taskManager = createTaskManager(); + taskManager = createTaskManager(settings, threadPool, taskHeaders); this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings); @@ -184,8 +186,8 @@ public TaskManager getTaskManager() { return taskManager; } - protected TaskManager createTaskManager() { - return new TaskManager(settings); + protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + return new TaskManager(settings, threadPool, taskHeaders); } /** diff --git a/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json b/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json index 0f1a32e1bef81..435e6c5759cbb 100644 --- a/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json +++ b/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json @@ -1,5 +1,8 @@ { "task" : { + "_meta": { + "version": 2 + }, "dynamic" : "strict", "properties" : { "completed": { @@ -37,6 +40,10 @@ }, "description": { "type": "text" + }, + "headers": { + "type" : "object", + "enabled" : false } } }, diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index c28fddf68ad72..6b2e2040bca80 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -45,6 +45,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -91,8 +92,8 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) { @Override public boolean shouldCancelChildrenOnCancellation() { return false; @@ -131,8 +132,8 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) { @Override public boolean shouldCancelChildrenOnCancellation() { return true; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 8927fed567ed9..62313d01b95c3 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -56,9 +56,11 @@ import org.junit.BeforeClass; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -175,15 +177,16 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) { }; transportService = new TransportService(settings, new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), - new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), - new NetworkService(Collections.emptyList())), - threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null) { + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), + new NetworkService(Collections.emptyList())), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null, + Collections.emptySet()) { @Override - protected TaskManager createTaskManager() { + protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { - return new MockTaskManager(settings); + return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(); + return super.createTaskManager(settings, threadPool, taskHeaders); } } }; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java index c5d8b39c3da39..8628a8ee2c391 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.test.ESTestCase; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Map; public class TaskTests extends ESTestCase { @@ -36,7 +37,8 @@ public void testTaskInfoToString() { long runningTime = randomNonNegativeLong(); boolean cancellable = randomBoolean(); TaskInfo taskInfo = new TaskInfo(new TaskId(nodeId, taskId), "test_type", - "test_action", "test_description", null, startTime, runningTime, cancellable, TaskId.EMPTY_TASK_ID); + "test_action", "test_description", null, startTime, runningTime, cancellable, TaskId.EMPTY_TASK_ID, + Collections.singletonMap("foo", "bar")); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); assertEquals(((Number)map.get("id")).longValue(), taskId); @@ -46,6 +48,7 @@ public void testTaskInfoToString() { assertEquals(((Number)map.get("start_time_in_millis")).longValue(), startTime); assertEquals(((Number)map.get("running_time_in_nanos")).longValue(), runningTime); assertEquals(map.get("cancellable"), cancellable); + assertEquals(map.get("headers"), Collections.singletonMap("foo", "bar")); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 778930e7d05ac..b04205ed01813 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -84,6 +84,7 @@ import static java.util.Collections.singleton; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; @@ -355,19 +356,26 @@ public void testSearchTaskDescriptions() { client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}", XContentType.JSON) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - assertSearchResponse(client().prepareSearch("test").setTypes("doc").setQuery(QueryBuilders.matchAllQuery()).get()); + Map headers = new HashMap<>(); + headers.put("X-Opaque-Id", "my_id"); + headers.put("Foo-Header", "bar"); + headers.put("Custom-Task-Header", "my_value"); + assertSearchResponse( + client().filterWithHeader(headers).prepareSearch("test").setTypes("doc").setQuery(QueryBuilders.matchAllQuery()).get()); // the search operation should produce one main task List mainTask = findEvents(SearchAction.NAME, Tuple::v1); assertEquals(1, mainTask.size()); assertThat(mainTask.get(0).getDescription(), startsWith("indices[test], types[doc], search_type[")); assertThat(mainTask.get(0).getDescription(), containsString("\"query\":{\"match_all\"")); + assertTaskHeaders(mainTask.get(0)); // check that if we have any shard-level requests they all have non-zero length description List shardTasks = findEvents(SearchAction.NAME + "[*]", Tuple::v1); for (TaskInfo taskInfo : shardTasks) { assertThat(taskInfo.getParentTaskId(), notNullValue()); assertEquals(mainTask.get(0).getTaskId(), taskInfo.getParentTaskId()); + assertTaskHeaders(taskInfo); switch (taskInfo.getAction()) { case SearchTransportService.QUERY_ACTION_NAME: case SearchTransportService.DFS_ACTION_NAME: @@ -392,6 +400,25 @@ public void testSearchTaskDescriptions() { } + public void testSearchTaskHeaderLimit() { + int maxSize = Math.toIntExact(SETTING_HTTP_MAX_HEADER_SIZE.getDefault(Settings.EMPTY).getBytes() / 2 + 1); + + Map headers = new HashMap<>(); + headers.put("X-Opaque-Id", "my_id"); + headers.put("Custom-Task-Header", randomAlphaOfLengthBetween(maxSize, maxSize + 100)); + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> client().filterWithHeader(headers).admin().cluster().prepareListTasks().get() + ); + assertThat(ex.getMessage(), startsWith("Request exceeded the maximum size of task headers ")); + } + + private void assertTaskHeaders(TaskInfo taskInfo) { + assertThat(taskInfo.getHeaders().keySet(), hasSize(2)); + assertEquals("my_id", taskInfo.getHeaders().get("X-Opaque-Id")); + assertEquals("my_value", taskInfo.getHeaders().get("Custom-Task-Header")); + } + /** * Very basic "is it plugged in" style test that indexes a document and makes sure that you can fetch the status of the process. The * goal here is to verify that the large moving parts that make fetching task status work fit together rather than to verify any @@ -802,24 +829,24 @@ public void testNodeNotFoundButTaskFound() throws Exception { // Save a fake task that looks like it is from a node that isn't part of the cluster CyclicBarrier b = new CyclicBarrier(2); TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class); - resultsService.storeResult( - new TaskResult(new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID), - new RuntimeException("test")), - new ActionListener() { - @Override - public void onResponse(Void response) { - try { - b.await(); - } catch (InterruptedException | BrokenBarrierException e) { - onFailure(e); - } + resultsService.storeResult(new TaskResult( + new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()), + new RuntimeException("test")), + new ActionListener() { + @Override + public void onResponse(Void response) { + try { + b.await(); + } catch (InterruptedException | BrokenBarrierException e) { + onFailure(e); } + } - @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); - } - }); + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }); b.await(); // Now we can find it! diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 88674bfec74d8..5bf000a17bac7 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -59,9 +59,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import static org.elasticsearch.test.ESTestCase.awaitBusy; @@ -76,12 +78,17 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(UnblockTestTasksAction.INSTANCE, TransportUnblockTestTasksAction.class)); } + @Override + public Collection getTaskHeaders() { + return Collections.singleton("Custom-Task-Header"); + } + static class TestTask extends CancellableTask { private volatile boolean blocked = true; - TestTask(long id, String type, String action, String description, TaskId parentTaskId) { - super(id, type, action, description, parentTaskId); + TestTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } @Override @@ -178,8 +185,8 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new TestTask(id, type, action, this.getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new TestTask(id, type, action, this.getDescription(), parentTaskId, headers); } } @@ -247,8 +254,8 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) { @Override public boolean shouldCancelChildrenOnCancellation() { return true; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index d0d5be5b4178d..2fb23b26709bd 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -109,9 +109,9 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { if (enableTaskManager) { - return super.createTask(id, type, action, parentTaskId); + return super.createTask(id, type, action, parentTaskId, headers); } else { return null; } @@ -156,9 +156,9 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { if (enableTaskManager) { - return super.createTask(id, type, action, parentTaskId); + return super.createTask(id, type, action, parentTaskId, headers); } else { return null; } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 5141b9cd47187..3bd66af1bab05 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -82,7 +82,7 @@ public void setUp() throws Exception { CapturingTransport capturingTransport = new CapturingTransport(); transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> clusterService.localNode(), null); + boundAddress -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); bulkAction = new TestTransportBulkAction(); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index e35f98e220e03..af8289f0c45b1 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -49,6 +49,7 @@ import org.junit.BeforeClass; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -92,7 +93,7 @@ private TransportBulkAction createAction(boolean controlled, AtomicLong expected CapturingTransport capturingTransport = new CapturingTransport(); TransportService transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> clusterService.localNode(), null); + boundAddress -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY); diff --git a/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java b/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java index 6cc0afa3fadf2..34f9bc15ecfa6 100644 --- a/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.Collections; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Mockito.mock; @@ -68,7 +69,7 @@ public void testMainActionClusterAvailable() { when(clusterService.state()).thenReturn(state); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - x -> null, null); + x -> null, null, Collections.emptySet()); TransportMainAction action = new TransportMainAction(settings, mock(ThreadPool.class), transportService, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), clusterService); AtomicReference responseRef = new AtomicReference<>(); diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 80fbd4cc43ddf..b0ac2ed5fa0d3 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -73,7 +73,7 @@ public Logger getLogger() { @Override public SearchTask getTask() { - return new SearchTask(0, "n/a", "n/a", "test", null); + return new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap()); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index 73743230d1a14..86ad2f75fedf2 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -130,7 +130,8 @@ private TransportMultiSearchAction createTransportMultiSearchAction(boolean cont Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); TaskManager taskManager = mock(TaskManager.class); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) { + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()) { @Override public TaskManager getTaskManager() { return taskManager; diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index 4410507eef92e..3c2a87e8fef00 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -35,8 +35,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; import java.util.Arrays; import java.util.Collections; @@ -54,6 +57,22 @@ public class TransportMultiSearchActionTests extends ESTestCase { + protected ThreadPool threadPool; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getTestName()); + } + + @After + @Override + public void tearDown() throws Exception { + threadPool.shutdown(); + super.tearDown(); + } + public void testBatchExecute() throws Exception { // Initialize dependencies of TransportMultiSearchAction Settings settings = Settings.builder() @@ -63,8 +82,10 @@ public void testBatchExecute() throws Exception { when(actionFilters.filters()).thenReturn(new ActionFilter[0]); ThreadPool threadPool = new ThreadPool(settings); TaskManager taskManager = mock(TaskManager.class); - TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) { + TransportService transportService = new TransportService(Settings.EMPTY, null, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()) { @Override public TaskManager getTaskManager() { return taskManager; @@ -102,8 +123,8 @@ protected void doExecute(SearchRequest request, ActionListener l }); } }; - - TransportMultiSearchAction action = + + TransportMultiSearchAction action = new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10, System::nanoTime); diff --git a/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java b/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java index 3eb1616348d84..d576d440c0263 100644 --- a/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java @@ -25,12 +25,16 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.Node; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; import org.junit.Before; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -48,10 +52,17 @@ public class TransportActionFilterChainTests extends ESTestCase { private AtomicInteger counter; + private ThreadPool threadPool; @Before public void init() throws Exception { counter = new AtomicInteger(); + threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "TransportActionFilterChainTests").build()); + } + + @After + public void shutdown() throws Exception { + terminate(threadPool); } public void testActionFiltersRequest() throws ExecutionException, InterruptedException { @@ -68,7 +79,9 @@ public void testActionFiltersRequest() throws ExecutionException, InterruptedExc String actionName = randomAlphaOfLength(randomInt(30)); ActionFilters actionFilters = new ActionFilters(filters); - TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) { + TransportAction transportAction = + new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())) { @Override protected void doExecute(TestRequest request, ActionListener listener) { listener.onResponse(new TestResponse()); @@ -144,7 +157,8 @@ public void exe String actionName = randomAlphaOfLength(randomInt(30)); ActionFilters actionFilters = new ActionFilters(filters); - TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) { + TransportAction transportAction = new TransportAction(Settings.EMPTY, + actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())) { @Override protected void doExecute(TestRequest request, ActionListener listener) { listener.onResponse(new TestResponse()); diff --git a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index 54253e9620745..470da323043ae 100644 --- a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -67,6 +67,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -191,7 +192,7 @@ public void setUp() throws Exception { transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); final TransportService transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); setClusterState(clusterService, TEST_INDEX); diff --git a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index b14b030a5dc88..de65d2a3f9240 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -88,7 +88,7 @@ public void setUp() throws Exception { transport = new CapturingTransport(); clusterService = createClusterService(threadPool); transportService = new TransportService(clusterService.getSettings(), transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); localNode = new DiscoveryNode("local_node", buildNewFakeTransportAddress(), Collections.emptyMap(), diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 7d471f77f83d0..60a46876a7126 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -181,7 +181,7 @@ public void setUp() throws Exception { transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); int numNodes = randomIntBetween(3, 10); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index 9f1591f6a540b..3aeab0fa5fb5b 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -96,7 +96,7 @@ threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService, new NamedWr new NetworkService(Collections.emptyList())); clusterService = createClusterService(threadPool); transportService = new TransportService(clusterService.getSettings(), transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); broadcastReplicationAction = new TestBroadcastReplicationAction(Settings.EMPTY, threadPool, clusterService, transportService, diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index d2472da34f56c..9356fd12a3a5b 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -163,7 +163,7 @@ public void setUp() throws Exception { transport = new CapturingTransport(); clusterService = createClusterService(threadPool); transportService = new TransportService(clusterService.getSettings(), transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); @@ -977,7 +977,7 @@ public void testRetryOnReplicaWithRealTransport() throws Exception { new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), Version.CURRENT); transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - x -> clusterService.localNode(),null); + x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); @@ -1040,7 +1040,7 @@ private void assertIndexShardCounter(int expected) { * half the time. */ private ReplicationTask maybeTask() { - return random().nextBoolean() ? new ReplicationTask(0, null, null, null, null) : null; + return random().nextBoolean() ? new ReplicationTask(0, null, null, null, null, null) : null; } /** diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index b3db10f920973..47ce090d895fa 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -62,6 +62,7 @@ import org.junit.BeforeClass; import org.mockito.ArgumentCaptor; +import java.util.Collections; import java.util.HashSet; import java.util.Locale; import java.util.concurrent.ExecutionException; @@ -254,7 +255,7 @@ public void testDocumentFailureInShardOperationOnReplica() throws Exception { public void testReplicaProxy() throws InterruptedException, ExecutionException { CapturingTransport transport = new CapturingTransport(); TransportService transportService = new TransportService(clusterService.getSettings(), transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); ShardStateAction shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); @@ -355,7 +356,8 @@ protected TestAction() { protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) { super(Settings.EMPTY, "test", - new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null), null, + new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()), null, null, null, null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; diff --git a/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java index 29235329d6669..8db45cc5508ef 100644 --- a/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java @@ -53,6 +53,7 @@ import org.junit.Before; import org.junit.BeforeClass; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -144,7 +145,8 @@ public void setUp() throws Exception { transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet() + ); transportService.start(); transportService.acceptIncomingRequests(); action = new TestTransportInstanceSingleOperationAction( diff --git a/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java b/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java index 160c14c243cb2..bca04738d8b89 100644 --- a/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java +++ b/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java @@ -59,7 +59,7 @@ private Actions(Settings settings, ThreadPool threadPool, GenericAction[] action private static class InternalTransportAction extends TransportAction { private InternalTransportAction(Settings settings, String actionName, ThreadPool threadPool) { - super(settings, actionName, threadPool, EMPTY_FILTERS, null, new TaskManager(settings)); + super(settings, actionName, threadPool, EMPTY_FILTERS, null, new TaskManager(settings, threadPool, Collections.emptySet())); } @Override diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index ad894906cfb2d..c8030e1cf4aee 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -163,7 +163,7 @@ public void sendRequest(Transport.Connection conne }, (addr) -> { assert addr == null : "boundAddress: " + addr; return DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()); - }, null); + }, null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); transportClientNodesService = diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 51908a45380f0..828b385f85fa5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -155,7 +155,8 @@ public void setUp() throws Exception { this.threadPool = new TestThreadPool(getClass().getName()); this.transport = new MockTransport(); transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null); + boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 58c9651a82af1..b1ff626fa39b8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -48,6 +48,7 @@ import org.junit.Before; import org.junit.BeforeClass; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -108,7 +109,7 @@ public void setUp() throws Exception { this.transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null); diff --git a/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java b/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java index f2537e746ad0c..0f5f4870ae1bb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java @@ -60,6 +60,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -94,7 +95,7 @@ public void setUp() throws Exception { super.setUp(); clusterService = createClusterService(threadPool); transportService = new TransportService(clusterService.getSettings(), new CapturingTransport(), threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); } diff --git a/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index 1a837b825d867..f32e93bb82dbd 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -145,7 +145,7 @@ namedWriteableRegistry, new NetworkService(Collections.emptyList()), version), (boundAddress) -> new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), boundAddress.publishAddress(), Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version), - null); + null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); return transportService; diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index e0593a694d0b4..44914b1958777 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -402,7 +402,8 @@ public BoundTransportAddress boundAddress() { }; closeables.push(transport); final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null); + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); closeables.push(transportService); final int limitPortCounts = randomIntBetween(1, 10); final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( @@ -447,7 +448,8 @@ public BoundTransportAddress boundAddress() { }; closeables.push(transport); final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null); + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); closeables.push(transportService); final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( executorService, @@ -497,7 +499,8 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi closeables.push(transport); final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null); + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); closeables.push(transportService); final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( @@ -555,7 +558,8 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi closeables.push(transport); final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null); + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); closeables.push(transportService); final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 3)); try { @@ -723,7 +727,8 @@ public BoundTransportAddress boundAddress() { closeables.push(transport); final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null); + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); closeables.push(transportService); final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( executorService, @@ -772,7 +777,8 @@ private NetworkHandle startServices( final Transport transport = supplier.apply(nodeSettings, version); final MockTransportService transportService = new MockTransportService(nodeSettings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> - new DiscoveryNode(nodeId, nodeId, boundAddress.publishAddress(), emptyMap(), nodeRoles, version), null); + new DiscoveryNode(nodeId, nodeId, boundAddress.publishAddress(), emptyMap(), nodeRoles, version), null, + Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); final ConcurrentMap counters = ConcurrentCollections.newConcurrentMap(); diff --git a/server/src/test/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskStateTests.java b/server/src/test/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskStateTests.java index 2b15181ca3930..16d9df8c820ee 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskStateTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskStateTests.java @@ -26,6 +26,7 @@ import org.mockito.ArgumentCaptor; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static java.util.Collections.emptyList; @@ -42,7 +43,7 @@ public class LeaderBulkByScrollTaskStateTests extends ESTestCase { @Before public void createTask() { slices = between(2, 50); - task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID); + task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap()); task.setWorkerCount(slices); taskState = task.getLeaderState(); } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java index 64bf52c319e68..db624798bb71c 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java @@ -28,6 +28,7 @@ import org.junit.Before; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CyclicBarrier; @@ -52,7 +53,7 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase { @Before public void createTask() { - task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID); + task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap()); task.setWorker(Float.POSITIVE_INFINITY, null); workerState = task.getWorkerState(); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 7e8949cd15fbf..e80c2df4ea060 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -130,7 +130,8 @@ protected class ReplicationGroup implements AutoCloseable, Iterable private final AtomicInteger replicaId = new AtomicInteger(); private final AtomicInteger docId = new AtomicInteger(); boolean closed = false; - private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY), + private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(Settings.EMPTY, + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), (request, parentTask, primaryAllocationId, primaryTerm, listener) -> { try { new ResyncAction(request, listener, ReplicationGroup.this).execute(); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index 71faecfcea59a..618714fc9d959 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -58,7 +58,7 @@ public void setUp() throws Exception { transport = new CapturingTransport(); clusterService = createClusterService(threadPool); transportService = new TransportService(clusterService.getSettings(), transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 1b9a0ff629066..433f662062735 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -50,7 +50,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { public void testSyncerSendsOffCorrectDocuments() throws Exception { IndexShard shard = newStartedShard(true); - TaskManager taskManager = new TaskManager(Settings.EMPTY); + TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); AtomicBoolean syncActionCalled = new AtomicBoolean(); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { @@ -112,7 +112,8 @@ public void testSyncerOnClosingShard() throws Exception { syncCalledLatch.countDown(); threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); }; - PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY), syncAction); + PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), syncAction); syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately int numDocs = 10; @@ -158,7 +159,8 @@ public void testStatusSerialization() throws IOException { } public void testStatusEquals() throws IOException { - PrimaryReplicaSyncer.ResyncTask task = new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null); + PrimaryReplicaSyncer.ResyncTask task = + new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null, Collections.emptyMap()); task.setPhase(randomAlphaOfLength(10)); task.setResyncedOperations(randomIntBetween(0, 1000)); task.setTotalOperations(randomIntBetween(0, 1000)); @@ -181,7 +183,8 @@ public void testStatusEquals() throws IOException { } public void testStatusReportsCorrectNumbers() throws IOException { - PrimaryReplicaSyncer.ResyncTask task = new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null); + PrimaryReplicaSyncer.ResyncTask task = + new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null, Collections.emptyMap()); task.setPhase(randomAlphaOfLength(10)); task.setResyncedOperations(randomIntBetween(0, 1000)); task.setTotalOperations(randomIntBetween(0, 1000)); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 6e6eaf726a599..dd10dd2747df6 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -159,7 +159,8 @@ public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool th // services TransportService transportService = new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), clusterSettings); + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), clusterSettings, + Collections.emptySet()); MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry, null, null, null) { // metaData upgrader should do nothing diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index bc5a5b95b958a..d76429c53f3a5 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -401,7 +401,8 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod final Settings settings = Settings.builder().put("node.name", discoveryNode.getName()).build(); final TransportService transportService = new TransportService(settings, null, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null); + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()); final ClusterService clusterService = mock(ClusterService.class); final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService, transportService, null); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 92f018f282a43..5ed708ecb7581 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -213,11 +213,11 @@ public void onFailure(Exception e) { SearchPhaseResult searchPhaseResult = service.executeQueryPhase( new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f), - new SearchTask(123L, "", "", "", null)); + new SearchTask(123L, "", "", "", null, Collections.emptyMap())); IntArrayList intCursors = new IntArrayList(1); intCursors.add(0); ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null /* not a scroll */); - service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null)); + service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap())); } catch (AlreadyClosedException ex) { throw ex; } catch (IllegalStateException ex) { diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index d651c92cd611e..06d738cfb6016 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -62,6 +62,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static org.hamcrest.Matchers.anyOf; @@ -94,7 +95,7 @@ private void countTestCase(Query query, IndexReader reader, boolean shouldCollec TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(query)); context.setSize(0); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); final IndexSearcher searcher = shouldCollect ? new IndexSearcher(reader) : getAssertingEarlyTerminationSearcher(reader, 0); @@ -166,7 +167,7 @@ public void testPostFilterDisablesCountOptimization() throws Exception { IndexReader reader = DirectoryReader.open(dir); IndexSearcher contextSearcher = getAssertingEarlyTerminationSearcher(reader, 0); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); QueryPhase.execute(context, contextSearcher, checkCancelled -> {}); @@ -195,7 +196,7 @@ public void testMinScoreDisablesCountOptimization() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(0); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); QueryPhase.execute(context, contextSearcher, checkCancelled -> {}); assertEquals(1, context.queryResult().topDocs().totalHits); @@ -209,7 +210,7 @@ public void testMinScoreDisablesCountOptimization() throws Exception { public void testQueryCapturesThreadPoolStats() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); Directory dir = newDirectory(); @@ -251,7 +252,7 @@ public void testInOrderScrollOptimization() throws Exception { scrollContext.maxScore = Float.NaN; scrollContext.totalHits = -1; context.scrollContext(scrollContext); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); int size = randomIntBetween(2, 5); context.setSize(size); @@ -290,7 +291,7 @@ public void testTerminateAfterEarlyTermination() throws Exception { } w.close(); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.terminateAfter(1); @@ -384,7 +385,7 @@ public void testIndexSortingEarlyTermination() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(1); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); context.sort(new SortAndFormats(sort, new DocValueFormat[] {DocValueFormat.RAW})); final IndexReader reader = DirectoryReader.open(dir); @@ -471,7 +472,7 @@ public void testIndexSortScrollOptimization() throws Exception { scrollContext.maxScore = Float.NaN; scrollContext.totalHits = -1; context.scrollContext(scrollContext); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); context.setSize(10); context.sort(searchSortAndFormat); diff --git a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java index 6643a71b0962f..be0624d6bba83 100644 --- a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java @@ -22,6 +22,8 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.test.ESTestCase; +import java.util.Collections; + import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -33,10 +35,11 @@ public void testEmptyToString() { public void testNonEmptyToString() { TaskInfo info = new TaskInfo( - new TaskId("node1", 1), "dummy-type", "dummy-action", "dummy-description", null, 0, 1, true, new TaskId("node1", 0)); + new TaskId("node1", 1), "dummy-type", "dummy-action", "dummy-description", null, 0, 1, true, new TaskId("node1", 0), + Collections.singletonMap("foo", "bar")); ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList()); assertEquals("{\"tasks\":{\"node1:1\":{\"node\":\"node1\",\"id\":1,\"type\":\"dummy-type\",\"action\":\"dummy-action\"," + "\"description\":\"dummy-description\",\"start_time_in_millis\":0,\"running_time_in_nanos\":1,\"cancellable\":true," - + "\"parent_task_id\":\"node1:0\"}}}", tasksResponse.toString()); + + "\"parent_task_id\":\"node1:0\",\"headers\":{\"foo\":\"bar\"}}}}", tasksResponse.toString()); } } diff --git a/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java b/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java index e70c2b7119421..d4da4f8f1c5cb 100644 --- a/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java @@ -134,7 +134,9 @@ private static TaskInfo randomTaskInfo() throws IOException { long runningTimeNanos = randomLong(); boolean cancellable = randomBoolean(); TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId(); - return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId); + Map headers = + randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); + return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers); } private static TaskId randomTaskId() { diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index c4fe88d2fce46..08d88ad2e0486 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -73,7 +73,7 @@ private NetworkHandle startServices(String nodeNameAndId, Settings settings, Ver boundAddress.publishAddress(), emptyMap(), emptySet(), - version), null); + version), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); transportServices.add(transportService); diff --git a/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java index a0752b0048564..73cff7717b44d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java @@ -27,6 +27,8 @@ import org.junit.After; import org.junit.Before; +import java.util.Collections; + public abstract class AbstractAsyncBulkByScrollActionTestCase< Request extends AbstractBulkByScrollRequest, Response extends BulkByScrollResponse> @@ -37,7 +39,7 @@ public abstract class AbstractAsyncBulkByScrollActionTestCase< @Before public void setupForTest() { threadPool = new TestThreadPool(getTestName()); - task = new BulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID); + task = new BulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap()); task.setWorker(Float.POSITIVE_INFINITY, null); } diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 5a6075b1f8571..8d2f6e895504c 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -50,6 +50,7 @@ import java.nio.file.Path; import java.util.Collection; import java.util.Collections; +import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; @@ -115,15 +116,15 @@ protected SearchService newSearchService(ClusterService clusterService, IndicesS protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, - ClusterSettings clusterSettings) { + ClusterSettings clusterSettings, Set taskHeaders) { // we use the MockTransportService.TestPlugin class as a marker to create a network // module with this MockNetworkService. NetworkService is such an integral part of the systme // we don't allow to plug it in from plugins or anything. this is a test-only override and // can't be done in a production env. if (getPluginsService().filterPlugins(MockTransportService.TestPlugin.class).isEmpty()) { - return super.newTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings); + return super.newTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); } else { - return new MockTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings); + return new MockTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java index bccbd537a53b4..dec204537b917 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java @@ -27,8 +27,10 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -41,8 +43,8 @@ public class MockTaskManager extends TaskManager { private final Collection listeners = new CopyOnWriteArrayList<>(); - public MockTaskManager(Settings settings) { - super(settings); + public MockTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + super(settings, threadPool, taskHeaders); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index f5efa9a8c56fa..1c31533c9337d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -111,16 +111,16 @@ public static MockTransportService createNewService(Settings settings, Version v NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); final Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); - return createNewService(settings, transport, version, threadPool, clusterSettings); + return createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); } public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, - @Nullable ClusterSettings clusterSettings) { + @Nullable ClusterSettings clusterSettings, Set taskHeaders) { return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(), Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version), - clusterSettings); + clusterSettings, taskHeaders); } private final Transport original; @@ -135,7 +135,7 @@ public MockTransportService(Settings settings, Transport transport, ThreadPool t @Nullable ClusterSettings clusterSettings) { this(settings, transport, threadPool, interceptor, (boundAddress) -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), settings.get(Node.NODE_NAME_SETTING.getKey(), - UUIDs.randomBase64UUID())), clusterSettings); + UUIDs.randomBase64UUID())), clusterSettings, Collections.emptySet()); } /** @@ -146,8 +146,9 @@ public MockTransportService(Settings settings, Transport transport, ThreadPool t */ public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, - @Nullable ClusterSettings clusterSettings) { - super(settings, new LookupTestTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings); + @Nullable ClusterSettings clusterSettings, Set taskHeaders) { + super(settings, new LookupTestTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings, + taskHeaders); this.original = transport; } @@ -160,11 +161,11 @@ public static TransportAddress[] extractTransportAddresses(TransportService tran } @Override - protected TaskManager createTaskManager() { + protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { - return new MockTaskManager(settings); + return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(); + return super.createTaskManager(settings, threadPool, taskHeaders); } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 17423207d56c6..20971b3865ea1 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1931,7 +1931,8 @@ public void testHandshakeWithIncompatVersion() { Version version = Version.fromString("2.0.0"); try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); - MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null)) { + MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null, + Collections.emptySet())) { service.start(); service.acceptIncomingRequests(); DiscoveryNode node = @@ -1953,7 +1954,8 @@ public void testHandshakeUpdatesVersion() throws IOException { Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT); try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); - MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null)) { + MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null, + Collections.emptySet())) { service.start(); service.acceptIncomingRequests(); DiscoveryNode node = @@ -1989,7 +1991,7 @@ protected String handleRequest(TcpChannel mockChannel, String profileName, Strea }; try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, Version.CURRENT, threadPool, - null)) { + null, Collections.emptySet())) { service.start(); service.acceptIncomingRequests(); // this acts like a node that doesn't have support for handshakes diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index 916e97ffd1211..e9f5f86462f54 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -50,7 +50,7 @@ protected Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, T } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings); + MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java index bb36ed9f6db1d..bd7fddf82b858 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java @@ -78,7 +78,7 @@ protected Version getCurrentVersion() { }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings); + MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; }