diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java index 3b957b2defb0..9d105248e66c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java @@ -22,6 +22,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; @@ -108,4 +110,17 @@ public void cancelAsync(CancelTasksRequest cancelTasksRequest, RequestOptions op emptySet() ); } + /** + * Get current task using the Task Management API. + * See + * Task Management API on elastic.co + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public GetTaskResponse get(GetTaskRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(request, TasksRequestConverters::getTask, options, + GetTaskResponse::fromXContent, emptySet()); + } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java index 45723dcc938c..1ae170a60488 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java @@ -22,6 +22,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; final class TasksRequestConverters { @@ -54,4 +55,12 @@ static Request listTasks(ListTasksRequest listTaskRequest) { .putParam("group_by", "none"); return request; } + + public static Request getTask(GetTaskRequest req) { + String endpoint = new RequestConverters.EndpointBuilder() + .addPathPartAsIs("_tasks") + .addPathPart(req.getTaskId().toString()) + .build(); + return new Request(HttpGet.METHOD_NAME, endpoint); + } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/migration/IndexUpgradeSubmissionResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/migration/IndexUpgradeSubmissionResponse.java index 7c0aefc722f7..21191ec6437e 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/migration/IndexUpgradeSubmissionResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/migration/IndexUpgradeSubmissionResponse.java @@ -18,79 +18,23 @@ */ package org.elasticsearch.client.migration; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.TaskId; import java.io.IOException; -import java.util.Objects; /** * Response object that contains the taskID from submitted IndexUpgradeRequest */ -public class IndexUpgradeSubmissionResponse extends ActionResponse implements ToXContentObject { - - private static final ParseField TASK = new ParseField("task"); - - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "upgrade_submission_response", - true, a-> new IndexUpgradeSubmissionResponse( (TaskId) a[0])); - - static { - PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); - } +public class IndexUpgradeSubmissionResponse extends TaskSubmissionResponse { public static IndexUpgradeSubmissionResponse fromXContent(XContentParser parser) throws IOException { - return PARSER.parse(parser, null); - } - - private final TaskId task; - - IndexUpgradeSubmissionResponse(@Nullable TaskId task) { - this.task = task; - } - - - /** - * Get the task id - * @return the id of the upgrade task. - */ - public TaskId getTask() { - return task; - } - - @Override - public int hashCode() { - return Objects.hash(task); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other == null || getClass() != other.getClass()) { - return false; - } - - IndexUpgradeSubmissionResponse that = (IndexUpgradeSubmissionResponse) other; - return Objects.equals(task, that.task); + TaskId taskId = PARSER.parse(parser, null); + return new IndexUpgradeSubmissionResponse(taskId); } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - if (task != null) { - builder.field(TASK.getPreferredName(), task.toString()); - } - builder.endObject(); - return builder; + IndexUpgradeSubmissionResponse(TaskId task) { + super(task); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java new file mode 100644 index 000000000000..d4ee17531a20 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.tasks; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; + +/** + * Response object that contains the task id + */ +public class TaskSubmissionResponse extends ActionResponse implements ToXContentObject { + + protected static final ParseField TASK = new ParseField("task"); + + protected static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "task_submission_response", true, a -> (TaskId) a[0]); + + static { + PARSER.declareField(ConstructingObjectParser.constructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); + } + + public static TaskSubmissionResponse fromXContent(XContentParser parser) throws IOException { + TaskId taskId = PARSER.parse(parser, null); + return new TaskSubmissionResponse(taskId); + } + + private final TaskId task; + + protected TaskSubmissionResponse(TaskId task) { + this.task = task; + } + + /** + * Get the task id of the submitted task + */ + public TaskId getTask() { + return task; + } + + @Override + public int hashCode() { + return task.hashCode(); + } + + @Override + public boolean equals(Object other) { + return task.equals(other); + + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (task != null) { + builder.field(TASK.getPreferredName(), task.toString()); + } + builder.endObject(); + return builder; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java index baa97cfa5b4e..149afaf0e4e1 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java @@ -19,17 +19,30 @@ package org.elasticsearch.client; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; +import org.elasticsearch.client.migration.IndexUpgradeRequest; +import org.elasticsearch.client.migration.IndexUpgradeSubmissionResponse; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import java.io.IOException; +import java.util.Map; +import java.util.function.BooleanSupplier; +import java.util.function.Predicate; +import java.util.function.Supplier; import static java.util.Collections.emptyList; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; @@ -83,4 +96,57 @@ public void testCancelTasks() throws IOException { // The actual testing of task cancellation is covered by TasksIT.testTasksCancellation assertThat(response, notNullValue()); } + + public void testGetTask() throws IOException { + TaskId taskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong()); + + GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(taskId); + + ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, + () -> highLevelClient().tasks().get(getTaskRequest, RequestOptions.DEFAULT)); + + assertThat(exception.status(), equalTo(RestStatus.NOT_FOUND)); + + + ListTasksRequest request = new ListTasksRequest(); + ListTasksResponse response = execute(request, highLevelClient().tasks()::list, highLevelClient().tasks()::listAsync); + + assertThat(response, notNullValue()); + + } + + public void testGetTaskFoundWithError() throws IOException, InterruptedException { + createIndex("test", Settings.EMPTY); + + IndexUpgradeSubmissionResponse submissionResult = highLevelClient().migration() + .submitUpgradeTask(new IndexUpgradeRequest("test"), RequestOptions.DEFAULT); + + GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(submissionResult.getTask()); + + CheckedSupplier getTaskResponse = + () -> highLevelClient().tasks().get(getTaskRequest, RequestOptions.DEFAULT); + Predicate isCompleted = r -> r.getTask().isCompleted(); + + GetTaskResponse response = await(getTaskResponse, isCompleted); + Map errorAsMap = response.getTask().getErrorAsMap(); + assertThat((String) errorAsMap.get("reason"), containsString("cannot be upgraded")); + } + + private GetTaskResponse await(CheckedSupplier supplier, Predicate predicate) + throws InterruptedException { + BooleanSupplier isCompleted = () -> predicate.test(call(supplier).get()); + awaitBusy(isCompleted); + return call(supplier).get(); + } + + private Supplier call(CheckedSupplier supplier) { + return () -> { + try { + return supplier.get(); + } catch (IOException e) { + logger.warn("Exception while fetching task response", e); + return null; + } + }; + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java index ff6726faee18..bfdbfc9f7697 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java @@ -22,6 +22,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; @@ -112,4 +113,16 @@ public void testListTasks() { assertEquals("TaskId cannot be used for list tasks request", exception.getMessage()); } } + + public void testGetTasks() { + GetTaskRequest request = new GetTaskRequest(); + TaskId taskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong()); + request.setTaskId(taskId); + + Request httpRequest = TasksRequestConverters.getTask(request); + assertThat(httpRequest, notNullValue()); + assertThat(httpRequest.getMethod(), equalTo(HttpGet.METHOD_NAME)); + assertThat(httpRequest.getEntity(), nullValue()); + assertThat(httpRequest.getEndpoint(), equalTo("/_tasks/"+taskId.toString())); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java index 72f26d2d5769..18611a7d849d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java @@ -20,11 +20,17 @@ package org.elasticsearch.action.admin.cluster.node.tasks.get; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskResult; import java.io.IOException; @@ -75,4 +81,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public String toString() { return Strings.toString(this); } + + public static GetTaskResponse fromXContent(XContentParser parser) { + TaskResult result = TaskResult.PARSER.apply(parser, null); + return new GetTaskResponse(result); + } }