100100import java .util .List ;
101101import java .util .Locale ;
102102import java .util .Map ;
103+ import java .util .Objects ;
103104import java .util .Optional ;
104105import java .util .Set ;
105106import java .util .concurrent .ConcurrentHashMap ;
@@ -165,7 +166,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
165166 // Set of snapshots that are currently being ended by this node
166167 private final Set <Snapshot > endingSnapshots = Collections .synchronizedSet (new HashSet <>());
167168
168- private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor ();
169169 private final UpdateSnapshotStatusAction updateSnapshotStatusHandler ;
170170
171171 private final TransportService transportService ;
@@ -2452,101 +2452,130 @@ public boolean assertAllListenersResolved() {
24522452 return true ;
24532453 }
24542454
2455- private static class SnapshotStateExecutor implements ClusterStateTaskExecutor <UpdateIndexShardSnapshotStatusRequest > {
2456-
2457- @ Override
2458- public ClusterTasksResult <UpdateIndexShardSnapshotStatusRequest >
2459- execute (ClusterState currentState , List <UpdateIndexShardSnapshotStatusRequest > tasks ) {
2460- int changedCount = 0 ;
2461- int startedCount = 0 ;
2462- final List <SnapshotsInProgress .Entry > entries = new ArrayList <>();
2463- // Tasks to check for updates for running snapshots.
2464- final List <UpdateIndexShardSnapshotStatusRequest > unconsumedTasks = new ArrayList <>(tasks );
2465- // Tasks that were used to complete an existing in-progress shard snapshot
2466- final Set <UpdateIndexShardSnapshotStatusRequest > executedTasks = new HashSet <>();
2467- for (SnapshotsInProgress .Entry entry : currentState .custom (SnapshotsInProgress .TYPE , SnapshotsInProgress .EMPTY ).entries ()) {
2468- if (entry .state ().completed ()) {
2469- entries .add (entry );
2455+ private static final ClusterStateTaskExecutor <ShardSnapshotUpdate > SHARD_STATE_EXECUTOR = (currentState , tasks ) -> {
2456+ int changedCount = 0 ;
2457+ int startedCount = 0 ;
2458+ final List <SnapshotsInProgress .Entry > entries = new ArrayList <>();
2459+ // Tasks to check for updates for running snapshots.
2460+ final List <ShardSnapshotUpdate > unconsumedTasks = new ArrayList <>(tasks );
2461+ // Tasks that were used to complete an existing in-progress shard snapshot
2462+ final Set <ShardSnapshotUpdate > executedTasks = new HashSet <>();
2463+ for (SnapshotsInProgress .Entry entry : currentState .custom (SnapshotsInProgress .TYPE , SnapshotsInProgress .EMPTY ).entries ()) {
2464+ if (entry .state ().completed ()) {
2465+ entries .add (entry );
2466+ continue ;
2467+ }
2468+ ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shards = null ;
2469+ for (Iterator <ShardSnapshotUpdate > iterator = unconsumedTasks .iterator (); iterator .hasNext (); ) {
2470+ final ShardSnapshotUpdate updateSnapshotState = iterator .next ();
2471+ final Snapshot updatedSnapshot = updateSnapshotState .snapshot ;
2472+ final String updatedRepository = updatedSnapshot .getRepository ();
2473+ if (entry .repository ().equals (updatedRepository ) == false ) {
24702474 continue ;
24712475 }
2472- ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shards = null ;
2473- for (Iterator <UpdateIndexShardSnapshotStatusRequest > iterator = unconsumedTasks .iterator (); iterator .hasNext (); ) {
2474- final UpdateIndexShardSnapshotStatusRequest updateSnapshotState = iterator .next ();
2475- final Snapshot updatedSnapshot = updateSnapshotState .snapshot ();
2476- final String updatedRepository = updatedSnapshot .getRepository ();
2477- if (entry .repository ().equals (updatedRepository ) == false ) {
2476+ final ShardId finishedShardId = updateSnapshotState .shardId ;
2477+ if (entry .snapshot ().getSnapshotId ().equals (updatedSnapshot .getSnapshotId ())) {
2478+ final ShardSnapshotStatus existing = entry .shards ().get (finishedShardId );
2479+ if (existing == null ) {
2480+ logger .warn ("Received shard snapshot status update [{}] but this shard is not tracked in [{}]" ,
2481+ updateSnapshotState , entry );
2482+ assert false : "This should never happen, data nodes should only send updates for expected shards" ;
24782483 continue ;
24792484 }
2480- final ShardId finishedShardId = updateSnapshotState .shardId ();
2481- if (entry .snapshot ().getSnapshotId ().equals (updatedSnapshot .getSnapshotId ())) {
2482- final ShardSnapshotStatus existing = entry .shards ().get (finishedShardId );
2483- if (existing == null ) {
2484- logger .warn ("Received shard snapshot status update [{}] but this shard is not tracked in [{}]" ,
2485- updateSnapshotState , entry );
2486- assert false : "This should never happen, data nodes should only send updates for expected shards" ;
2487- continue ;
2488- }
2489- if (existing .state ().completed ()) {
2490- // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
2491- iterator .remove ();
2492- continue ;
2493- }
2494- logger .trace ("[{}] Updating shard [{}] with status [{}]" , updatedSnapshot ,
2495- finishedShardId , updateSnapshotState .status ().state ());
2496- if (shards == null ) {
2497- shards = ImmutableOpenMap .builder (entry .shards ());
2498- }
2499- shards .put (finishedShardId , updateSnapshotState .status ());
2500- executedTasks .add (updateSnapshotState );
2501- changedCount ++;
2502- } else if (executedTasks .contains (updateSnapshotState )) {
2503- // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
2504- final ShardSnapshotStatus existingStatus = entry .shards ().get (finishedShardId );
2505- if (existingStatus == null || existingStatus .state () != ShardState .QUEUED ) {
2506- continue ;
2507- }
2508- if (shards == null ) {
2509- shards = ImmutableOpenMap .builder (entry .shards ());
2510- }
2511- final ShardSnapshotStatus finishedStatus = updateSnapshotState .status ();
2512- logger .trace ("Starting [{}] on [{}] with generation [{}]" , finishedShardId ,
2513- finishedStatus .nodeId (), finishedStatus .generation ());
2514- shards .put (finishedShardId , new ShardSnapshotStatus (finishedStatus .nodeId (), finishedStatus .generation ()));
2485+ if (existing .state ().completed ()) {
2486+ // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
25152487 iterator .remove ();
2516- startedCount ++;
2488+ continue ;
2489+ }
2490+ logger .trace ("[{}] Updating shard [{}] with status [{}]" , updatedSnapshot ,
2491+ finishedShardId , updateSnapshotState .updatedState .state ());
2492+ if (shards == null ) {
2493+ shards = ImmutableOpenMap .builder (entry .shards ());
2494+ }
2495+ shards .put (finishedShardId , updateSnapshotState .updatedState );
2496+ executedTasks .add (updateSnapshotState );
2497+ changedCount ++;
2498+ } else if (executedTasks .contains (updateSnapshotState )) {
2499+ // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
2500+ final ShardSnapshotStatus existingStatus = entry .shards ().get (finishedShardId );
2501+ if (existingStatus == null || existingStatus .state () != ShardState .QUEUED ) {
2502+ continue ;
2503+ }
2504+ if (shards == null ) {
2505+ shards = ImmutableOpenMap .builder (entry .shards ());
25172506 }
2507+ final ShardSnapshotStatus finishedStatus = updateSnapshotState .updatedState ;
2508+ logger .trace ("Starting [{}] on [{}] with generation [{}]" , finishedShardId ,
2509+ finishedStatus .nodeId (), finishedStatus .generation ());
2510+ shards .put (finishedShardId , new ShardSnapshotStatus (finishedStatus .nodeId (), finishedStatus .generation ()));
2511+ iterator .remove ();
2512+ startedCount ++;
25182513 }
2514+ }
25192515
2520- if (shards == null ) {
2521- entries .add (entry );
2522- } else {
2523- entries .add (entry .withShardStates (shards .build ()));
2524- }
2516+ if (shards == null ) {
2517+ entries .add (entry );
2518+ } else {
2519+ entries .add (entry .withShardStates (shards .build ()));
25252520 }
2526- if (changedCount > 0 ) {
2527- logger .trace ("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
2528- "[{}] shard snapshots" , changedCount , startedCount );
2529- return ClusterTasksResult .<UpdateIndexShardSnapshotStatusRequest >builder ().successes (tasks )
2530- .build (ClusterState .builder (currentState ).putCustom (SnapshotsInProgress .TYPE ,
2531- SnapshotsInProgress .of (entries )).build ());
2521+ }
2522+ if (changedCount > 0 ) {
2523+ logger .trace ("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
2524+ "[{}] shard snapshots" , changedCount , startedCount );
2525+ return ClusterStateTaskExecutor .ClusterTasksResult .<ShardSnapshotUpdate >builder ().successes (tasks ).build (
2526+ ClusterState .builder (currentState ).putCustom (SnapshotsInProgress .TYPE , SnapshotsInProgress .of (entries )).build ());
2527+ }
2528+ return ClusterStateTaskExecutor .ClusterTasksResult .<ShardSnapshotUpdate >builder ().successes (tasks ).build (currentState );
2529+ };
2530+
2531+ /**
2532+ * An update to the snapshot state of a shard.
2533+ */
2534+ private static final class ShardSnapshotUpdate {
2535+
2536+ private final Snapshot snapshot ;
2537+
2538+ private final ShardId shardId ;
2539+
2540+ private final ShardSnapshotStatus updatedState ;
2541+
2542+ private ShardSnapshotUpdate (Snapshot snapshot , ShardId shardId , ShardSnapshotStatus updatedState ) {
2543+ this .snapshot = snapshot ;
2544+ this .shardId = shardId ;
2545+ this .updatedState = updatedState ;
2546+ }
2547+
2548+ @ Override
2549+ public boolean equals (Object other ) {
2550+ if (this == other ) {
2551+ return true ;
25322552 }
2533- return ClusterTasksResult .<UpdateIndexShardSnapshotStatusRequest >builder ().successes (tasks ).build (currentState );
2553+ if ((other instanceof ShardSnapshotUpdate ) == false ) {
2554+ return false ;
2555+ }
2556+ final ShardSnapshotUpdate that = (ShardSnapshotUpdate ) other ;
2557+ return this .snapshot .equals (that .snapshot ) && this .shardId .equals (that .shardId ) && this .updatedState == that .updatedState ;
2558+ }
2559+
2560+
2561+ @ Override
2562+ public int hashCode () {
2563+ return Objects .hash (snapshot , shardId , updatedState );
25342564 }
25352565 }
25362566
25372567 /**
2538- * Updates the shard status on master node
2568+ * Updates the shard status in the cluster state
25392569 *
2540- * @param request update shard status request
2570+ * @param update shard snapshot status update
25412571 */
2542- private void innerUpdateSnapshotState (final UpdateIndexShardSnapshotStatusRequest request ,
2543- ActionListener <UpdateIndexShardSnapshotStatusResponse > listener ) {
2544- logger .trace ("received updated snapshot restore state [{}]" , request );
2572+ private void innerUpdateSnapshotState (ShardSnapshotUpdate update , ActionListener <Void > listener ) {
2573+ logger .trace ("received updated snapshot restore state [{}]" , update );
25452574 clusterService .submitStateUpdateTask (
25462575 "update snapshot state" ,
2547- request ,
2576+ update ,
25482577 ClusterStateTaskConfig .build (Priority .NORMAL ),
2549- snapshotStateExecutor ,
2578+ SHARD_STATE_EXECUTOR ,
25502579 new ClusterStateTaskListener () {
25512580 @ Override
25522581 public void onFailure (String source , Exception e ) {
@@ -2556,13 +2585,13 @@ public void onFailure(String source, Exception e) {
25562585 @ Override
25572586 public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
25582587 try {
2559- listener .onResponse (new UpdateIndexShardSnapshotStatusResponse () );
2588+ listener .onResponse (null );
25602589 } finally {
25612590 // Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
25622591 // state update we check if its state is completed and end it if it is.
2563- if (endingSnapshots .contains (request .snapshot () ) == false ) {
2592+ if (endingSnapshots .contains (update .snapshot ) == false ) {
25642593 final SnapshotsInProgress snapshotsInProgress = newState .custom (SnapshotsInProgress .TYPE );
2565- final SnapshotsInProgress .Entry updatedEntry = snapshotsInProgress .snapshot (request .snapshot () );
2594+ final SnapshotsInProgress .Entry updatedEntry = snapshotsInProgress .snapshot (update .snapshot );
25662595 // If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
25672596 if (updatedEntry != null && updatedEntry .state ().completed ()) {
25682597 endSnapshot (updatedEntry , newState .metadata (), null );
@@ -2590,13 +2619,14 @@ protected String executor() {
25902619
25912620 @ Override
25922621 protected UpdateIndexShardSnapshotStatusResponse read (StreamInput in ) throws IOException {
2593- return new UpdateIndexShardSnapshotStatusResponse ( in ) ;
2622+ return UpdateIndexShardSnapshotStatusResponse . INSTANCE ;
25942623 }
25952624
25962625 @ Override
25972626 protected void masterOperation (UpdateIndexShardSnapshotStatusRequest request , ClusterState state ,
25982627 ActionListener <UpdateIndexShardSnapshotStatusResponse > listener ) throws Exception {
2599- innerUpdateSnapshotState (request , listener );
2628+ innerUpdateSnapshotState (new ShardSnapshotUpdate (request .snapshot (), request .shardId (), request .status ()),
2629+ ActionListener .delegateFailure (listener , (l , v ) -> l .onResponse (UpdateIndexShardSnapshotStatusResponse .INSTANCE )));
26002630 }
26012631
26022632 @ Override
0 commit comments