Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

high level REST api: cancel task #30745

Merged
merged 21 commits into from
Jun 7, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
Expand Down Expand Up @@ -87,4 +89,5 @@ public void putPipelineAsync(PutPipelineRequest request, ActionListener<PutPipel
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline,
PutPipelineResponse::fromXContent, listener, emptySet(), headers);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods should be gone now, they have been moved to IngestClient upstream


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove empty line?

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.http.entity.ContentType;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
Expand Down Expand Up @@ -103,6 +104,17 @@ private RequestConverters() {
// Contains only status utility methods
}

static Request cancelTasks(CancelTasksRequest cancelTasksRequest) {
Request request = new Request(HttpPost.METHOD_NAME, "/_tasks/_cancel");
Params params = new Params(request);
params.withTimeout(
cancelTasksRequest.getTimeout())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you adjust the indentation?

.withTaskId(cancelTasksRequest.getTaskId())
.withNodes(cancelTasksRequest.getNodes())
.withActions(cancelTasksRequest.getActions());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the REST action seems to also support parent_task_id, the parameter is also declared in the REST spec so it should be here too?

return request;
}

static Request delete(DeleteRequest deleteRequest) {
String endpoint = endpoint(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
Expand Down Expand Up @@ -1005,6 +1017,13 @@ Params withActions(String[] actions) {
return this;
}

Params withTaskId(TaskId taskId) {
if (taskId != null && taskId.isSet()) {
return putParam("task_id", taskId.toString());
}
return this;
}

Params withParentTaskId(TaskId parentTaskId) {
if (parentTaskId != null && parentTaskId.isSet()) {
return putParam("parent_task_id", parentTaskId.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;

Expand Down Expand Up @@ -61,4 +63,37 @@ public void listAsync(ListTasksRequest request, ActionListener<ListTasksResponse
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
listener, emptySet(), headers);
}

/**
* Cancel one or more cluster tasks using the Task Management API
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a punctuation mark at the end of the sentence? It will make the generated html slightly better

* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
* </p>
*/
public CancelTasksResponse cancelTasks(CancelTasksRequest cancelTasksRequest, Header... headers) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call the methods just cancel given that they now belong to the TasksClient?

return restHighLevelClient.performRequestAndParseEntity(
cancelTasksRequest,
RequestConverters::cancelTasks,
parser -> CancelTasksResponse.fromXContent(parser),
emptySet(),
headers);
}

/**
* Asynchronously cancel one or more cluster tasks using the Task Management API
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, can you add the punctuation mark

* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
* </p>
*/
public void cancelTasksAsync(CancelTasksRequest cancelTasksRequest, ActionListener<CancelTasksResponse> listener, Header... headers) {
Copy link
Member

@javanna javanna Jun 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now that #30490 is in, could you replace the header argument with the RequestOptions one? In the method that accept a listener we add RequestOptions between the request and the listener though.

restHighLevelClient.performRequestAsyncAndParseEntity(
cancelTasksRequest,
RequestConverters::cancelTasks,
parser -> CancelTasksResponse.fromXContent(parser),
listener,
emptySet(),
headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
package org.elasticsearch.client;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
Expand All @@ -34,6 +39,8 @@
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;

import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -147,4 +154,5 @@ public void testPutPipeline() throws IOException {
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
assertTrue(putPipelineResponse.isAcknowledged());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
Expand Down Expand Up @@ -1489,6 +1491,20 @@ public void testIndexPutSettings() throws IOException {
assertEquals(expectedParams, request.getParameters());
}

public void testCancelTasks() {
CancelTasksRequest request = new CancelTasksRequest();
Map<String, String> expectedParams = new HashMap<>();
TaskId taskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
request.setTaskId(taskId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we also test the other supported parameters?

expectedParams.put("task_id", taskId.toString());
Request httpRequest = RequestConverters.cancelTasks(request);
assertThat(httpRequest, notNullValue());
assertThat(httpRequest.getMethod(), equalTo(HttpPost.METHOD_NAME));
assertThat(httpRequest.getEntity(), nullValue());
assertThat(httpRequest.getEndpoint(), equalTo("/_tasks/_cancel"));
assertThat(httpRequest.getParameters(), equalTo(expectedParams));
}

public void testListTasks() {
{
ListTasksRequest request = new ListTasksRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

package org.elasticsearch.client;

import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;

import java.io.IOException;
Expand Down Expand Up @@ -58,4 +61,32 @@ public void testListTasks() throws IOException {
assertTrue("List tasks were not found", listTasksFound);
}

public void testCancelTasks() throws IOException {
ListTasksRequest listRequest = new ListTasksRequest();
ListTasksResponse listResponse = execute(
listRequest,
highLevelClient().tasks()::list,
highLevelClient().tasks()::listAsync
);

// TODO[PCS] submit a task that is cancellable and assert it's cancelled
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this TODO still valid? do we need to address it before merging?

// this case is covered in TasksIT.testTasksCancellation
TaskInfo firstTask = listResponse.getTasks().get(0);
String node = listResponse.getPerNodeTasks().keySet().iterator().next();

CancelTasksRequest request = new CancelTasksRequest();
request.setTaskId(new TaskId(node, firstTask.getId()));
request.setReason("testreason");
CancelTasksResponse response = execute(
request,
highLevelClient().tasks()::cancelTasks,
highLevelClient().tasks()::cancelTasksAsync
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: adjust indentation?

// Since the task may or may not have been cancelled, assert that we received a response only
// The actual testing of task cancellation is covered by TasksIT.testTasksCancellation
assertThat(response, notNullValue());
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

package org.elasticsearch.client.documentation;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
Expand All @@ -34,15 +38,20 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

/**
* This class is used to generate the Java Cluster API documentation.
Expand Down Expand Up @@ -257,4 +266,5 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
Expand Down Expand Up @@ -145,4 +147,74 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

public void testCancelTasks() throws IOException {
RestHighLevelClient client = highLevelClient();
{
// tag::cancel-tasks-request
CancelTasksRequest request = new CancelTasksRequest();
// end::cancel-tasks-request

// tag::cancel-tasks-request-filter
request.setTaskId(new TaskId("nodeId1", 42)); //<1>
request.setActions("cluster:*"); // <2>
request.setNodes("nodeId1", "nodeId2"); // <3>
// end::cancel-tasks-request-filter

}

CancelTasksRequest request = new CancelTasksRequest();
request.setTaskId(TaskId.EMPTY_TASK_ID);

// tag::cancel-tasks-execute
CancelTasksResponse response = client.tasks().cancelTasks(request);
// end::cancel-tasks-execute

assertThat(response, notNullValue());

// tag::cancel-tasks-response-tasks
List<TaskInfo> tasks = response.getTasks(); // <1>
// end::cancel-tasks-response-tasks


// tag::cancel-tasks-response-failures
List<ElasticsearchException> nodeFailures = response.getNodeFailures(); // <1>
List<TaskOperationFailure> taskFailures = response.getTaskFailures(); // <2>
// end::-tasks-response-failures

assertThat(response.getNodeFailures(), equalTo(emptyList()));
assertThat(response.getTaskFailures(), equalTo(emptyList()));
}

public void testAsyncCancelTasks() throws InterruptedException {

RestHighLevelClient client = highLevelClient();
{
CancelTasksRequest request = new CancelTasksRequest();

// tag::cancel-tasks-execute-listener
ActionListener<CancelTasksResponse> listener =
new ActionListener<CancelTasksResponse>() {
@Override
public void onResponse(CancelTasksResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::cancel-tasks-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::cancel-tasks-execute-async
client.tasks().cancelTasksAsync(request, listener); // <1>
// end::cancel-tasks-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}
2 changes: 2 additions & 0 deletions docs/java-rest/high-level/supported-apis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,7 @@ include::snapshot/delete_repository.asciidoc[]
The Java High Level REST Client supports the following Tasks APIs:

* <<java-rest-high-tasks-list>>
* <<java-rest-high-cluster-cancel-tasks>>

include::tasks/list_tasks.asciidoc[]
include::tasks/cancel_tasks.asciidoc[]
Loading