From 55a14230a7b0d563d81184b1158dd21f0104c45c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 11 Jan 2018 18:34:17 -0500 Subject: [PATCH 1/8] Do not keep 5.x commits once having 6.x commits (#28188) Currently we keep a 5.x index commit as a safe commit until we have a 6.x safe commit. During that time, if peer-recovery happens, a primary will send a 5.x commit in file-based sync and the recovery will even fail as the snapshotted commit does not have sequence number tags. This commit updates the combined deletion policy to delete legacy commits if there are 6.x commits. Relates #27606 Relates #28038 --- .../elasticsearch/index/engine/CombinedDeletionPolicy.java | 4 ++-- .../index/engine/CombinedDeletionPolicyTests.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 6a61843c263e4..e5d8cacf73657 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -160,9 +160,9 @@ private static int indexOfKeptCommits(List commits, long if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) { return i + 1; } - // 5.x commits do not contain MAX_SEQ_NO. + // 5.x commits do not contain MAX_SEQ_NO, we should not keep it and the older commits. if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) { - return i; + return Math.min(commits.size() - 1, i + 1); } final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)); if (maxSeqNoFromCommit <= globalCheckpoint) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index ca75c70137b5a..e74cde52aa418 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -171,15 +171,15 @@ public void testLegacyIndex() throws Exception { globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1)); indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit)); - verify(legacyCommit, times(0)).delete(); + verify(legacyCommit, times(1)).delete(); // Do not keep the legacy commit once we have a new commit. verify(freshCommit, times(0)).delete(); - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen)); + assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen)); assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen)); // Make the fresh commit safe. 
globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE)); indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit)); - verify(legacyCommit, times(1)).delete(); + verify(legacyCommit, times(2)).delete(); verify(freshCommit, times(0)).delete(); assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen)); assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen)); From f15c2044c47ad8ce8e375cec92723f0b3a3fac5d Mon Sep 17 00:00:00 2001 From: Clinton Gormley Date: Fri, 12 Jan 2018 11:19:57 +0100 Subject: [PATCH 2/8] Fixed the cat.health REST test to accept 4ms, not just 4.0ms (#28186) --- .../main/resources/rest-api-spec/test/cat.health/10_basic.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.health/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.health/10_basic.yml index 380caec931899..504b7c8f9b1b6 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.health/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.health/10_basic.yml @@ -45,7 +45,7 @@ \d+ \s+ # init \d+ \s+ # unassign \d+ \s+ # pending_tasks - (-|\d+[.]\d+ms|s) \s+ # max task waiting time + (-|\d+(?:[.]\d+)?m?s) \s+ # max task waiting time \d+\.\d+% # active shards percent \n )+ @@ -72,7 +72,7 @@ \d+ \s+ # init \d+ \s+ # unassign \d+ \s+ # pending_tasks - (-|\d+[.]\d+ms|s) \s+ # max task waiting time + (-|\d+(?:[.]\d+)?m?s) \s+ # max task waiting time \d+\.\d+% # active shards percent \n )+ From c417427ecda8cf93614501c9a3c0672d0cf4f690 Mon Sep 17 00:00:00 2001 From: Andrew Banchich Date: Fri, 12 Jan 2018 05:47:39 -0500 Subject: [PATCH 3/8] [Docs] Spelling fix in painless-getting-started.asciidoc (#28187) --- docs/painless/painless-getting-started.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/painless/painless-getting-started.asciidoc b/docs/painless/painless-getting-started.asciidoc index 98209fc7da9e5..155b5f272b426 100644 --- a/docs/painless/painless-getting-started.asciidoc +++ b/docs/painless/painless-getting-started.asciidoc @@ -234,7 +234,7 @@ POST hockey/player/_update_by_query ---------------------------------------------------------------- // CONSOLE -Using the match operator (`==~`) you can update all the hockey players who's +Using the match operator (`==~`) you can update all the hockey players whose names start with a consonant and end with a vowel: [source,js] From 3a96518b582f014ee17dd2a79bb1ebded10c644f Mon Sep 17 00:00:00 2001 From: Boudewijn <31416818+boudewijnk@users.noreply.github.com> Date: Wed, 10 Jan 2018 18:00:12 +0100 Subject: [PATCH 4/8] Update getting-started.asciidoc (#28145) Replaced single quotation marks with double quotation marks surrounding localhost address --- docs/reference/getting-started.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/reference/getting-started.asciidoc b/docs/reference/getting-started.asciidoc index 2ebe8c038655b..0a6dbd0eb8359 100755 --- a/docs/reference/getting-started.asciidoc +++ b/docs/reference/getting-started.asciidoc @@ -669,8 +669,8 @@ You can download the sample dataset (accounts.json) from https://github.com/elas [source,sh] -------------------------------------------------- -curl -H "Content-Type: application/json" -XPOST 'localhost:9200/bank/account/_bulk?pretty&refresh' --data-binary "@accounts.json" -curl 'localhost:9200/_cat/indices?v' +curl -H "Content-Type: application/json" 
-XPOST "localhost:9200/bank/account/_bulk?pretty&refresh" --data-binary "@accounts.json" +curl "localhost:9200/_cat/indices?v" -------------------------------------------------- // NOTCONSOLE From a10c406292b1c856ee3cd05c495526b82be320ca Mon Sep 17 00:00:00 2001 From: Andrew Banchich Date: Wed, 10 Jan 2018 11:59:01 -0500 Subject: [PATCH 5/8] text fixes (#28136) --- .../aggregations/pipeline/bucket-selector-aggregation.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc b/docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc index cd0218e7c4353..5dc1b80d4adda 100644 --- a/docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc @@ -6,7 +6,7 @@ in the parent multi-bucket aggregation. The specified metric must be numeric and If the script language is `expression` then a numeric return value is permitted. In this case 0.0 will be evaluated as `false` and all other values will evaluate to true. -NOTE: The bucket_selector aggregation, like all pipeline aggregations, executions after all other sibling aggregations. This means that +NOTE: The bucket_selector aggregation, like all pipeline aggregations, executes after all other sibling aggregations. This means that using the bucket_selector aggregation to filter the returned buckets in the response does not save on execution time running the aggregations. ==== Syntax From 6a5807ad8feffa0e22e6fe4b136c90d73892040d Mon Sep 17 00:00:00 2001 From: akadko Date: Fri, 12 Jan 2018 18:36:48 +0300 Subject: [PATCH 6/8] [DOCS] Removed differencies between text and code (#27993) --- .../aggregations/metrics/cardinality-aggregation.asciidoc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/reference/aggregations/metrics/cardinality-aggregation.asciidoc b/docs/reference/aggregations/metrics/cardinality-aggregation.asciidoc index 938d42a70abd5..d458d377a6077 100644 --- a/docs/reference/aggregations/metrics/cardinality-aggregation.asciidoc +++ b/docs/reference/aggregations/metrics/cardinality-aggregation.asciidoc @@ -5,8 +5,7 @@ A `single-value` metrics aggregation that calculates an approximate count of distinct values. Values can be extracted either from specific fields in the document or generated by a script. -Assume you are indexing books and would like to count the unique authors that -match a query: +Assume you are indexing store sales and would like to count the unique number of sold products that match a query: [source,js] -------------------------------------------------- From c75ac319a655f9a2b37c853913ef989e7159f65e Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Fri, 12 Jan 2018 15:34:17 -0500 Subject: [PATCH 7/8] Add ability to associate an ID with tasks (#27764) Adds support for capturing the X-Opaque-Id header from a REST request and storing it's value in the tasks that this request started. It works for all user-initiated tasks (not only search). 
Closes #23250 Usage: ``` $ curl -H "X-Opaque-Id: imotov" -H "foo:bar" "localhost:9200/_tasks?pretty&group_by=parents" { "tasks" : { "7qrTVbiDQKiZfubUP7DPkg:6998" : { "node" : "7qrTVbiDQKiZfubUP7DPkg", "id" : 6998, "type" : "transport", "action" : "cluster:monitor/tasks/lists", "start_time_in_millis" : 1513029940042, "running_time_in_nanos" : 266794, "cancellable" : false, "headers" : { "X-Opaque-Id" : "imotov" }, "children" : [ { "node" : "V-PuCjPhRp2ryuEsNw6V1g", "id" : 6088, "type" : "netty", "action" : "cluster:monitor/tasks/lists[n]", "start_time_in_millis" : 1513029940043, "running_time_in_nanos" : 67785, "cancellable" : false, "parent_task_id" : "7qrTVbiDQKiZfubUP7DPkg:6998", "headers" : { "X-Opaque-Id" : "imotov" } }, { "node" : "7qrTVbiDQKiZfubUP7DPkg", "id" : 6999, "type" : "direct", "action" : "cluster:monitor/tasks/lists[n]", "start_time_in_millis" : 1513029940043, "running_time_in_nanos" : 98754, "cancellable" : false, "parent_task_id" : "7qrTVbiDQKiZfubUP7DPkg:6998", "headers" : { "X-Opaque-Id" : "imotov" } } ] } } } ``` --- docs/reference/cluster/tasks.asciidoc | 68 +++++++++++++++++++ .../reindex/AsyncBulkByScrollActionTests.java | 8 ++- .../TransportRethrottleActionTests.java | 9 ++- .../netty4/SimpleNetty4TransportTests.java | 2 +- .../nio/SimpleNioTransportTests.java | 2 +- .../test/old_cluster/10_basic.yml | 41 +++++++++++ .../test/upgraded_cluster/10_basic.yml | 39 +++++++++++ .../rest-api-spec/api/tasks.list.json | 2 +- .../test/tasks.list/10_basic.yml | 16 +++++ .../elasticsearch/action/ActionModule.java | 7 +- .../cancel/TransportCancelTasksAction.java | 2 +- .../node/tasks/list/ListTasksResponse.java | 15 ++++ .../action/index/IndexRequest.java | 2 +- .../action/search/SearchRequest.java | 5 +- .../action/search/SearchScrollRequest.java | 5 +- .../action/search/SearchTask.java | 6 +- .../replication/ReplicationRequest.java | 5 +- .../support/replication/ReplicationTask.java | 5 +- .../TransportReplicationAction.java | 5 +- .../client/transport/TransportClient.java | 2 +- .../reindex/AbstractBulkByScrollRequest.java | 5 +- .../index/reindex/BulkByScrollTask.java | 6 +- .../index/shard/PrimaryReplicaSyncer.java | 9 +-- .../java/org/elasticsearch/node/Node.java | 11 ++- .../elasticsearch/plugins/ActionPlugin.java | 7 ++ .../admin/cluster/RestListTasksAction.java | 13 +++- .../search/fetch/ShardFetchRequest.java | 5 +- .../internal/InternalScrollSearchRequest.java | 5 +- .../internal/ShardSearchTransportRequest.java | 5 +- .../search/query/QuerySearchRequest.java | 5 +- .../elasticsearch/tasks/CancellableTask.java | 5 +- .../java/org/elasticsearch/tasks/Task.java | 21 ++++-- .../elasticsearch/tasks/TaskAwareRequest.java | 6 +- .../org/elasticsearch/tasks/TaskInfo.java | 43 ++++++++++-- .../org/elasticsearch/tasks/TaskManager.java | 34 +++++++++- .../tasks/TaskResultsService.java | 19 +++++- .../transport/TransportService.java | 10 +-- .../tasks/task-index-mapping.json | 7 ++ .../node/tasks/CancellableTasksTests.java | 9 +-- .../node/tasks/TaskManagerTestCase.java | 15 ++-- .../admin/cluster/node/tasks/TaskTests.java | 5 +- .../admin/cluster/node/tasks/TasksIT.java | 61 ++++++++++++----- .../cluster/node/tasks/TestTaskPlugin.java | 19 ++++-- .../node/tasks/TransportTasksActionTests.java | 8 +-- .../action/bulk/TransportBulkActionTests.java | 2 +- .../bulk/TransportBulkActionTookTests.java | 3 +- .../action/main/MainActionTests.java | 3 +- .../action/search/MockSearchPhaseContext.java | 2 +- .../search/MultiSearchActionTookTests.java | 3 +- 
.../TransportMultiSearchActionTests.java | 29 ++++++-- .../TransportActionFilterChainTests.java | 18 ++++- .../TransportBroadcastByNodeActionTests.java | 3 +- .../TransportMasterNodeActionTests.java | 2 +- .../nodes/TransportNodesActionTests.java | 2 +- .../BroadcastReplicationTests.java | 2 +- .../TransportReplicationActionTests.java | 6 +- .../TransportWriteActionTests.java | 6 +- ...ortInstanceSingleOperationActionTests.java | 4 +- .../client/node/NodeClientHeadersTests.java | 2 +- .../TransportClientNodesServiceTests.java | 2 +- .../cluster/NodeConnectionsServiceTests.java | 3 +- .../action/shard/ShardStateActionTests.java | 3 +- .../health/ClusterStateHealthTests.java | 3 +- .../discovery/ZenFaultDetectionTests.java | 2 +- .../discovery/zen/UnicastZenPingTests.java | 18 +++-- .../LeaderBulkByScrollTaskStateTests.java | 3 +- .../WorkerBulkByScrollTaskStateTests.java | 3 +- .../ESIndexLevelReplicationTestCase.java | 3 +- .../GlobalCheckpointSyncActionTests.java | 2 +- .../shard/PrimaryReplicaSyncerTests.java | 11 +-- .../indices/cluster/ClusterStateChanges.java | 3 +- ...ClusterStateServiceRandomUpdatesTests.java | 3 +- .../search/SearchServiceTests.java | 4 +- .../search/query/QueryPhaseTests.java | 17 ++--- .../tasks/ListTasksResponseTests.java | 7 +- .../elasticsearch/tasks/TaskResultTests.java | 4 +- .../TransportServiceHandshakeTests.java | 2 +- ...stractAsyncBulkByScrollActionTestCase.java | 4 +- .../java/org/elasticsearch/node/MockNode.java | 7 +- .../test/tasks/MockTaskManager.java | 6 +- .../test/transport/MockTransportService.java | 19 +++--- .../AbstractSimpleTransportTestCase.java | 8 ++- .../transport/MockTcpTransportTests.java | 2 +- .../nio/SimpleMockNioTransportTests.java | 2 +- 84 files changed, 627 insertions(+), 180 deletions(-) diff --git a/docs/reference/cluster/tasks.asciidoc b/docs/reference/cluster/tasks.asciidoc index ed73290883d23..b3457953f46e5 100644 --- a/docs/reference/cluster/tasks.asciidoc +++ b/docs/reference/cluster/tasks.asciidoc @@ -195,3 +195,71 @@ The following command will change the grouping to parent tasks: GET _tasks?group_by=parents -------------------------------------------------- // CONSOLE + +The grouping can be disabled by specifying `none` as a `group_by` parameter: + +[source,js] +-------------------------------------------------- +GET _tasks?group_by=none +-------------------------------------------------- +// CONSOLE + +[float] +=== Identifying running tasks + +The `X-Opaque-Id` header, when provided as an HTTP request header, is returned as a header in the response as well as +in the `headers` field of the task information.
This allows tracking certain calls, or associating certain tasks with +the client that started them: + +[source,sh] +-------------------------------------------------- +curl -i -H "X-Opaque-Id: 123456" "http://localhost:9200/_tasks?group_by=parents" +-------------------------------------------------- +// NOTCONSOLE + +The result will look similar to the following: + +[source,js] +-------------------------------------------------- +HTTP/1.1 200 OK +X-Opaque-Id: 123456 <1> +content-type: application/json; charset=UTF-8 +content-length: 831 + +{ + "tasks" : { + "u5lcZHqcQhu-rUoFaqDphA:45" : { + "node" : "u5lcZHqcQhu-rUoFaqDphA", + "id" : 45, + "type" : "transport", + "action" : "cluster:monitor/tasks/lists", + "start_time_in_millis" : 1513823752749, + "running_time_in_nanos" : 293139, + "cancellable" : false, + "headers" : { + "X-Opaque-Id" : "123456" <2> + }, + "children" : [ + { + "node" : "u5lcZHqcQhu-rUoFaqDphA", + "id" : 46, + "type" : "direct", + "action" : "cluster:monitor/tasks/lists[n]", + "start_time_in_millis" : 1513823752750, + "running_time_in_nanos" : 92133, + "cancellable" : false, + "parent_task_id" : "u5lcZHqcQhu-rUoFaqDphA:45", + "headers" : { + "X-Opaque-Id" : "123456" <3> + } + } + ] + } + } +} +-------------------------------------------------- +// NOTCONSOLE + +<1> id as part of the response header +<2> id for the task that was initiated by the REST request +<3> the child task of the task initiated by the REST request diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index a13bdea0ef2f4..db259de411165 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -81,6 +81,7 @@ import org.junit.Before; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Iterator; @@ -123,6 +124,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { private SearchRequest firstSearchRequest; private PlainActionFuture listener; private String scrollId; + private ThreadPool threadPool; private TaskManager taskManager; private BulkByScrollTask testTask; private WorkerBulkByScrollTaskState worker; @@ -141,7 +143,8 @@ public void setupForTest() { testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest); listener = new PlainActionFuture<>(); scrollId = null; - taskManager = new TaskManager(Settings.EMPTY); + threadPool = new TestThreadPool(getClass().getName()); + taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest); testTask.setWorker(testRequest.getRequestsPerSecond(), null); worker = testTask.getWorkerState(); @@ -159,8 +162,9 @@ private void setupClient(ThreadPool threadPool) { } @After - public void tearDownAndVerifyCommonStuff() { + public void tearDownAndVerifyCommonStuff() throws Exception { client.close(); + terminate(threadPool); } /** diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java index 62a2c34ea582c..3c2f5194fceda 100644 ---
a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java @@ -32,6 +32,7 @@ import org.mockito.ArgumentCaptor; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.Consumer; @@ -53,7 +54,7 @@ public class TransportRethrottleActionTests extends ESTestCase { @Before public void createTask() { slices = between(2, 50); - task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID); + task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap()); task.setWorkerCount(slices); } @@ -101,7 +102,8 @@ public void testRethrottleSuccessfulResponse() { List sliceStatuses = new ArrayList<>(slices); for (int i = 0; i < slices; i++) { BulkByScrollTask.Status status = believeableInProgressStatus(i); - tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()))); + tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()), + Collections.emptyMap())); sliceStatuses.add(new BulkByScrollTask.StatusOrException(status)); } rethrottleTestCase(slices, @@ -121,7 +123,8 @@ public void testRethrottleWithSomeSucceeded() { List tasks = new ArrayList<>(); for (int i = succeeded; i < slices; i++) { BulkByScrollTask.Status status = believeableInProgressStatus(i); - tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()))); + tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()), + Collections.emptyMap())); sliceStatuses.add(new BulkByScrollTask.StatusOrException(status)); } rethrottleTestCase(slices - succeeded, diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index b2126b1b61185..efa296b6278af 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -73,7 +73,7 @@ protected Version getCurrentVersion() { } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings); + MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index ee65eb5ccbd0d..c78ae25e44a06 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -77,7 +77,7 @@ protected Version getCurrentVersion() { } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings); + 
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml index 29790e2146190..067eba6e4b860 100644 --- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml +++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml @@ -212,3 +212,44 @@ field3: value - match: { hits.total: 1 } - match: { hits.hits.0._id: q3 } + +--- +"Create a task result record in the old cluster": + - do: + indices.create: + index: reindexed_index + body: + settings: + index: + number_of_replicas: 0 + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "reindexed_index", "_type": "doc"}}' + - '{"f1": "1"}' + - '{"index": {"_index": "reindexed_index", "_type": "doc"}}' + - '{"f1": "2"}' + - '{"index": {"_index": "reindexed_index", "_type": "doc"}}' + - '{"f1": "3"}' + - '{"index": {"_index": "reindexed_index", "_type": "doc"}}' + - '{"f1": "4"}' + - '{"index": {"_index": "reindexed_index", "_type": "doc"}}' + - '{"f1": "5"}' + + - do: + reindex: + wait_for_completion: false + body: + source: + index: reindexed_index + size: 1 + dest: + index: reindexed_index_copy + - match: {task: '/.+:\d+/'} + - set: {task: task} + + - do: + tasks.get: + wait_for_completion: true + task_id: $task diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml index af58a2f362c78..338b8728b6a82 100644 --- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml +++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml @@ -126,3 +126,42 @@ field3: value - match: { hits.total: 1 } - match: { hits.hits.0._id: q3 } + +--- +"Find a task result record from the old cluster": + - do: + search: + index: .tasks + body: + query: + match_all: {} + - match: { hits.total: 1 } + - match: { hits.hits.0._id: '/.+:\d+/' } + - set: {hits.hits.0._id: task_id} + + - do: + tasks.get: + wait_for_completion: true + task_id: $task_id + + - is_false: node_failures + - is_true: task + + - do: + headers: { "X-Opaque-Id": "Reindexing Again" } + reindex: + wait_for_completion: false + body: + source: + index: reindexed_index_copy + size: 1 + dest: + index: reindexed_index_another_copy + - match: { task: '/.+:\d+/' } + - set: { task: task_id } + + - do: + tasks.get: + wait_for_completion: true + task_id: $task_id + - match: { task.headers.X-Opaque-Id: "Reindexing Again" } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json index fbe355ee164b0..1110c3c111b99 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json @@ -34,7 +34,7 @@ "group_by": { "type" : "enum", "description": "Group tasks by nodes or parent/child relationships", - "options" : ["nodes", "parents"], + "options" : ["nodes", "parents", "none"], "default" : "nodes" } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yml index 
dd1c415876fa7..57bf5b629b76a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yml @@ -17,3 +17,19 @@ group_by: parents - is_true: tasks + +--- +"tasks_list headers": + - skip: + version: " - 6.99.99" + reason: task headers has been added in 7.0.0 + + - do: + headers: { "X-Opaque-Id": "That is me" } + tasks.list: + actions: "cluster:monitor/tasks/lists" + group_by: none + + - is_true: tasks + - match: { tasks.0.headers.X-Opaque-Id: "That is me" } + diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 28fd3458b902a..872c217f98091 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -312,6 +312,7 @@ import org.elasticsearch.rest.action.search.RestMultiSearchAction; import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.rest.action.search.RestSearchScrollAction; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.usage.UsageService; @@ -324,6 +325,7 @@ import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.unmodifiableMap; @@ -362,7 +364,10 @@ public ActionModule(boolean transportClient, Settings settings, IndexNameExpress actionFilters = setupActionFilters(actionPlugins); autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver); destructiveOperations = new DestructiveOperations(settings, clusterSettings); - Set headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet()); + Set headers = Stream.concat( + actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()), + Stream.of("X-Opaque-Id") + ).collect(Collectors.toSet()); UnaryOperator restWrapper = null; for (ActionPlugin plugin : actionPlugins) { UnaryOperator newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java index aca1be7adff4c..0bd1ff2945bd7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java @@ -56,7 +56,7 @@ * Transport action that can be used to cancel currently running cancellable tasks. *

* For a task to be cancellable it has to return an instance of - * {@link CancellableTask} from {@link TransportRequest#createTask(long, String, String, TaskId)} + * {@link CancellableTask} from {@link TransportRequest#createTask} */ public class TransportCancelTasksAction extends TransportTasksAction { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java index de5fcf9345d23..88d8ff4679917 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java @@ -186,6 +186,21 @@ public XContentBuilder toXContentGroupedByParents(XContentBuilder builder, Param return builder; } + /** + * Presents a flat list of tasks + */ + public XContentBuilder toXContentGroupedByNone(XContentBuilder builder, Params params) throws IOException { + toXContentCommon(builder, params); + builder.startArray("tasks"); + for (TaskInfo taskInfo : getTasks()) { + builder.startObject(); + taskInfo.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + return builder; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 59a0b5e198087..46d51ee0b40e4 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -77,7 +77,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement /** * Max length of the source document to include into toString() * - * @see ReplicationRequest#createTask(long, java.lang.String, java.lang.String, org.elasticsearch.tasks.TaskId) + * @see ReplicationRequest#createTask */ static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048; diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 07d2229a5366d..2c699bf6d9b3a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.Map; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -428,9 +429,9 @@ public boolean isSuggestOnly() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { // generating description in a lazy way since source can be quite big - return new SearchTask(id, type, action, null, parentTaskId) { + return new SearchTask(id, type, action, null, parentTaskId, headers) { @Override public String getDescription() { StringBuilder sb = new StringBuilder(); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java index be83ef6d5839e..68f6a6afce091 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java +++ 
b/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java @@ -32,6 +32,7 @@ import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -117,8 +118,8 @@ public void readFrom(StreamInput in) throws IOException { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new SearchTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java index d0a1cdd456f47..699448909a2b5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java @@ -22,13 +22,15 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; +import java.util.Map; + /** * Task storing information about a currently running search request. */ public class SearchTask extends CancellableTask { - public SearchTask(long id, String type, String action, String description, TaskId parentTaskId) { - super(id, type, action, description, parentTaskId); + public SearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 07822bebaa9ac..81584a7bb6467 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -35,6 +35,7 @@ import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -207,8 +208,8 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new ReplicationTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ReplicationTask(id, type, action, getDescription(), parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java index 2e0baa057b223..1cf8b8bf0ff68 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java @@ -27,6 +27,7 @@ import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; import static java.util.Objects.requireNonNull; @@ -36,8 +37,8 @@ public class ReplicationTask extends Task { private volatile String phase = "starting"; - public ReplicationTask(long id, String type, String action, String description, TaskId 
parentTaskId) { - super(id, type, action, description, parentTaskId); + public ReplicationTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index a63d14d7f9d12..1a57b6a5d9500 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -81,6 +81,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -1228,8 +1229,8 @@ public TaskId getParentTask() { return request.getParentTask(); } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return request.createTask(id, type, action, parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return request.createTask(id, type, action, parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 0d4a87f3bfa24..dc0f7b015632e 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -180,7 +180,7 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings final TransportService transportService = new TransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), boundTransportAddress -> DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 0), - UUIDs.randomBase64UUID()), null); + UUIDs.randomBase64UUID()), null, Collections.emptySet()); modules.add((b -> { b.bind(BigArrays.class).toInstance(bigArrays); b.bind(PluginsService.class).toInstance(pluginsService); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 9d2c221fcb24d..fc4163ddd19b2 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Map; import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -408,8 +409,8 @@ protected Self doForSlice(Self request, TaskId slicingTask, int totalSlices) { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java 
b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index d5e656489558c..276484b055253 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.CancellableTask; @@ -38,6 +37,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import static java.lang.Math.min; @@ -62,8 +62,8 @@ public class BulkByScrollTask extends CancellableTask { private volatile LeaderBulkByScrollTaskState leaderState; private volatile WorkerBulkByScrollTaskState workerState; - public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) { - super(id, type, action, description, parentTaskId); + public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index b1bd1c5b3138e..1e31eae7d417f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -271,8 +272,8 @@ public ResyncRequest(ShardId shardId, String allocationId) { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new ResyncTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ResyncTask(id, type, action, getDescription(), parentTaskId, headers); } @Override @@ -297,8 +298,8 @@ public static class ResyncTask extends Task { private volatile int resyncedOperations; private volatile int skippedOperations; - public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId) { - super(id, type, action, description, parentTaskId); + public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 21ce12c59f8fb..62fc271f99084 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -156,6 +156,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -424,8 +425,12 @@ protected Node(final Environment environment, Collection 
metaDataIndexUpgradeService, metaDataUpgrader); new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); + Set taskHeaders = Stream.concat( + pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()), + Stream.of("X-Opaque-Id") + ).collect(Collectors.toSet()); final TransportService transportService = newTransportService(settings, transport, threadPool, - networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); + networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService); final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService, SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); @@ -543,8 +548,8 @@ static void warnIfPreRelease(final Version version, final boolean isSnapshot, fi protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, - ClusterSettings clusterSettings) { - return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings); + ClusterSettings clusterSettings, Set taskHeaders) { + return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); } protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) { diff --git a/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java b/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java index 377da56f6018b..41f0ed86116ad 100644 --- a/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java @@ -84,6 +84,13 @@ default Collection getRestHeaders() { return Collections.emptyList(); } + /** + * Returns headers which should be copied from internal requests into tasks. + */ + default Collection getTaskHeaders() { + return Collections.emptyList(); + } + /** * Returns a function used to wrap each rest request before handling the request. 
* diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java index 6ef5d5a2de2bf..8e6447e0e4980 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java @@ -103,10 +103,21 @@ public RestResponse buildResponse(T response, XContentBuilder builder) throws Ex return new BytesRestResponse(RestStatus.OK, builder); } }; + } else if ("none".equals(groupBy)) { + return new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(T response, XContentBuilder builder) throws Exception { + builder.startObject(); + response.toXContentGroupedByNone(builder, channel.request()); + builder.endObject(); + return new BytesRestResponse(RestStatus.OK, builder); + } + }; + } else if ("parents".equals(groupBy)) { return new RestToXContentListener<>(channel); } else { - throw new IllegalArgumentException("[group_by] must be one of [nodes] or [parents] but was [" + groupBy + "]"); + throw new IllegalArgumentException("[group_by] must be one of [nodes], [parents] or [none] but was [" + groupBy + "]"); } } diff --git a/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java b/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java index ac71b84d54f34..6957eea4758d9 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java @@ -31,6 +31,7 @@ import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Map; /** * Shard level fetch base request. Holds all the info needed to execute a fetch. 
@@ -115,8 +116,8 @@ public void readFrom(StreamInput in) throws IOException { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new SearchTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java index d1fba0f761526..c551205f6b5db 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java @@ -29,6 +29,7 @@ import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Map; public class InternalScrollSearchRequest extends TransportRequest { @@ -76,8 +77,8 @@ public void readFrom(StreamInput in) throws IOException { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new SearchTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java index 76a13b7b02f24..a0fecc9bf8b3d 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Map; /** * Shard level search request that represents an actual search sent from the coordinating node to the nodes holding @@ -177,8 +178,8 @@ public boolean isProfile() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new SearchTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java index c893ed93046f0..dbb14fda71783 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java @@ -32,6 +32,7 @@ import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Map; import static org.elasticsearch.search.dfs.AggregatedDfs.readAggregatedDfs; @@ -91,8 +92,8 @@ public void readFrom(StreamInput in) throws IOException { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new SearchTask(id, type, action, getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, getDescription(), 
parentTaskId, headers); } public String getDescription() { diff --git a/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java b/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java index 685e9bcf35251..1d43076305ccd 100644 --- a/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java +++ b/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.Nullable; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; /** @@ -30,8 +31,8 @@ public abstract class CancellableTask extends Task { private final AtomicReference reason = new AtomicReference<>(); - public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId) { - super(id, type, action, description, parentTaskId); + public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/tasks/Task.java b/server/src/main/java/org/elasticsearch/tasks/Task.java index e59970b84ee47..9fd9019cd213c 100644 --- a/server/src/main/java/org/elasticsearch/tasks/Task.java +++ b/server/src/main/java/org/elasticsearch/tasks/Task.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import java.io.IOException; +import java.util.Map; /** * Current task information @@ -43,6 +44,8 @@ public class Task { private final TaskId parentTask; + private final Map headers; + /** * The task's start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style). */ @@ -53,11 +56,12 @@ public class Task { */ private final long startTimeNanos; - public Task(long id, String type, String action, String description, TaskId parentTask) { - this(id, type, action, description, parentTask, System.currentTimeMillis(), System.nanoTime()); + public Task(long id, String type, String action, String description, TaskId parentTask, Map headers) { + this(id, type, action, description, parentTask, System.currentTimeMillis(), System.nanoTime(), headers); } - public Task(long id, String type, String action, String description, TaskId parentTask, long startTime, long startTimeNanos) { + public Task(long id, String type, String action, String description, TaskId parentTask, long startTime, long startTimeNanos, + Map headers) { this.id = id; this.type = type; this.action = action; @@ -65,6 +69,7 @@ public Task(long id, String type, String action, String description, TaskId pare this.parentTask = parentTask; this.startTime = startTime; this.startTimeNanos = startTimeNanos; + this.headers = headers; } /** @@ -92,7 +97,7 @@ public final TaskInfo taskInfo(String localNodeId, boolean detailed) { */ protected final TaskInfo taskInfo(String localNodeId, String description, Status status) { return new TaskInfo(new TaskId(localNodeId, getId()), getType(), getAction(), description, status, startTime, - System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask); + System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask, headers); } /** @@ -149,6 +154,14 @@ public Status getStatus() { public interface Status extends ToXContentObject, NamedWriteable {} + + /** + * Returns stored task header associated with the task + */ + public String getHeader(String header) { + return headers.get(header); + } + public TaskResult result(DiscoveryNode node, Exception error) throws IOException { return new 
TaskResult(taskInfo(node.getId(), true), error); } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java index a2364ac8e4047..86ba59ebcc804 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.tasks; +import java.util.Map; + /** * An interface for a request that can be used to register a task manager task */ @@ -47,8 +49,8 @@ default void setParentTask(String parentTaskNode, long parentTaskId) { * A request can override this method and return null to avoid being tracked by the task * manager. */ - default Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new Task(id, type, action, getDescription(), parentTaskId); + default Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new Task(id, type, action, getDescription(), parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java b/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java index d0fd66703e09e..2bd16a9addf6a 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java @@ -19,6 +19,7 @@ package org.elasticsearch.tasks; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -31,6 +32,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -65,8 +68,10 @@ public final class TaskInfo implements Writeable, ToXContentFragment { private final TaskId parentTaskId; + private final Map headers; + public TaskInfo(TaskId taskId, String type, String action, String description, Task.Status status, long startTime, - long runningTimeNanos, boolean cancellable, TaskId parentTaskId) { + long runningTimeNanos, boolean cancellable, TaskId parentTaskId, Map headers) { this.taskId = taskId; this.type = type; this.action = action; @@ -76,6 +81,7 @@ public TaskInfo(TaskId taskId, String type, String action, String description, T this.runningTimeNanos = runningTimeNanos; this.cancellable = cancellable; this.parentTaskId = parentTaskId; + this.headers = headers; } /** @@ -91,6 +97,11 @@ public TaskInfo(StreamInput in) throws IOException { runningTimeNanos = in.readLong(); cancellable = in.readBoolean(); parentTaskId = TaskId.readFromStream(in); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + headers = in.readMap(StreamInput::readString, StreamInput::readString); + } else { + headers = Collections.emptyMap(); + } } @Override @@ -104,6 +115,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(runningTimeNanos); out.writeBoolean(cancellable); parentTaskId.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); + } } public TaskId getTaskId() { @@ -162,6 +176,13 @@ public TaskId getParentTaskId() { return parentTaskId; } + /** + * Returns the task headers + */ + public Map getHeaders() { + return headers; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { 
builder.field("node", taskId.getNodeId()); @@ -180,6 +201,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (parentTaskId.isSet()) { builder.field("parent_task_id", parentTaskId.toString()); } + builder.startObject("headers"); + for(Map.Entry attribute : headers.entrySet()) { + builder.field(attribute.getKey(), attribute.getValue()); + } + builder.endObject(); return builder; } @@ -195,10 +221,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws long runningTimeNanos = (Long) a[i++]; boolean cancellable = (Boolean) a[i++]; String parentTaskIdString = (String) a[i++]; - + @SuppressWarnings("unchecked") Map headers = (Map) a[i++]; + if (headers == null) { + // This might happen if we are reading an old version of task info + headers = Collections.emptyMap(); + } RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes); TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString); - return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId); + return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, + headers); }); static { // Note for the future: this has to be backwards and forwards compatible with all changes to the task storage format @@ -212,6 +243,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws PARSER.declareLong(constructorArg(), new ParseField("running_time_in_nanos")); PARSER.declareBoolean(constructorArg(), new ParseField("cancellable")); PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id")); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers")); } @Override @@ -234,11 +266,12 @@ public boolean equals(Object obj) { && Objects.equals(runningTimeNanos, other.runningTimeNanos) && Objects.equals(parentTaskId, other.parentTaskId) && Objects.equals(cancellable, other.cancellable) - && Objects.equals(status, other.status); + && Objects.equals(status, other.status) + && Objects.equals(headers, other.headers); } @Override public int hashCode() { - return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status); + return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status, headers); } } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index afeeeeedd1168..16212e066bbff 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -32,19 +32,26 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import 
java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; /** * Task Manager service for keeping track of currently running tasks on the nodes @@ -52,6 +59,10 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplier { private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100); + /** Rest headers that are copied to the task */ + private final List taskHeaders; + private final ThreadPool threadPool; + private final ConcurrentMapLong tasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); private final ConcurrentMapLong cancellableTasks = ConcurrentCollections @@ -65,8 +76,13 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES; - public TaskManager(Settings settings) { + private final ByteSizeValue maxHeaderSize; + + public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { super(settings); + this.threadPool = threadPool; + this.taskHeaders = new ArrayList<>(taskHeaders); + this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); } public void setTaskResultsService(TaskResultsService taskResultsService) { @@ -80,7 +96,21 @@ public void setTaskResultsService(TaskResultsService taskResultsService) { * Returns the task manager tracked task or null if the task doesn't support the task manager */ public Task register(String type, String action, TaskAwareRequest request) { - Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask()); + Map headers = new HashMap<>(); + long headerSize = 0; + long maxSize = maxHeaderSize.getBytes(); + ThreadContext threadContext = threadPool.getThreadContext(); + for (String key : taskHeaders) { + String httpHeader = threadContext.getHeader(key); + if (httpHeader != null) { + headerSize += key.length() * 2 + httpHeader.length() * 2; + if (headerSize > maxSize) { + throw new IllegalArgumentException("Request exceeded the maximum size of task headers " + maxHeaderSize); + } + headers.put(key, httpHeader); + } + } + Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers); if (task == null) { return null; } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index 69549c611f1e6..f661095d6bd47 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -34,6 +34,7 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -48,6 +49,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Map; /** * Service that can store task results. 
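
A note on the TaskManager.register change above: only header names registered with the task manager are copied from the thread context onto the new task, and their accumulated size is capped by the node's http.max_header_size setting. The following is a minimal, self-contained sketch of that filtering logic, with a plain Map standing in for the real ThreadContext:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class TaskHeaderCopySketch {
        // Mirrors the loop added to TaskManager.register(): copy only tracked headers,
        // accounting two bytes per character for both key and value, and reject the
        // request once the accumulated size exceeds the configured HTTP max header size.
        static Map<String, String> copyTaskHeaders(List<String> trackedHeaders,
                                                   Map<String, String> threadContextHeaders,
                                                   long maxSizeInBytes) {
            Map<String, String> taskHeaders = new HashMap<>();
            long headerSize = 0;
            for (String key : trackedHeaders) {
                String value = threadContextHeaders.get(key); // threadContext.getHeader(key) in the patch
                if (value != null) {
                    headerSize += key.length() * 2 + value.length() * 2;
                    if (headerSize > maxSizeInBytes) {
                        throw new IllegalArgumentException(
                            "Request exceeded the maximum size of task headers " + maxSizeInBytes);
                    }
                    taskHeaders.put(key, value);
                }
            }
            return taskHeaders;
        }
    }
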
@@ -60,6 +62,10 @@ public class TaskResultsService extends AbstractComponent { public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json"; + public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version"; + + public static final int TASK_RESULT_MAPPING_VERSION = 2; + private final Client client; private final ClusterService clusterService; @@ -109,7 +115,7 @@ public void onFailure(Exception e) { }); } else { IndexMetaData metaData = state.getMetaData().index(TASK_INDEX); - if (metaData.getMappings().containsKey(TASK_TYPE) == false) { + if (getTaskResultMappingVersion(metaData) < TASK_RESULT_MAPPING_VERSION) { // The index already exists but doesn't have our mapping client.admin().indices().preparePutMapping(TASK_INDEX).setType(TASK_TYPE) .setSource(taskResultIndexMapping(), XContentType.JSON) @@ -131,6 +137,17 @@ public void onFailure(Exception e) { } } + private int getTaskResultMappingVersion(IndexMetaData metaData) { + MappingMetaData mappingMetaData = metaData.getMappings().get(TASK_TYPE); + if (mappingMetaData == null) { + return 0; + } + @SuppressWarnings("unchecked") Map meta = (Map) mappingMetaData.sourceAsMap().get("_meta"); + if (meta == null || meta.containsKey(TASK_RESULT_MAPPING_VERSION_META_FIELD) == false) { + return 1; // The mapping was created before meta field was introduced + } + return (int) meta.get(TASK_RESULT_MAPPING_VERSION_META_FIELD); + } private void doStoreResult(TaskResult taskResult, ActionListener listener) { IndexRequestBuilder index = client.prepareIndex(TASK_INDEX, TASK_TYPE, taskResult.getTask().getTaskId().toString()); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index a59ffcaa872d2..7687844231ccd 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -61,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -149,7 +150,8 @@ public void close() throws IOException { * updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}. 
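
The TaskResultsService change above re-puts the .tasks index mapping based on a version stored in the mapping's _meta section instead of only checking whether a mapping exists. Condensed into a standalone sketch (a plain map stands in for MappingMetaData.sourceAsMap(); the thresholds match the patch):

    import java.util.Map;

    final class TaskResultMappingVersionSketch {
        static final int TASK_RESULT_MAPPING_VERSION = 2;

        // No "task" mapping at all -> 0; a mapping without "_meta" -> 1 (created before
        // versioning was introduced); otherwise the stored "_meta.version" value.
        // The mapping is updated whenever the detected version is below TASK_RESULT_MAPPING_VERSION.
        static int detectedVersion(Map<String, Object> taskMappingSource) {
            if (taskMappingSource == null) {
                return 0;
            }
            @SuppressWarnings("unchecked")
            Map<String, Object> meta = (Map<String, Object>) taskMappingSource.get("_meta");
            if (meta == null || meta.containsKey("version") == false) {
                return 1;
            }
            return (int) meta.get("version");
        }
    }
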
*/ public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor, - Function localNodeFactory, @Nullable ClusterSettings clusterSettings) { + Function localNodeFactory, @Nullable ClusterSettings clusterSettings, + Set taskHeaders) { super(settings); this.transport = transport; this.threadPool = threadPool; @@ -158,7 +160,7 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa setTracerLogInclude(TRACE_LOG_INCLUDE_SETTING.get(settings)); setTracerLogExclude(TRACE_LOG_EXCLUDE_SETTING.get(settings)); tracerLog = Loggers.getLogger(logger, ".tracer"); - taskManager = createTaskManager(); + taskManager = createTaskManager(settings, threadPool, taskHeaders); this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings); @@ -184,8 +186,8 @@ public TaskManager getTaskManager() { return taskManager; } - protected TaskManager createTaskManager() { - return new TaskManager(settings); + protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + return new TaskManager(settings, threadPool, taskHeaders); } /** diff --git a/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json b/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json index 0f1a32e1bef81..435e6c5759cbb 100644 --- a/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json +++ b/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json @@ -1,5 +1,8 @@ { "task" : { + "_meta": { + "version": 2 + }, "dynamic" : "strict", "properties" : { "completed": { @@ -37,6 +40,10 @@ }, "description": { "type": "text" + }, + "headers": { + "type" : "object", + "enabled" : false } } }, diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index c28fddf68ad72..6b2e2040bca80 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -45,6 +45,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -91,8 +92,8 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) { @Override public boolean shouldCancelChildrenOnCancellation() { return false; @@ -131,8 +132,8 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) { @Override public boolean 
shouldCancelChildrenOnCancellation() { return true; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 8927fed567ed9..62313d01b95c3 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -56,9 +56,11 @@ import org.junit.BeforeClass; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -175,15 +177,16 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) { }; transportService = new TransportService(settings, new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), - new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), - new NetworkService(Collections.emptyList())), - threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null) { + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), + new NetworkService(Collections.emptyList())), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null, + Collections.emptySet()) { @Override - protected TaskManager createTaskManager() { + protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { - return new MockTaskManager(settings); + return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(); + return super.createTaskManager(settings, threadPool, taskHeaders); } } }; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java index c5d8b39c3da39..8628a8ee2c391 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.test.ESTestCase; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Map; public class TaskTests extends ESTestCase { @@ -36,7 +37,8 @@ public void testTaskInfoToString() { long runningTime = randomNonNegativeLong(); boolean cancellable = randomBoolean(); TaskInfo taskInfo = new TaskInfo(new TaskId(nodeId, taskId), "test_type", - "test_action", "test_description", null, startTime, runningTime, cancellable, TaskId.EMPTY_TASK_ID); + "test_action", "test_description", null, startTime, runningTime, cancellable, TaskId.EMPTY_TASK_ID, + Collections.singletonMap("foo", "bar")); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); assertEquals(((Number)map.get("id")).longValue(), taskId); @@ -46,6 +48,7 @@ public void testTaskInfoToString() { assertEquals(((Number)map.get("start_time_in_millis")).longValue(), startTime); assertEquals(((Number)map.get("running_time_in_nanos")).longValue(), runningTime); 
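
For context on the test changes here and in TasksIT further below: headers attached to a client request are copied onto every task the request creates, but only for header names the node tracks. A hypothetical integration-test style sketch of that usage (class and index names are illustrative; the assertions mirror assertTaskHeaders below):

    import java.util.HashMap;
    import java.util.Map;
    import org.elasticsearch.test.ESIntegTestCase;

    public class TaskHeaderUsageSketch extends ESIntegTestCase {
        public void testHeadersArePropagated() {
            createIndex("test");
            Map<String, String> headers = new HashMap<>();
            headers.put("X-Opaque-Id", "my_id");           // tracked by default, judging from assertTaskHeaders
            headers.put("Custom-Task-Header", "my_value"); // tracked via TestTaskPlugin.getTaskHeaders()
            headers.put("Foo-Header", "bar");              // not registered, so never copied to a task
            client().filterWithHeader(headers).prepareSearch("test").get();
            // Every TaskInfo produced for this search then reports exactly the two tracked headers.
        }
    }
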
assertEquals(map.get("cancellable"), cancellable); + assertEquals(map.get("headers"), Collections.singletonMap("foo", "bar")); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 778930e7d05ac..b04205ed01813 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -84,6 +84,7 @@ import static java.util.Collections.singleton; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; @@ -355,19 +356,26 @@ public void testSearchTaskDescriptions() { client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}", XContentType.JSON) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - assertSearchResponse(client().prepareSearch("test").setTypes("doc").setQuery(QueryBuilders.matchAllQuery()).get()); + Map headers = new HashMap<>(); + headers.put("X-Opaque-Id", "my_id"); + headers.put("Foo-Header", "bar"); + headers.put("Custom-Task-Header", "my_value"); + assertSearchResponse( + client().filterWithHeader(headers).prepareSearch("test").setTypes("doc").setQuery(QueryBuilders.matchAllQuery()).get()); // the search operation should produce one main task List mainTask = findEvents(SearchAction.NAME, Tuple::v1); assertEquals(1, mainTask.size()); assertThat(mainTask.get(0).getDescription(), startsWith("indices[test], types[doc], search_type[")); assertThat(mainTask.get(0).getDescription(), containsString("\"query\":{\"match_all\"")); + assertTaskHeaders(mainTask.get(0)); // check that if we have any shard-level requests they all have non-zero length description List shardTasks = findEvents(SearchAction.NAME + "[*]", Tuple::v1); for (TaskInfo taskInfo : shardTasks) { assertThat(taskInfo.getParentTaskId(), notNullValue()); assertEquals(mainTask.get(0).getTaskId(), taskInfo.getParentTaskId()); + assertTaskHeaders(taskInfo); switch (taskInfo.getAction()) { case SearchTransportService.QUERY_ACTION_NAME: case SearchTransportService.DFS_ACTION_NAME: @@ -392,6 +400,25 @@ public void testSearchTaskDescriptions() { } + public void testSearchTaskHeaderLimit() { + int maxSize = Math.toIntExact(SETTING_HTTP_MAX_HEADER_SIZE.getDefault(Settings.EMPTY).getBytes() / 2 + 1); + + Map headers = new HashMap<>(); + headers.put("X-Opaque-Id", "my_id"); + headers.put("Custom-Task-Header", randomAlphaOfLengthBetween(maxSize, maxSize + 100)); + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> client().filterWithHeader(headers).admin().cluster().prepareListTasks().get() + ); + assertThat(ex.getMessage(), startsWith("Request exceeded the maximum size of task headers ")); + } + + private void assertTaskHeaders(TaskInfo taskInfo) { + assertThat(taskInfo.getHeaders().keySet(), hasSize(2)); + assertEquals("my_id", taskInfo.getHeaders().get("X-Opaque-Id")); + assertEquals("my_value", taskInfo.getHeaders().get("Custom-Task-Header")); + } + /** * Very basic "is it plugged in" 
style test that indexes a document and makes sure that you can fetch the status of the process. The * goal here is to verify that the large moving parts that make fetching task status work fit together rather than to verify any @@ -802,24 +829,24 @@ public void testNodeNotFoundButTaskFound() throws Exception { // Save a fake task that looks like it is from a node that isn't part of the cluster CyclicBarrier b = new CyclicBarrier(2); TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class); - resultsService.storeResult( - new TaskResult(new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID), - new RuntimeException("test")), - new ActionListener() { - @Override - public void onResponse(Void response) { - try { - b.await(); - } catch (InterruptedException | BrokenBarrierException e) { - onFailure(e); - } + resultsService.storeResult(new TaskResult( + new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()), + new RuntimeException("test")), + new ActionListener() { + @Override + public void onResponse(Void response) { + try { + b.await(); + } catch (InterruptedException | BrokenBarrierException e) { + onFailure(e); } + } - @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); - } - }); + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }); b.await(); // Now we can find it! diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 88674bfec74d8..5bf000a17bac7 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -59,9 +59,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import static org.elasticsearch.test.ESTestCase.awaitBusy; @@ -76,12 +78,17 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(UnblockTestTasksAction.INSTANCE, TransportUnblockTestTasksAction.class)); } + @Override + public Collection getTaskHeaders() { + return Collections.singleton("Custom-Task-Header"); + } + static class TestTask extends CancellableTask { private volatile boolean blocked = true; - TestTask(long id, String type, String action, String description, TaskId parentTaskId) { - super(id, type, action, description, parentTaskId); + TestTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); } @Override @@ -178,8 +185,8 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new TestTask(id, type, action, this.getDescription(), parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new TestTask(id, type, action, this.getDescription(), parentTaskId, headers); } } @@ -247,8 +254,8 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, 
getDescription(), parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers) { @Override public boolean shouldCancelChildrenOnCancellation() { return true; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index d0d5be5b4178d..2fb23b26709bd 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -109,9 +109,9 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { if (enableTaskManager) { - return super.createTask(id, type, action, parentTaskId); + return super.createTask(id, type, action, parentTaskId, headers); } else { return null; } @@ -156,9 +156,9 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { if (enableTaskManager) { - return super.createTask(id, type, action, parentTaskId); + return super.createTask(id, type, action, parentTaskId, headers); } else { return null; } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 5141b9cd47187..3bd66af1bab05 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -82,7 +82,7 @@ public void setUp() throws Exception { CapturingTransport capturingTransport = new CapturingTransport(); transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> clusterService.localNode(), null); + boundAddress -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); bulkAction = new TestTransportBulkAction(); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index e35f98e220e03..af8289f0c45b1 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -49,6 +49,7 @@ import org.junit.BeforeClass; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -92,7 +93,7 @@ private TransportBulkAction createAction(boolean controlled, AtomicLong expected CapturingTransport capturingTransport = new CapturingTransport(); TransportService transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> clusterService.localNode(), null); + boundAddress -> 
clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY); diff --git a/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java b/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java index 6cc0afa3fadf2..34f9bc15ecfa6 100644 --- a/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.Collections; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Mockito.mock; @@ -68,7 +69,7 @@ public void testMainActionClusterAvailable() { when(clusterService.state()).thenReturn(state); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - x -> null, null); + x -> null, null, Collections.emptySet()); TransportMainAction action = new TransportMainAction(settings, mock(ThreadPool.class), transportService, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), clusterService); AtomicReference responseRef = new AtomicReference<>(); diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 80fbd4cc43ddf..b0ac2ed5fa0d3 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -73,7 +73,7 @@ public Logger getLogger() { @Override public SearchTask getTask() { - return new SearchTask(0, "n/a", "n/a", "test", null); + return new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap()); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index 73743230d1a14..86ad2f75fedf2 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -130,7 +130,8 @@ private TransportMultiSearchAction createTransportMultiSearchAction(boolean cont Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); TaskManager taskManager = mock(TaskManager.class); TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) { + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()) { @Override public TaskManager getTaskManager() { return taskManager; diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index 4410507eef92e..3c2a87e8fef00 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ 
-35,8 +35,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; import java.util.Arrays; import java.util.Collections; @@ -54,6 +57,22 @@ public class TransportMultiSearchActionTests extends ESTestCase { + protected ThreadPool threadPool; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getTestName()); + } + + @After + @Override + public void tearDown() throws Exception { + threadPool.shutdown(); + super.tearDown(); + } + public void testBatchExecute() throws Exception { // Initialize dependencies of TransportMultiSearchAction Settings settings = Settings.builder() @@ -63,8 +82,10 @@ public void testBatchExecute() throws Exception { when(actionFilters.filters()).thenReturn(new ActionFilter[0]); ThreadPool threadPool = new ThreadPool(settings); TaskManager taskManager = mock(TaskManager.class); - TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) { + TransportService transportService = new TransportService(Settings.EMPTY, null, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()) { @Override public TaskManager getTaskManager() { return taskManager; @@ -102,8 +123,8 @@ protected void doExecute(SearchRequest request, ActionListener l }); } }; - - TransportMultiSearchAction action = + + TransportMultiSearchAction action = new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10, System::nanoTime); diff --git a/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java b/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java index 3eb1616348d84..d576d440c0263 100644 --- a/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java @@ -25,12 +25,16 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.Node; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; import org.junit.Before; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -48,10 +52,17 @@ public class TransportActionFilterChainTests extends ESTestCase { private AtomicInteger counter; + private ThreadPool threadPool; @Before public void init() throws Exception { counter = new AtomicInteger(); + threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "TransportActionFilterChainTests").build()); + } + + @After + public void shutdown() throws Exception { + terminate(threadPool); } public void 
testActionFiltersRequest() throws ExecutionException, InterruptedException { @@ -68,7 +79,9 @@ public void testActionFiltersRequest() throws ExecutionException, InterruptedExc String actionName = randomAlphaOfLength(randomInt(30)); ActionFilters actionFilters = new ActionFilters(filters); - TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) { + TransportAction transportAction = + new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())) { @Override protected void doExecute(TestRequest request, ActionListener listener) { listener.onResponse(new TestResponse()); @@ -144,7 +157,8 @@ public void exe String actionName = randomAlphaOfLength(randomInt(30)); ActionFilters actionFilters = new ActionFilters(filters); - TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) { + TransportAction transportAction = new TransportAction(Settings.EMPTY, + actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())) { @Override protected void doExecute(TestRequest request, ActionListener listener) { listener.onResponse(new TestResponse()); diff --git a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index 54253e9620745..470da323043ae 100644 --- a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -67,6 +67,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -191,7 +192,7 @@ public void setUp() throws Exception { transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); final TransportService transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); setClusterState(clusterService, TEST_INDEX); diff --git a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index b14b030a5dc88..de65d2a3f9240 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -88,7 +88,7 @@ public void setUp() throws Exception { transport = new CapturingTransport(); clusterService = createClusterService(threadPool); transportService = new TransportService(clusterService.getSettings(), transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); 
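
A side note on the many fixture updates in these test hunks: TransportService and TaskManager now require a real ThreadPool plus the set of tracked task header names, so tests that do not care about headers pass Collections.emptySet(). A minimal sketch of the new construction under those assumptions (the thread pool name is arbitrary):

    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.tasks.TaskManager;
    import org.elasticsearch.threadpool.TestThreadPool;
    import org.elasticsearch.threadpool.ThreadPool;

    final class TaskManagerWiringSketch {
        static void demo() {
            ThreadPool threadPool = new TestThreadPool("task-header-sketch");
            try {
                // No tracked headers: register() copies nothing from the thread context.
                TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
            } finally {
                ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
            }
        }
    }
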
transportService.acceptIncomingRequests(); localNode = new DiscoveryNode("local_node", buildNewFakeTransportAddress(), Collections.emptyMap(), diff --git a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 7d471f77f83d0..60a46876a7126 100644 --- a/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -181,7 +181,7 @@ public void setUp() throws Exception { transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); int numNodes = randomIntBetween(3, 10); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index 9f1591f6a540b..3aeab0fa5fb5b 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -96,7 +96,7 @@ threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService, new NamedWr new NetworkService(Collections.emptyList())); clusterService = createClusterService(threadPool); transportService = new TransportService(clusterService.getSettings(), transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); broadcastReplicationAction = new TestBroadcastReplicationAction(Settings.EMPTY, threadPool, clusterService, transportService, diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index d2472da34f56c..9356fd12a3a5b 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -163,7 +163,7 @@ public void setUp() throws Exception { transport = new CapturingTransport(); clusterService = createClusterService(threadPool); transportService = new TransportService(clusterService.getSettings(), transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); @@ -977,7 +977,7 @@ public void testRetryOnReplicaWithRealTransport() throws Exception { new NoneCircuitBreakerService(), namedWriteableRegistry, new 
NetworkService(Collections.emptyList()), Version.CURRENT); transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - x -> clusterService.localNode(),null); + x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); @@ -1040,7 +1040,7 @@ private void assertIndexShardCounter(int expected) { * half the time. */ private ReplicationTask maybeTask() { - return random().nextBoolean() ? new ReplicationTask(0, null, null, null, null) : null; + return random().nextBoolean() ? new ReplicationTask(0, null, null, null, null, null) : null; } /** diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index b3db10f920973..47ce090d895fa 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -62,6 +62,7 @@ import org.junit.BeforeClass; import org.mockito.ArgumentCaptor; +import java.util.Collections; import java.util.HashSet; import java.util.Locale; import java.util.concurrent.ExecutionException; @@ -254,7 +255,7 @@ public void testDocumentFailureInShardOperationOnReplica() throws Exception { public void testReplicaProxy() throws InterruptedException, ExecutionException { CapturingTransport transport = new CapturingTransport(); TransportService transportService = new TransportService(clusterService.getSettings(), transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); ShardStateAction shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); @@ -355,7 +356,8 @@ protected TestAction() { protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) { super(Settings.EMPTY, "test", - new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null), null, + new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()), null, null, null, null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; diff --git a/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java index 29235329d6669..8db45cc5508ef 100644 --- a/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java @@ -53,6 +53,7 @@ import org.junit.Before; import org.junit.BeforeClass; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -144,7 +145,8 @@ public void setUp() throws Exception { 
transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet() + ); transportService.start(); transportService.acceptIncomingRequests(); action = new TestTransportInstanceSingleOperationAction( diff --git a/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java b/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java index 160c14c243cb2..bca04738d8b89 100644 --- a/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java +++ b/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java @@ -59,7 +59,7 @@ private Actions(Settings settings, ThreadPool threadPool, GenericAction[] action private static class InternalTransportAction extends TransportAction { private InternalTransportAction(Settings settings, String actionName, ThreadPool threadPool) { - super(settings, actionName, threadPool, EMPTY_FILTERS, null, new TaskManager(settings)); + super(settings, actionName, threadPool, EMPTY_FILTERS, null, new TaskManager(settings, threadPool, Collections.emptySet())); } @Override diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index ad894906cfb2d..c8030e1cf4aee 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -163,7 +163,7 @@ public void sendRequest(Transport.Connection conne }, (addr) -> { assert addr == null : "boundAddress: " + addr; return DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()); - }, null); + }, null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); transportClientNodesService = diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 51908a45380f0..828b385f85fa5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -155,7 +155,8 @@ public void setUp() throws Exception { this.threadPool = new TestThreadPool(getClass().getName()); this.transport = new MockTransport(); transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null); + boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 58c9651a82af1..b1ff626fa39b8 100644 --- 
a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -48,6 +48,7 @@ import org.junit.Before; import org.junit.BeforeClass; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -108,7 +109,7 @@ public void setUp() throws Exception { this.transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null); diff --git a/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java b/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java index f2537e746ad0c..0f5f4870ae1bb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java @@ -60,6 +60,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -94,7 +95,7 @@ public void setUp() throws Exception { super.setUp(); clusterService = createClusterService(threadPool); transportService = new TransportService(clusterService.getSettings(), new CapturingTransport(), threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); } diff --git a/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index 1a837b825d867..f32e93bb82dbd 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -145,7 +145,7 @@ namedWriteableRegistry, new NetworkService(Collections.emptyList()), version), (boundAddress) -> new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), boundAddress.publishAddress(), Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version), - null); + null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); return transportService; diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index e0593a694d0b4..44914b1958777 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -402,7 +402,8 @@ public BoundTransportAddress boundAddress() { }; closeables.push(transport); final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, 
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null); + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); closeables.push(transportService); final int limitPortCounts = randomIntBetween(1, 10); final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( @@ -447,7 +448,8 @@ public BoundTransportAddress boundAddress() { }; closeables.push(transport); final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null); + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); closeables.push(transportService); final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( executorService, @@ -497,7 +499,8 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi closeables.push(transport); final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null); + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); closeables.push(transportService); final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( @@ -555,7 +558,8 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi closeables.push(transport); final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null); + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); closeables.push(transportService); final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 3)); try { @@ -723,7 +727,8 @@ public BoundTransportAddress boundAddress() { closeables.push(transport); final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null); + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); closeables.push(transportService); final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( executorService, @@ -772,7 +777,8 @@ private NetworkHandle startServices( final Transport transport = supplier.apply(nodeSettings, version); final MockTransportService transportService = new MockTransportService(nodeSettings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> - new DiscoveryNode(nodeId, nodeId, boundAddress.publishAddress(), emptyMap(), nodeRoles, version), null); + new DiscoveryNode(nodeId, nodeId, boundAddress.publishAddress(), emptyMap(), nodeRoles, version), null, + Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); final ConcurrentMap counters = ConcurrentCollections.newConcurrentMap(); diff --git a/server/src/test/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskStateTests.java b/server/src/test/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskStateTests.java index 2b15181ca3930..16d9df8c820ee 100644 --- 
a/server/src/test/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskStateTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskStateTests.java @@ -26,6 +26,7 @@ import org.mockito.ArgumentCaptor; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static java.util.Collections.emptyList; @@ -42,7 +43,7 @@ public class LeaderBulkByScrollTaskStateTests extends ESTestCase { @Before public void createTask() { slices = between(2, 50); - task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID); + task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap()); task.setWorkerCount(slices); taskState = task.getLeaderState(); } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java index 64bf52c319e68..db624798bb71c 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java @@ -28,6 +28,7 @@ import org.junit.Before; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CyclicBarrier; @@ -52,7 +53,7 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase { @Before public void createTask() { - task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID); + task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap()); task.setWorker(Float.POSITIVE_INFINITY, null); workerState = task.getWorkerState(); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 7e8949cd15fbf..e80c2df4ea060 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -130,7 +130,8 @@ protected class ReplicationGroup implements AutoCloseable, Iterable private final AtomicInteger replicaId = new AtomicInteger(); private final AtomicInteger docId = new AtomicInteger(); boolean closed = false; - private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY), + private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(Settings.EMPTY, + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), (request, parentTask, primaryAllocationId, primaryTerm, listener) -> { try { new ResyncAction(request, listener, ReplicationGroup.this).execute(); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index 71faecfcea59a..618714fc9d959 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -58,7 +58,7 @@ public void setUp() throws Exception { transport = new CapturingTransport(); clusterService = createClusterService(threadPool); 
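
Likewise for tasks that tests construct directly (BulkByScrollTask, ResyncTask, TaskInfo, and friends): the headers map is now an explicit constructor argument, typically Collections.emptyMap() when no headers matter. The subclassing pattern, sketched with a hypothetical task type whose constructor shape matches the ones changed in the hunks above:

    import java.util.Map;
    import org.elasticsearch.tasks.Task;
    import org.elasticsearch.tasks.TaskId;

    // MyTask is hypothetical; like SearchTask or ResyncTask it simply threads the
    // headers map through to the Task base constructor.
    class MyTask extends Task {
        MyTask(long id, String type, String action, String description, TaskId parentTaskId,
               Map<String, String> headers) {
            super(id, type, action, description, parentTaskId, headers);
        }
    }
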
transportService = new TransportService(clusterService.getSettings(), transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> clusterService.localNode(), null); + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> clusterService.localNode(), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 1b9a0ff629066..433f662062735 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -50,7 +50,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { public void testSyncerSendsOffCorrectDocuments() throws Exception { IndexShard shard = newStartedShard(true); - TaskManager taskManager = new TaskManager(Settings.EMPTY); + TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); AtomicBoolean syncActionCalled = new AtomicBoolean(); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { @@ -112,7 +112,8 @@ public void testSyncerOnClosingShard() throws Exception { syncCalledLatch.countDown(); threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); }; - PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY), syncAction); + PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), syncAction); syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately int numDocs = 10; @@ -158,7 +159,8 @@ public void testStatusSerialization() throws IOException { } public void testStatusEquals() throws IOException { - PrimaryReplicaSyncer.ResyncTask task = new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null); + PrimaryReplicaSyncer.ResyncTask task = + new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null, Collections.emptyMap()); task.setPhase(randomAlphaOfLength(10)); task.setResyncedOperations(randomIntBetween(0, 1000)); task.setTotalOperations(randomIntBetween(0, 1000)); @@ -181,7 +183,8 @@ public void testStatusEquals() throws IOException { } public void testStatusReportsCorrectNumbers() throws IOException { - PrimaryReplicaSyncer.ResyncTask task = new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null); + PrimaryReplicaSyncer.ResyncTask task = + new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null, Collections.emptyMap()); task.setPhase(randomAlphaOfLength(10)); task.setResyncedOperations(randomIntBetween(0, 1000)); task.setTotalOperations(randomIntBetween(0, 1000)); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 6e6eaf726a599..dd10dd2747df6 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -159,7 +159,8 @@ public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool th // 
services TransportService transportService = new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), clusterSettings); + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), clusterSettings, + Collections.emptySet()); MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry, null, null, null) { // metaData upgrader should do nothing diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index bc5a5b95b958a..d76429c53f3a5 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -401,7 +401,8 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod final Settings settings = Settings.builder().put("node.name", discoveryNode.getName()).build(); final TransportService transportService = new TransportService(settings, null, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null); + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()); final ClusterService clusterService = mock(ClusterService.class); final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService, transportService, null); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 92f018f282a43..5ed708ecb7581 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -213,11 +213,11 @@ public void onFailure(Exception e) { SearchPhaseResult searchPhaseResult = service.executeQueryPhase( new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f), - new SearchTask(123L, "", "", "", null)); + new SearchTask(123L, "", "", "", null, Collections.emptyMap())); IntArrayList intCursors = new IntArrayList(1); intCursors.add(0); ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null /* not a scroll */); - service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null)); + service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap())); } catch (AlreadyClosedException ex) { throw ex; } catch (IllegalStateException ex) { diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index d651c92cd611e..06d738cfb6016 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -62,6 +62,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import 
java.util.List; import static org.hamcrest.Matchers.anyOf; @@ -94,7 +95,7 @@ private void countTestCase(Query query, IndexReader reader, boolean shouldCollec TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(query)); context.setSize(0); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); final IndexSearcher searcher = shouldCollect ? new IndexSearcher(reader) : getAssertingEarlyTerminationSearcher(reader, 0); @@ -166,7 +167,7 @@ public void testPostFilterDisablesCountOptimization() throws Exception { IndexReader reader = DirectoryReader.open(dir); IndexSearcher contextSearcher = getAssertingEarlyTerminationSearcher(reader, 0); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); QueryPhase.execute(context, contextSearcher, checkCancelled -> {}); @@ -195,7 +196,7 @@ public void testMinScoreDisablesCountOptimization() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(0); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); QueryPhase.execute(context, contextSearcher, checkCancelled -> {}); assertEquals(1, context.queryResult().topDocs().totalHits); @@ -209,7 +210,7 @@ public void testMinScoreDisablesCountOptimization() throws Exception { public void testQueryCapturesThreadPoolStats() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); Directory dir = newDirectory(); @@ -251,7 +252,7 @@ public void testInOrderScrollOptimization() throws Exception { scrollContext.maxScore = Float.NaN; scrollContext.totalHits = -1; context.scrollContext(scrollContext); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); int size = randomIntBetween(2, 5); context.setSize(size); @@ -290,7 +291,7 @@ public void testTerminateAfterEarlyTermination() throws Exception { } w.close(); TestSearchContext context = new TestSearchContext(null, indexShard); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.terminateAfter(1); @@ -384,7 +385,7 @@ public void testIndexSortingEarlyTermination() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(1); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); context.sort(new SortAndFormats(sort, new DocValueFormat[] {DocValueFormat.RAW})); final IndexReader reader = DirectoryReader.open(dir); @@ -471,7 +472,7 @@ public void testIndexSortScrollOptimization() throws Exception { scrollContext.maxScore = Float.NaN; scrollContext.totalHits = -1; 
context.scrollContext(scrollContext); - context.setTask(new SearchTask(123L, "", "", "", null)); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); context.setSize(10); context.sort(searchSortAndFormat); diff --git a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java index 6643a71b0962f..be0624d6bba83 100644 --- a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java @@ -22,6 +22,8 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.test.ESTestCase; +import java.util.Collections; + import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -33,10 +35,11 @@ public void testEmptyToString() { public void testNonEmptyToString() { TaskInfo info = new TaskInfo( - new TaskId("node1", 1), "dummy-type", "dummy-action", "dummy-description", null, 0, 1, true, new TaskId("node1", 0)); + new TaskId("node1", 1), "dummy-type", "dummy-action", "dummy-description", null, 0, 1, true, new TaskId("node1", 0), + Collections.singletonMap("foo", "bar")); ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList()); assertEquals("{\"tasks\":{\"node1:1\":{\"node\":\"node1\",\"id\":1,\"type\":\"dummy-type\",\"action\":\"dummy-action\"," + "\"description\":\"dummy-description\",\"start_time_in_millis\":0,\"running_time_in_nanos\":1,\"cancellable\":true," - + "\"parent_task_id\":\"node1:0\"}}}", tasksResponse.toString()); + + "\"parent_task_id\":\"node1:0\",\"headers\":{\"foo\":\"bar\"}}}}", tasksResponse.toString()); } } diff --git a/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java b/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java index e70c2b7119421..d4da4f8f1c5cb 100644 --- a/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/TaskResultTests.java @@ -134,7 +134,9 @@ private static TaskInfo randomTaskInfo() throws IOException { long runningTimeNanos = randomLong(); boolean cancellable = randomBoolean(); TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId(); - return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId); + Map headers = + randomBoolean() ? 
Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); + return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers); } private static TaskId randomTaskId() { diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index c4fe88d2fce46..08d88ad2e0486 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -73,7 +73,7 @@ private NetworkHandle startServices(String nodeNameAndId, Settings settings, Ver boundAddress.publishAddress(), emptyMap(), emptySet(), - version), null); + version), null, Collections.emptySet()); transportService.start(); transportService.acceptIncomingRequests(); transportServices.add(transportService); diff --git a/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java index a0752b0048564..73cff7717b44d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java @@ -27,6 +27,8 @@ import org.junit.After; import org.junit.Before; +import java.util.Collections; + public abstract class AbstractAsyncBulkByScrollActionTestCase< Request extends AbstractBulkByScrollRequest, Response extends BulkByScrollResponse> @@ -37,7 +39,7 @@ public abstract class AbstractAsyncBulkByScrollActionTestCase< @Before public void setupForTest() { threadPool = new TestThreadPool(getTestName()); - task = new BulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID); + task = new BulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap()); task.setWorker(Float.POSITIVE_INFINITY, null); } diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 5a6075b1f8571..8d2f6e895504c 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -50,6 +50,7 @@ import java.nio.file.Path; import java.util.Collection; import java.util.Collections; +import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; @@ -115,15 +116,15 @@ protected SearchService newSearchService(ClusterService clusterService, IndicesS protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, - ClusterSettings clusterSettings) { + ClusterSettings clusterSettings, Set taskHeaders) { // we use the MockTransportService.TestPlugin class as a marker to create a network // module with this MockNetworkService. NetworkService is such an integral part of the systme // we don't allow to plug it in from plugins or anything. this is a test-only override and // can't be done in a production env. 
if (getPluginsService().filterPlugins(MockTransportService.TestPlugin.class).isEmpty()) { - return super.newTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings); + return super.newTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); } else { - return new MockTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings); + return new MockTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java index bccbd537a53b4..dec204537b917 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java @@ -27,8 +27,10 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -41,8 +43,8 @@ public class MockTaskManager extends TaskManager { private final Collection listeners = new CopyOnWriteArrayList<>(); - public MockTaskManager(Settings settings) { - super(settings); + public MockTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + super(settings, threadPool, taskHeaders); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index f5efa9a8c56fa..1c31533c9337d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -111,16 +111,16 @@ public static MockTransportService createNewService(Settings settings, Version v NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); final Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); - return createNewService(settings, transport, version, threadPool, clusterSettings); + return createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); } public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, - @Nullable ClusterSettings clusterSettings) { + @Nullable ClusterSettings clusterSettings, Set taskHeaders) { return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(), Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version), - clusterSettings); + clusterSettings, taskHeaders); } private final Transport original; @@ -135,7 +135,7 @@ public MockTransportService(Settings settings, Transport transport, ThreadPool t @Nullable ClusterSettings clusterSettings) { this(settings, transport, threadPool, interceptor, 
(boundAddress) -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), settings.get(Node.NODE_NAME_SETTING.getKey(), - UUIDs.randomBase64UUID())), clusterSettings); + UUIDs.randomBase64UUID())), clusterSettings, Collections.emptySet()); } /** @@ -146,8 +146,9 @@ public MockTransportService(Settings settings, Transport transport, ThreadPool t */ public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, - @Nullable ClusterSettings clusterSettings) { - super(settings, new LookupTestTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings); + @Nullable ClusterSettings clusterSettings, Set taskHeaders) { + super(settings, new LookupTestTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings, + taskHeaders); this.original = transport; } @@ -160,11 +161,11 @@ public static TransportAddress[] extractTransportAddresses(TransportService tran } @Override - protected TaskManager createTaskManager() { + protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { - return new MockTaskManager(settings); + return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(); + return super.createTaskManager(settings, threadPool, taskHeaders); } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 17423207d56c6..20971b3865ea1 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1931,7 +1931,8 @@ public void testHandshakeWithIncompatVersion() { Version version = Version.fromString("2.0.0"); try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); - MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null)) { + MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null, + Collections.emptySet())) { service.start(); service.acceptIncomingRequests(); DiscoveryNode node = @@ -1953,7 +1954,8 @@ public void testHandshakeUpdatesVersion() throws IOException { Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT); try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); - MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null)) { + MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, null, + Collections.emptySet())) { service.start(); service.acceptIncomingRequests(); DiscoveryNode node = @@ -1989,7 +1991,7 @@ protected String handleRequest(TcpChannel mockChannel, String profileName, Strea }; try (MockTransportService service = 
MockTransportService.createNewService(Settings.EMPTY, transport, Version.CURRENT, threadPool, - null)) { + null, Collections.emptySet())) { service.start(); service.acceptIncomingRequests(); // this acts like a node that doesn't have support for handshakes diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index 916e97ffd1211..e9f5f86462f54 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -50,7 +50,7 @@ protected Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, T } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings); + MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java index bb36ed9f6db1d..bd7fddf82b858 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java @@ -78,7 +78,7 @@ protected Version getCurrentVersion() { }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings); + MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } From a15ba75d934823d8867b5c69438c627563db7a86 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 12 Jan 2018 16:17:30 -0500 Subject: [PATCH 8/8] Fix lock accounting in releasable lock Releasable locks hold accounting on who holds the lock when assertions are enabled. However, the underlying lock can be re-entrant, yet we mark the lock as not held by the current thread as soon as the releasable is closed. For a re-entrant lock this is not right because the thread could have entered the lock multiple times. Instead, we have to count how many times the thread has entered the lock and only mark the lock as not held by the current thread when the counter reaches zero. Relates #28202 --- .../util/concurrent/ReleasableLock.java | 20 ++-- .../util/concurrent/ReleasableLockTests.java | 97 +++++++++++++++++++ 2 files changed, 111 insertions(+), 6 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java index 46d6abff17632..9c90b3bbde313 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java @@ -31,8 +31,9 @@ public class ReleasableLock implements Releasable { private final Lock lock; - /* a per thread boolean indicating the lock is held by it. 
only works when assertions are enabled */ - private final ThreadLocal holdingThreads; + + // a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled + private final ThreadLocal holdingThreads; public ReleasableLock(Lock lock) { this.lock = lock; @@ -57,12 +58,19 @@ public ReleasableLock acquire() throws EngineException { } private boolean addCurrentThread() { - holdingThreads.set(true); + final Integer current = holdingThreads.get(); + holdingThreads.set(current == null ? 1 : current + 1); return true; } private boolean removeCurrentThread() { - holdingThreads.remove(); + final Integer count = holdingThreads.get(); + assert count != null && count > 0; + if (count == 1) { + holdingThreads.remove(); + } else { + holdingThreads.set(count - 1); + } return true; } @@ -70,7 +78,7 @@ public Boolean isHeldByCurrentThread() { if (holdingThreads == null) { throw new UnsupportedOperationException("asserts must be enabled"); } - Boolean b = holdingThreads.get(); - return b != null && b.booleanValue(); + final Integer count = holdingThreads.get(); + return count != null && count > 0; } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java new file mode 100644 index 0000000000000..6a303449ce1c1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java @@ -0,0 +1,97 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class ReleasableLockTests extends ESTestCase { + + /** + * Test that accounting on whether or not a thread holds a releasable lock is correct. Previously we had a bug where on a re-entrant + * lock that if a thread entered the lock twice we would declare that it does not hold the lock after it exits its first entrance but + * not its second entrance. 
+ * + * @throws BrokenBarrierException if awaiting on the synchronization barrier breaks + * @throws InterruptedException if awaiting on the synchronization barrier is interrupted + */ + public void testIsHeldByCurrentThread() throws BrokenBarrierException, InterruptedException { + final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + final ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock()); + final ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock()); + + final int numberOfThreads = scaledRandomIntBetween(1, 32); + final int iterations = scaledRandomIntBetween(1, 32); + final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + final List threads = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; i++) { + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + for (int j = 0; j < iterations; j++) { + if (randomBoolean()) { + acquire(readLock, writeLock); + } else { + acquire(writeLock, readLock); + } + } + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + threads.add(thread); + thread.start(); + } + + barrier.await(); + barrier.await(); + for (final Thread thread : threads) { + thread.join(); + } + } + + private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) { + try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) { + assertTrue(lockToAcquire.isHeldByCurrentThread()); + assertFalse(otherLock.isHeldByCurrentThread()); + try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) { + assertTrue(lockToAcquire.isHeldByCurrentThread()); + assertFalse(otherLock.isHeldByCurrentThread()); + } + // previously there was a bug here and this would return false + assertTrue(lockToAcquire.isHeldByCurrentThread()); + assertFalse(otherLock.isHeldByCurrentThread()); + } + assertFalse(lockToAcquire.isHeldByCurrentThread()); + assertFalse(otherLock.isHeldByCurrentThread()); + } + +}
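The counting scheme described in PATCH 8/8 can also be illustrated with a small self-contained sketch. The class below is hypothetical (it is not the ReleasableLock or ReleasableLockTests touched by the patch) and only mirrors the idea the commit message describes: pair a re-entrant Lock with a per-thread entry counter and report the lock as held by the current thread until that counter drops back to zero.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, stand-alone sketch of the per-thread re-entrancy counting used by the patch.
public final class CountingLockSketch {

    private final Lock lock;
    // how many times the current thread has entered this lock; null or 0 means "not held"
    private final ThreadLocal<Integer> holdCount = new ThreadLocal<>();

    public CountingLockSketch(Lock lock) {
        this.lock = lock;
    }

    public void acquire() {
        lock.lock();
        final Integer current = holdCount.get();
        holdCount.set(current == null ? 1 : current + 1); // count every entry, including re-entrant ones
    }

    public void release() {
        final Integer current = holdCount.get();
        if (current == null || current <= 0) {
            throw new IllegalMonitorStateException("lock is not held by the current thread");
        }
        if (current == 1) {
            holdCount.remove(); // outermost release: the thread no longer holds the lock
        } else {
            holdCount.set(current - 1); // inner release: the thread still holds the lock
        }
        lock.unlock();
    }

    public boolean isHeldByCurrentThread() {
        final Integer current = holdCount.get();
        return current != null && current > 0;
    }

    public static void main(String[] args) {
        final CountingLockSketch readLock = new CountingLockSketch(new ReentrantReadWriteLock().readLock());
        readLock.acquire();
        readLock.acquire();                                   // re-entrant second entry
        readLock.release();                                   // a plain boolean flag would already report "not held" here
        System.out.println(readLock.isHeldByCurrentThread()); // true: the outer entry is still open
        readLock.release();
        System.out.println(readLock.isHeldByCurrentThread()); // false: fully released
    }
}

With a single per-thread boolean, the first release() above would clear the flag even though the thread is still inside its outer acquire(); the counter keeps isHeldByCurrentThread() truthful under re-entrancy, which is the property the new ReleasableLockTests exercises with nested acquire() calls.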