Skip to content

Commit

Permalink
Add ability to associate an ID with tasks (#27764)
Browse files Browse the repository at this point in the history
Adds support for capturing the X-Opaque-Id header from a REST request and storing it's value in the tasks that this request started. It works for all user-initiated tasks (not only search).

Closes #23250

Usage:
```
$ curl -H "X-Opaque-Id: imotov" -H "foo:bar" "localhost:9200/_tasks?pretty&group_by=parents"
{
  "tasks" : {
    "7qrTVbiDQKiZfubUP7DPkg:6998" : {
      "node" : "7qrTVbiDQKiZfubUP7DPkg",
      "id" : 6998,
      "type" : "transport",
      "action" : "cluster:monitor/tasks/lists",
      "start_time_in_millis" : 1513029940042,
      "running_time_in_nanos" : 266794,
      "cancellable" : false,
      "headers" : {
        "X-Opaque-Id" : "imotov"
      },
      "children" : [
        {
          "node" : "V-PuCjPhRp2ryuEsNw6V1g",
          "id" : 6088,
          "type" : "netty",
          "action" : "cluster:monitor/tasks/lists[n]",
          "start_time_in_millis" : 1513029940043,
          "running_time_in_nanos" : 67785,
          "cancellable" : false,
          "parent_task_id" : "7qrTVbiDQKiZfubUP7DPkg:6998",
          "headers" : {
            "X-Opaque-Id" : "imotov"
          }
        },
        {
          "node" : "7qrTVbiDQKiZfubUP7DPkg",
          "id" : 6999,
          "type" : "direct",
          "action" : "cluster:monitor/tasks/lists[n]",
          "start_time_in_millis" : 1513029940043,
          "running_time_in_nanos" : 98754,
          "cancellable" : false,
          "parent_task_id" : "7qrTVbiDQKiZfubUP7DPkg:6998",
          "headers" : {
            "X-Opaque-Id" : "imotov"
          }
        }
      ]
    }
  }
}
```
  • Loading branch information
imotov authored Jan 12, 2018
1 parent 6a5807a commit c75ac31
Show file tree
Hide file tree
Showing 84 changed files with 627 additions and 180 deletions.
68 changes: 68 additions & 0 deletions docs/reference/cluster/tasks.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,71 @@ The following command will change the grouping to parent tasks:
GET _tasks?group_by=parents
--------------------------------------------------
// CONSOLE

The grouping can be disabled by specifying `none` as a `group_by` parameter:

[source,js]
--------------------------------------------------
GET _tasks?group_by=none
--------------------------------------------------
// CONSOLE

[float]
=== Identifying running tasks

The `X-Opaque-Id` header, when provided on the HTTP request header, is going to be returned as a header in the response as well as
in the `headers` field for in the task information. This allows to track certain calls, or associate certain tasks with
a the client that started them:

[source,sh]
--------------------------------------------------
curl -i -H "X-Opaque-Id: 123456" "http://localhost:9200/_tasks?group_by=parents"
--------------------------------------------------
// NOTCONSOLE

The result will look similar to the following:

[source,js]
--------------------------------------------------
HTTP/1.1 200 OK
X-Opaque-Id: 123456 <1>
content-type: application/json; charset=UTF-8
content-length: 831
{
"tasks" : {
"u5lcZHqcQhu-rUoFaqDphA:45" : {
"node" : "u5lcZHqcQhu-rUoFaqDphA",
"id" : 45,
"type" : "transport",
"action" : "cluster:monitor/tasks/lists",
"start_time_in_millis" : 1513823752749,
"running_time_in_nanos" : 293139,
"cancellable" : false,
"headers" : {
"X-Opaque-Id" : "123456" <2>
},
"children" : [
{
"node" : "u5lcZHqcQhu-rUoFaqDphA",
"id" : 46,
"type" : "direct",
"action" : "cluster:monitor/tasks/lists[n]",
"start_time_in_millis" : 1513823752750,
"running_time_in_nanos" : 92133,
"cancellable" : false,
"parent_task_id" : "u5lcZHqcQhu-rUoFaqDphA:45",
"headers" : {
"X-Opaque-Id" : "123456" <3>
}
}
]
}
}
}
--------------------------------------------------
// NOTCONSOLE

<1> id as a part of the response header
<2> id for the tasks that was initiated by the REST request
<3> the child task of the task initiated by the REST request
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -123,6 +124,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
private SearchRequest firstSearchRequest;
private PlainActionFuture<BulkByScrollResponse> listener;
private String scrollId;
private ThreadPool threadPool;
private TaskManager taskManager;
private BulkByScrollTask testTask;
private WorkerBulkByScrollTaskState worker;
Expand All @@ -141,7 +143,8 @@ public void setupForTest() {
testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest);
listener = new PlainActionFuture<>();
scrollId = null;
taskManager = new TaskManager(Settings.EMPTY);
threadPool = new TestThreadPool(getClass().getName());
taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest);
testTask.setWorker(testRequest.getRequestsPerSecond(), null);
worker = testTask.getWorkerState();
Expand All @@ -159,8 +162,9 @@ private void setupClient(ThreadPool threadPool) {
}

