Skip to content

Commit 3bf3444

Browse files
imotovmartijnvg
authored andcommitted
Persistent Tasks: Add waitForPersistentTaskStatus method (#901)
This method allows to wait for tasks to change their status to match the supplied predicate.
1 parent 68c42fc commit 3bf3444

File tree

5 files changed

+74
-28
lines changed

5 files changed

+74
-28
lines changed

server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.elasticsearch.Version;
2222
import org.elasticsearch.cluster.AbstractNamedDiffable;
23+
import org.elasticsearch.cluster.ClusterState;
2324
import org.elasticsearch.cluster.NamedDiff;
2425
import org.elasticsearch.cluster.metadata.MetaData;
2526
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -182,6 +183,15 @@ public static PersistentTasksCustomMetaData fromXContent(XContentParser parser)
182183
return PERSISTENT_TASKS_PARSER.parse(parser, null).build();
183184
}
184185

186+
@SuppressWarnings("unchecked")
187+
public static <Request extends PersistentTaskRequest> PersistentTask<Request> getTaskWithId(ClusterState clusterState, long taskId) {
188+
PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
189+
if (tasks != null) {
190+
return (PersistentTask<Request>)tasks.getTask(taskId);
191+
}
192+
return null;
193+
}
194+
185195
public static class Assignment {
186196
@Nullable
187197
private final String executorNode;
@@ -228,8 +238,6 @@ public String toString() {
228238

229239
public static final Assignment INITIAL_ASSIGNMENT = new Assignment(null, "waiting for initial assignment");
230240

231-
public static final Assignment FINISHED_TASK_ASSIGNMENT = new Assignment(null, "task has finished");
232-
233241
/**
234242
* A record that represents a single running persistent task
235243
*/

server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,21 @@
2121
import org.elasticsearch.action.ActionListener;
2222
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
2323
import org.elasticsearch.client.Client;
24+
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.ClusterStateObserver;
2426
import org.elasticsearch.cluster.node.DiscoveryNode;
2527
import org.elasticsearch.cluster.service.ClusterService;
28+
import org.elasticsearch.common.Nullable;
2629
import org.elasticsearch.common.component.AbstractComponent;
2730
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.common.unit.TimeValue;
32+
import org.elasticsearch.node.NodeClosedException;
2833
import org.elasticsearch.tasks.Task;
2934
import org.elasticsearch.tasks.TaskId;
35+
import org.elasticsearch.threadpool.ThreadPool;
36+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
37+
38+
import java.util.function.Predicate;
3039

3140
/**
3241
* This service is used by persistent actions to propagate changes in the action state and notify about completion
@@ -35,11 +44,13 @@ public class PersistentTasksService extends AbstractComponent {
3544

3645
private final Client client;
3746
private final ClusterService clusterService;
47+
private final ThreadPool threadPool;
3848

39-
public PersistentTasksService(Settings settings, ClusterService clusterService, Client client) {
49+
public PersistentTasksService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
4050
super(settings);
4151
this.client = client;
4252
this.clusterService = clusterService;
53+
this.threadPool = threadPool;
4354
}
4455

4556
/**
@@ -115,15 +126,33 @@ public void removeTask(long taskId, PersistentTaskOperationListener listener) {
115126
}
116127

117128
/**
118-
* Starts a persistent task
129+
* Waits for the persistent task with giving id (taskId) to achieve the desired status.
119130
*/
120-
public void startTask(long taskId, PersistentTaskOperationListener listener) {
121-
StartPersistentTaskAction.Request startRequest = new StartPersistentTaskAction.Request(taskId);
122-
try {
123-
client.execute(StartPersistentTaskAction.INSTANCE, startRequest, ActionListener.wrap(o -> listener.onResponse(taskId),
124-
listener::onFailure));
125-
} catch (Exception e) {
126-
listener.onFailure(e);
131+
public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
132+
WaitForPersistentTaskStatusListener listener) {
133+
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
134+
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
135+
@Override
136+
public void onNewClusterState(ClusterState state) {
137+
listener.onResponse(taskId);
138+
}
139+
140+
@Override
141+
public void onClusterServiceClose() {
142+
listener.onFailure(new NodeClosedException(clusterService.localNode()));
143+
144+
}
145+
146+
@Override
147+
public void onTimeout(TimeValue timeout) {
148+
listener.onTimeout(timeout);
149+
}
150+
}, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId)));
151+
}
152+
153+
public interface WaitForPersistentTaskStatusListener extends PersistentTaskOperationListener {
154+
default void onTimeout(TimeValue timeout) {
155+
onFailure(new IllegalStateException("timed out after " + timeout));
127156
}
128157
}
129158

