diff --git a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java index b311e559c6e91..e48b6c972cb04 100644 --- a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java +++ b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; @@ -38,18 +37,16 @@ * Represents a executor node operation that corresponds to a persistent task */ public class AllocatedPersistentTask extends CancellableTask { - private volatile String persistentTaskId; - private volatile long allocationId; private final AtomicReference state; - @Nullable - private volatile Exception failure; + private volatile String persistentTaskId; + private volatile long allocationId; + private volatile @Nullable Exception failure; private volatile PersistentTasksService persistentTasksService; private volatile Logger logger; private volatile TaskManager taskManager; - public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { super(id, type, action, description, parentTask, headers); @@ -101,24 +98,10 @@ public Exception getFailure() { return failure; } - boolean markAsCancelled() { - return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.PENDING_CANCEL); - } - - public State getState() { - return state.get(); - } - public long getAllocationId() { return allocationId; } - public enum State { - STARTED, // the task is currently running - PENDING_CANCEL, // the task is cancelled on master, cancelling it locally - COMPLETED // the task is done running and trying to notify caller - } - /** * Waits for this persistent task to have the desired state. */ @@ -128,6 +111,14 @@ public void waitForPersistentTaskStatus(Predicate action = mock(PersistentTasksExecutor.class); when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); @@ -131,8 +131,8 @@ public void testStartTask() throws Exception { if (added == false) { logger.info("No local node action was added"); - } + MetaData.Builder metaData = MetaData.builder(state.metaData()); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build(); @@ -149,6 +149,7 @@ public void testStartTask() throws Exception { // Make sure action wasn't called again assertThat(executor.executions.size(), equalTo(1)); + assertThat(executor.get(0).task.isCompleted(), is(false)); // Start another task on this node state = newClusterState; @@ -157,10 +158,15 @@ public void testStartTask() throws Exception { // Make sure action was called this time assertThat(executor.size(), equalTo(2)); + assertThat(executor.get(1).task.isCompleted(), is(false)); // Finish both tasks executor.get(0).task.markAsFailed(new RuntimeException()); executor.get(1).task.markAsCompleted(); + + assertThat(executor.get(0).task.isCompleted(), is(true)); + assertThat(executor.get(1).task.isCompleted(), is(true)); + String failedTaskId = executor.get(0).task.getPersistentTaskId(); String finishedTaskId = executor.get(1).task.getPersistentTaskId(); executor.clear(); @@ -186,7 +192,6 @@ public void testStartTask() throws Exception { // Make sure action was only allocated on this node once assertThat(executor.size(), equalTo(1)); } - } public void testParamsStatusAndNodeTaskAreDelegated() throws Exception { @@ -300,7 +305,6 @@ public void sendCompletionNotification(String taskId, long allocationId, Excepti // Check the the task is now removed from task manager assertThat(taskManager.getTasks().values(), empty()); - } private ClusterState addTask(ClusterState state, String action, Params params,