@After
public void tearDownAndVerifyCommonStuff() {
public void tearDownAndVerifyCommonStuff() throws Exception {
client.close();
terminate(threadPool);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

Expand All @@ -53,7 +54,7 @@ public class TransportRethrottleActionTests extends ESTestCase {
@Before
public void createTask() {
slices = between(2, 50);
task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID);
task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap());
task.setWorkerCount(slices);
}

Expand Down Expand Up @@ -101,7 +102,8 @@ public void testRethrottleSuccessfulResponse() {
List<BulkByScrollTask.StatusOrException> sliceStatuses = new ArrayList<>(slices);
for (int i = 0; i < slices; i++) {
BulkByScrollTask.Status status = believeableInProgressStatus(i);
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId())));
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
Collections.emptyMap()));
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
}
rethrottleTestCase(slices,
Expand All @@ -121,7 +123,8 @@ public void testRethrottleWithSomeSucceeded() {
List<TaskInfo> tasks = new ArrayList<>();
for (int i = succeeded; i < slices; i++) {
BulkByScrollTask.Status status = believeableInProgressStatus(i);
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId())));
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
Collections.emptyMap()));
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
}
rethrottleTestCase(slices - succeeded,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected Version getCurrentVersion() {
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings);
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected Version getCurrentVersion() {
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings);
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,44 @@
field3: value
- match: { hits.total: 1 }
- match: { hits.hits.0._id: q3 }

---
"Create a task result record in the old cluster":
- do:
indices.create:
index: reindexed_index
body:
settings:
index:
number_of_replicas: 0
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "1"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "2"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "3"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "4"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "5"}'

- do:
reindex:
wait_for_completion: false
body:
source:
index: reindexed_index
size: 1
dest:
index: reindexed_index_copy
- match: {task: '/.+:\d+/'}
- set: {task: task}

- do:
tasks.get:
wait_for_completion: true
task_id: $task
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,42 @@
field3: value
- match: { hits.total: 1 }
- match: { hits.hits.0._id: q3 }

---
"Find a task result record from the old cluster":
- do:
search:
index: .tasks
body:
query:
match_all: {}
- match: { hits.total: 1 }
- match: { hits.hits.0._id: '/.+:\d+/' }
- set: {hits.hits.0._id: task_id}

- do:
tasks.get:
wait_for_completion: true
task_id: $task_id

- is_false: node_failures
- is_true: task

- do:
headers: { "X-Opaque-Id": "Reindexing Again" }
reindex:
wait_for_completion: false
body:
source:
index: reindexed_index_copy
size: 1
dest:
index: reindexed_index_another_copy
- match: { task: '/.+:\d+/' }
- set: { task: task_id }

- do:
tasks.get:
wait_for_completion: true
task_id: $task_id
- match: { task.headers.X-Opaque-Id: "Reindexing Again" }
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"group_by": {
"type" : "enum",
"description": "Group tasks by nodes or parent/child relationships",
"options" : ["nodes", "parents"],
"options" : ["nodes", "parents", "none"],
"default" : "nodes"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,19 @@
group_by: parents

- is_true: tasks

---
"tasks_list headers":
- skip:
version: " - 6.99.99"
reason: task headers has been added in 7.0.0

- do:
headers: { "X-Opaque-Id": "That is me" }
tasks.list:
actions: "cluster:monitor/tasks/lists"
group_by: none

- is_true: tasks
- match: { tasks.0.headers.X-Opaque-Id: "That is me" }

Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.usage.UsageService;

Expand All @@ -324,6 +325,7 @@
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableMap;

Expand Down Expand Up @@ -362,7 +364,10 @@ public ActionModule(boolean transportClient, Settings settings, IndexNameExpress
actionFilters = setupActionFilters(actionPlugins);
autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<String> headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet());
Set<String> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of("X-Opaque-Id")
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
* Transport action that can be used to cancel currently running cancellable tasks.
* <p>
* For a task to be cancellable it has to return an instance of
* {@link CancellableTask} from {@link TransportRequest#createTask(long, String, String, TaskId)}
* {@link CancellableTask} from {@link TransportRequest#createTask}
*/
public class TransportCancelTasksAction extends TransportTasksAction<CancellableTask, CancelTasksRequest, CancelTasksResponse, TaskInfo> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@ public XContentBuilder toXContentGroupedByParents(XContentBuilder builder, Param
return builder;
}

/**
* Presents a flat list of tasks
*/
public XContentBuilder toXContentGroupedByNone(XContentBuilder builder, Params params) throws IOException {
toXContentCommon(builder, params);
builder.startArray("tasks");
for (TaskInfo taskInfo : getTasks()) {
builder.startObject();
taskInfo.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
return builder;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
/**
* Max length of the source document to include into toString()
*
* @see ReplicationRequest#createTask(long, java.lang.String, java.lang.String, org.elasticsearch.tasks.TaskId)
* @see ReplicationRequest#createTask
*/
static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -428,9 +429,9 @@ public boolean isSuggestOnly() {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// generating description in a lazy way since source can be quite big
return new SearchTask(id, type, action, null, parentTaskId) {
return new SearchTask(id, type, action, null, parentTaskId, headers) {
@Override
public String getDescription() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -117,8 +118,8 @@ public void readFrom(StreamInput in) throws IOException {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
Expand Down
Loading

0 comments on commit c75ac31

Please sign in to comment.