server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020
package org.elasticsearch.persistent;
2121

2222
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.unit.TimeValue;
2324
import org.elasticsearch.common.util.concurrent.BaseFuture;
2425
import org.elasticsearch.plugins.Plugin;
2526
import org.elasticsearch.tasks.TaskId;
2627
import org.elasticsearch.tasks.TaskInfo;
2728
import org.elasticsearch.test.ESIntegTestCase;
28-
import org.elasticsearch.persistent.PersistentTasksService.PersistentTaskOperationListener;
29+
import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
2930
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
3031
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest;
3132
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder;
@@ -34,12 +35,10 @@
3435
import java.util.Collection;
3536
import java.util.Collections;
3637
import java.util.List;
38+
import java.util.Objects;
3739

38-
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3940
import static org.hamcrest.Matchers.empty;
4041
import static org.hamcrest.Matchers.equalTo;
41-
import static org.hamcrest.Matchers.not;
42-
import static org.hamcrest.Matchers.notNullValue;
4342
import static org.hamcrest.Matchers.nullValue;
4443

4544
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
@@ -64,7 +63,7 @@ public void cleanup() throws Exception {
6463
assertNoRunningTasks();
6564
}
6665

67-
public static class PersistentTaskOperationFuture extends BaseFuture<Long> implements PersistentTaskOperationListener {
66+
public static class PersistentTaskOperationFuture extends BaseFuture<Long> implements WaitForPersistentTaskStatusListener {
6867

6968
@Override
7069
public void onResponse(long taskId) {
@@ -166,7 +165,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
166165
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
167166
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
168167
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
169-
future.get();
168+
long taskId = future.get();
170169

171170
assertBusy(() -> {
172171
// Wait for the task to start
@@ -189,20 +188,30 @@ public void testPersistentActionStatusUpdate() throws Exception {
189188
.get().getTasks().size(), equalTo(1));
190189

191190
int finalI = i;
192-
assertBusy(() -> {
193-
PersistentTasksCustomMetaData tasks = internalCluster().clusterService().state().getMetaData()
194-
.custom(PersistentTasksCustomMetaData.TYPE);
195-
assertThat(tasks.tasks().size(), equalTo(1));
196-
assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue());
197-
assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}"));
198-
});
199-
191+
PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture();
192+
persistentTasksService.waitForPersistentTaskStatus(taskId,
193+
task -> task != null && task.isCurrentStatus()&& task.getStatus().toString() != null &&
194+
task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
195+
TimeValue.timeValueSeconds(10), future1);
196+
assertThat(future1.get(), equalTo(taskId));
200197
}
201198

199+
PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture();
200+
persistentTasksService.waitForPersistentTaskStatus(taskId,
201+
task -> false, TimeValue.timeValueMillis(10), future1);
202+
203+
expectThrows(Exception.class, future1::get);
204+
205+
// Wait for the task to disappear
206+
PersistentTaskOperationFuture future2 = new PersistentTaskOperationFuture();
207+
persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);
208+
202209
logger.info("Completing the running task");
203210
// Complete the running task and make sure it finishes properly
204211
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
205212
.get().getTasks().size(), equalTo(1));
213+
214+
assertThat(future2.get(), equalTo(taskId));
206215
}
207216

208217

server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public void testTaskCancellation() {
164164
ClusterService clusterService = createClusterService();
165165
AtomicLong capturedTaskId = new AtomicLong();
166166
AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>();
167-
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null) {
167+
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) {
168168
@Override
169169
public void sendCancellation(long taskId, PersistentTaskOperationListener listener) {
170170
capturedTaskId.set(taskId);
@@ -242,7 +242,7 @@ public void testNotificationFailure() throws Exception {
242242
AtomicReference<Exception> capturedException = new AtomicReference<>();
243243
AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>();
244244
PersistentTasksService persistentTasksService =
245-
new PersistentTasksService(Settings.EMPTY, clusterService, null) {
245+
new PersistentTasksService(Settings.EMPTY, clusterService, null, null) {
246246
@Override
247247
public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) {
248248
capturedTaskId.set(taskId);

server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
103103
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
104104
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
105105
NamedXContentRegistry xContentRegistry) {
106-
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, client);
106+
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, client);
107107
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, persistentTasksService,
108108
clusterService);
109109
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,

0 commit comments

Comments
 (0)