3131import org .elasticsearch .cluster .metadata .MetaData ;
3232import org .elasticsearch .cluster .node .DiscoveryNodes ;
3333import org .elasticsearch .cluster .service .ClusterService ;
34+ import org .elasticsearch .common .settings .Setting ;
3435import org .elasticsearch .common .settings .Settings ;
36+ import org .elasticsearch .common .unit .TimeValue ;
37+ import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
38+ import org .elasticsearch .common .util .concurrent .FutureUtils ;
3539import org .elasticsearch .persistent .PersistentTasksCustomMetaData .Assignment ;
3640import org .elasticsearch .persistent .PersistentTasksCustomMetaData .PersistentTask ;
3741import org .elasticsearch .persistent .decider .AssignmentDecision ;
3842import org .elasticsearch .persistent .decider .EnableAssignmentDecider ;
43+ import org .elasticsearch .threadpool .ThreadPool ;
3944
4045import java .util .Objects ;
46+ import java .util .concurrent .Future ;
4147
4248/**
4349 * Component that runs only on the master node and is responsible for assigning running tasks to nodes
4450 */
4551public class PersistentTasksClusterService implements ClusterStateListener {
4652
53+ public static final Setting <TimeValue > CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING =
54+ Setting .timeSetting ("cluster.persistent_tasks.allocation.recheck_interval" , TimeValue .timeValueSeconds (30 ),
55+ TimeValue .timeValueSeconds (10 ), Setting .Property .Dynamic , Setting .Property .NodeScope );
56+
4757 private static final Logger logger = LogManager .getLogger (PersistentTasksClusterService .class );
4858
4959 private final ClusterService clusterService ;
5060 private final PersistentTasksExecutorRegistry registry ;
5161 private final EnableAssignmentDecider decider ;
62+ private final ThreadPool threadPool ;
63+ private final PeriodicRechecker periodicRechecker ;
64+ private volatile TimeValue recheckInterval ;
5265
53- public PersistentTasksClusterService (Settings settings , PersistentTasksExecutorRegistry registry , ClusterService clusterService ) {
/**
 * Creates the service and wires it into the cluster service.
 *
 * @param settings       node settings; the initial recheck interval is read from here
 * @param registry       registry of persistent task executors
 * @param clusterService cluster service this instance listens to and submits state updates through
 * @param threadPool     thread pool used to schedule the periodic assignment re-check
 */
public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService,
                                     ThreadPool threadPool) {
    this.clusterService = clusterService;
    this.registry = registry;
    this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings());
    this.threadPool = threadPool;
    this.recheckInterval = CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings);
    this.periodicRechecker = new PeriodicRechecker();
    // Register the callbacks only after every field is assigned: both clusterChanged() and
    // setRecheckInterval() dereference periodicRechecker, so publishing "this" earlier could
    // expose a partially constructed instance to a concurrent cluster state or settings update.
    clusterService.addListener(this);
    clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
        this::setRecheckInterval);
}
78+
79+ void setRecheckInterval (TimeValue recheckInterval ) {
80+ this .recheckInterval = recheckInterval ;
81+ periodicRechecker .rescheduleIfScheduled ();
5882 }
5983
6084 /**
@@ -241,24 +265,36 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(final
241265
242266 @ Override
243267 public void clusterChanged (ClusterChangedEvent event ) {
268+ periodicRechecker .cancel ();
244269 if (event .localNodeMaster ()) {
245270 if (shouldReassignPersistentTasks (event )) {
246- logger .trace ("checking task reassignment for cluster state {}" , event .state ().getVersion ());
247- clusterService .submitStateUpdateTask ("reassign persistent tasks" , new ClusterStateUpdateTask () {
248- @ Override
249- public ClusterState execute (ClusterState currentState ) {
250- return reassignTasks (currentState );
251- }
252-
253- @ Override
254- public void onFailure (String source , Exception e ) {
255- logger .warn ("failed to reassign persistent tasks" , e );
256- }
257- });
271+ reassignPersistentTasks (event .state ().getVersion ());
272+ } else {
273+ periodicRechecker .schedule ();
258274 }
259275 }
260276 }
261277
278+ /**
279+ * Submit a cluster state update to reassign any persistent tasks that need reassigning
280+ */
281+ private void reassignPersistentTasks (long currentStateVersion ) {
282+ logger .trace ("checking task reassignment for cluster state {}" , currentStateVersion );
283+ clusterService .submitStateUpdateTask ("reassign persistent tasks" , new ClusterStateUpdateTask () {
284+ @ Override
285+ public ClusterState execute (ClusterState currentState ) {
286+ ClusterState newState = reassignTasks (currentState );
287+ periodicRechecker .schedule ();
288+ return newState ;
289+ }
290+
291+ @ Override
292+ public void onFailure (String source , Exception e ) {
293+ logger .warn ("failed to reassign persistent tasks" , e );
294+ }
295+ });
296+ }
297+
262298 /**
263299 * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following
264300 * situations: a node left or is added, the routing table changed, the master node changed, the metadata changed or the
@@ -278,12 +314,21 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
278314 || event .metaDataChanged ()
279315 || masterChanged ) {
280316
281- for (PersistentTask <?> task : tasks .tasks ()) {
282- if (needsReassignment (task .getAssignment (), event .state ().nodes ())) {
283- Assignment assignment = createAssignment (task .getTaskName (), task .getParams (), event .state ());
284- if (Objects .equals (assignment , task .getAssignment ()) == false ) {
285- return true ;
286- }
317+ return anyTaskNeedsReassignment (tasks , event .state ());
318+ }
319+ return false ;
320+ }
321+
322+ /**
323+ * Returns true if any persistent task provided requires reassignment,
324+ * i.e. is not assigned or is assigned to a non-existing node.
325+ */
326+ private boolean anyTaskNeedsReassignment (final PersistentTasksCustomMetaData tasks , final ClusterState state ) {
327+ for (PersistentTask <?> task : tasks .tasks ()) {
328+ if (needsReassignment (task .getAssignment (), state .nodes ())) {
329+ Assignment assignment = createAssignment (task .getTaskName (), task .getParams (), state );
330+ if (Objects .equals (assignment , task .getAssignment ()) == false ) {
331+ return true ;
287332 }
288333 }
289334 }
@@ -347,4 +392,51 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus
347392 return currentState ;
348393 }
349394 }
395+
396+ /**
397+ * Class to periodically try to reassign unassigned persistent tasks.
398+ */
399+ private class PeriodicRechecker implements Runnable {
400+
401+ private volatile Future <?> nextRun ;
402+
403+ void schedule () {
404+ try {
405+ synchronized (this ) {
406+ FutureUtils .cancel (nextRun );
407+ nextRun = threadPool .schedule (recheckInterval , ThreadPool .Names .GENERIC , this );
408+ }
409+ } catch (EsRejectedExecutionException e ) {
410+ logger .debug ("could not schedule periodic persistent task assignment check" , e );
411+ }
412+ }
413+
414+ synchronized void cancel () {
415+ FutureUtils .cancel (nextRun );
416+ nextRun = null ;
417+ }
418+
419+ synchronized void rescheduleIfScheduled () {
420+ if (nextRun != null ) {
421+ schedule ();
422+ }
423+ }
424+
425+ @ Override
426+ public void run () {
427+ synchronized (this ) {
428+ nextRun = null ;
429+ }
430+ if (clusterService .localNode ().isMasterNode ()) {
431+ logger .trace ("periodic persistent task assignment check running" );
432+ ClusterState state = clusterService .state ();
433+ final PersistentTasksCustomMetaData tasks = state .getMetaData ().custom (PersistentTasksCustomMetaData .TYPE );
434+ if (tasks != null && anyTaskNeedsReassignment (tasks , state )) {
435+ reassignPersistentTasks (state .getVersion ());
436+ } else {
437+ schedule ();
438+ }
439+ }
440+ }
441+ }
350442}
0 commit comments