From 0d92c7d7e27922ef9e798958ee6dc083b0b7cb8a Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 29 Jan 2019 21:05:11 +0100 Subject: [PATCH 1/9] simpler step 1 --- .../snapshots/SnapshotShardsService.java | 88 +++++++++---------- 1 file changed, 41 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 6b7b506114361..0fe0d17a2461d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -213,7 +213,7 @@ public Map currentSnapshotShards(Snapshot sna * @param event cluster state changed event */ private void processIndexShardSnapshots(ClusterChangedEvent event) { - SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); Map> survivors = new HashMap<>(); // First, remove snapshots that are no longer there for (Map.Entry> entry : shardSnapshots.entrySet()) { @@ -313,51 +313,48 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { shutdownLock.unlock(); } - // We have new shards to starts - if (newSnapshots.isEmpty() == false) { - Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - for (final Map.Entry> entry : newSnapshots.entrySet()) { - final Snapshot snapshot = entry.getKey(); - final Map indicesMap = snapshotIndices.get(snapshot); - assert indicesMap != null; - - for (final Map.Entry shardEntry : entry.getValue().entrySet()) { - final ShardId shardId = shardEntry.getKey(); - final IndexId indexId = indicesMap.get(shardId.getIndexName()); - executor.execute(new AbstractRunnable() { - - final SetOnce failure = new SetOnce<>(); - - @Override - public void doRun() { - final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); - assert indexId != null; - snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); - } + Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + for (final Map.Entry> entry : newSnapshots.entrySet()) { + final Snapshot snapshot = entry.getKey(); + final Map indicesMap = snapshotIndices.get(snapshot); + assert indicesMap != null; - @Override - public void onFailure(Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", - shardId, snapshot), e); - failure.set(e); - } + for (final Map.Entry shardEntry : entry.getValue().entrySet()) { + final ShardId shardId = shardEntry.getKey(); + final IndexId indexId = indicesMap.get(shardId.getIndexName()); + executor.execute(new AbstractRunnable() { - @Override - public void onRejection(Exception e) { - failure.set(e); - } + final SetOnce failure = new SetOnce<>(); - @Override - public void onAfter() { - final Exception exception = failure.get(); - if (exception != null) { - notifyFailedSnapshotShard(snapshot, shardId, localNodeId, ExceptionsHelper.detailedMessage(exception)); - } else { - notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId); - } + @Override + public void doRun() { + final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + assert indexId != null; + snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); + } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", + shardId, snapshot), e); + failure.set(e); + } + + @Override + public void onRejection(Exception e) { + failure.set(e); + } + + @Override + public void onAfter() { + final Exception exception = failure.get(); + if (exception != null) { + notifyFailedSnapshotShard(snapshot, shardId, localNodeId, ExceptionsHelper.detailedMessage(exception)); + } else { + notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId); } - }); - } + } + }); } } } @@ -549,7 +546,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - class SnapshotStateExecutor implements ClusterStateTaskExecutor { + private class SnapshotStateExecutor implements ClusterStateTaskExecutor { @Override public ClusterTasksResult @@ -592,11 +589,8 @@ class SnapshotStateExecutor implements ClusterStateTaskExecutor 0) { logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); - - final SnapshotsInProgress updatedSnapshots = - new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); return ClusterTasksResult.builder().successes(tasks).build( - ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build()); + ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entries)).build()); } } return ClusterTasksResult.builder().successes(tasks).build(currentState); From 29278069bbc10cd53d65b58b5471ee20fab9259a Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 29 Jan 2019 21:41:19 +0100 Subject: [PATCH 2/9] simpler step 1 --- .../snapshots/SnapshotShardsService.java | 127 +++++++++--------- 1 file changed, 65 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 0fe0d17a2461d..71322efdfb2d6 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -233,71 +233,20 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { // For now we will be mostly dealing with a single snapshot at a time but might have multiple simultaneously running // snapshots in the future - Map> newSnapshots = new HashMap<>(); + Map> newSnapshots; // Now go through all snapshots and update existing or create missing final String localNodeId = event.state().nodes().getLocalNodeId(); final Map> snapshotIndices = new HashMap<>(); if (snapshotsInProgress != null) { - for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { - snapshotIndices.put(entry.snapshot(), - entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()))); - if (entry.state() == State.STARTED) { - Map startedShards = new HashMap<>(); - Map snapshotShards = shardSnapshots.get(entry.snapshot()); - for (ObjectObjectCursor shard : entry.shards()) { - // Add all new shards to start processing on - if (localNodeId.equals(shard.value.nodeId())) { - if (shard.value.state() == State.INIT && (snapshotShards == null || !snapshotShards.containsKey(shard.key))) { - logger.trace("[{}] - Adding shard to the queue", shard.key); - startedShards.put(shard.key, IndexShardSnapshotStatus.newInitializing()); - } - } - } - if (!startedShards.isEmpty()) { - newSnapshots.put(entry.snapshot(), startedShards); - if (snapshotShards != null) { - // We already saw this snapshot but we need to add more started shards - Map shards = new HashMap<>(); - // Put all shards that were already running on this node - shards.putAll(snapshotShards); - // Put all newly started shards - shards.putAll(startedShards); - survivors.put(entry.snapshot(), unmodifiableMap(shards)); - } else { - // Brand new snapshot that we haven't seen before - survivors.put(entry.snapshot(), unmodifiableMap(startedShards)); - } - } - } else if (entry.state() == State.ABORTED) { - // Abort all running shards for this snapshot - Map snapshotShards = shardSnapshots.get(entry.snapshot()); - if (snapshotShards != null) { - final String failure = "snapshot has been aborted"; - for (ObjectObjectCursor shard : entry.shards()) { - - final IndexShardSnapshotStatus snapshotStatus = snapshotShards.get(shard.key); - if (snapshotStatus != null) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.abortIfNotCompleted(failure); - final Stage stage = lastSnapshotStatus.getStage(); - if (stage == Stage.FINALIZE) { - logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + - "letting it finish", entry.snapshot(), shard.key); - - } else if (stage == Stage.DONE) { - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + - "updating status on the master", entry.snapshot(), shard.key); - notifySuccessfulSnapshotShard(entry.snapshot(), shard.key, localNodeId); - - } else if (stage == Stage.FAILURE) { - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + - "updating status on the master", entry.snapshot(), shard.key); - notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, lastSnapshotStatus.getFailure()); - } - } - } - } + newSnapshots = newSnapshots(snapshotsInProgress, survivors, localNodeId); + if (newSnapshots.isEmpty() == false) { + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + snapshotIndices.put( + entry.snapshot(), entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()))); } } + } else { + newSnapshots = emptyMap(); } // Update the list of snapshots that we saw and tried to started @@ -324,7 +273,7 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { final IndexId indexId = indicesMap.get(shardId.getIndexName()); executor.execute(new AbstractRunnable() { - final SetOnce failure = new SetOnce<>(); + private final SetOnce failure = new SetOnce<>(); @Override public void doRun() { @@ -335,8 +284,7 @@ public void doRun() { @Override public void onFailure(Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", - shardId, snapshot), e); + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); failure.set(e); } @@ -359,6 +307,61 @@ public void onAfter() { } } + private Map> newSnapshots(SnapshotsInProgress snapshotsInProgress, + Map> survivors, String localNodeId) { + final Map> newSnapshots = new HashMap<>(); + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + final State entryState = entry.state(); + if (entryState == State.STARTED) { + Map startedShards = new HashMap<>(); + final Snapshot snapshot = entry.snapshot(); + Map snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap()); + for (ObjectObjectCursor shard : entry.shards()) { + // Add all new shards to start processing on + if (localNodeId.equals(shard.value.nodeId()) && shard.value.state() == State.INIT + && snapshotShards.containsKey(shard.key) == false) { + logger.trace("[{}] - Adding shard to the queue", shard.key); + startedShards.put(shard.key, IndexShardSnapshotStatus.newInitializing()); + } + } + if (startedShards.isEmpty() == false) { + newSnapshots.put(snapshot, startedShards); + // We already saw this snapshot but we need to add more started shards + // Put all shards that were already running on this node + final Map shards = new HashMap<>(snapshotShards); + // Put all newly started shards + shards.putAll(startedShards); + survivors.put(snapshot, unmodifiableMap(shards)); + } + } else if (entryState == State.ABORTED) { + // Abort all running shards for this snapshot + final Snapshot snapshot = entry.snapshot(); + Map snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap()); + final String failure = "snapshot has been aborted"; + for (ObjectObjectCursor shard : entry.shards()) { + final IndexShardSnapshotStatus snapshotStatus = snapshotShards.get(shard.key); + if (snapshotStatus != null) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.abortIfNotCompleted(failure); + final Stage stage = lastSnapshotStatus.getStage(); + if (stage == Stage.FINALIZE) { + logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + + "letting it finish", snapshot, shard.key); + } else if (stage == Stage.DONE) { + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + + "updating status on the master", snapshot, shard.key); + notifySuccessfulSnapshotShard(snapshot, shard.key, localNodeId); + } else if (stage == Stage.FAILURE) { + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + + "updating status on the master", snapshot, shard.key); + notifyFailedSnapshotShard(snapshot, shard.key, localNodeId, lastSnapshotStatus.getFailure()); + } + } + } + } + } + return newSnapshots; + } + /** * Creates shard snapshot * From 82f58f8662064e4cd5d5912d8bcc0be680e6f109 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 30 Jan 2019 08:06:58 +0100 Subject: [PATCH 3/9] nicer --- .../snapshots/SnapshotShardsService.java | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 71322efdfb2d6..f58cc8d996cc8 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -214,22 +214,7 @@ public Map currentSnapshotShards(Snapshot sna */ private void processIndexShardSnapshots(ClusterChangedEvent event) { final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - Map> survivors = new HashMap<>(); - // First, remove snapshots that are no longer there - for (Map.Entry> entry : shardSnapshots.entrySet()) { - final Snapshot snapshot = entry.getKey(); - if (snapshotsInProgress != null && snapshotsInProgress.snapshot(snapshot) != null) { - survivors.put(entry.getKey(), entry.getValue()); - } else { - // abort any running snapshots of shards for the removed entry; - // this could happen if for some reason the cluster state update for aborting - // running shards is missed, then the snapshot is removed is a subsequent cluster - // state update, which is being processed here - for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().values()) { - snapshotStatus.abortIfNotCompleted("snapshot has been removed in cluster state, aborting"); - } - } - } + final Map> survivors = cancelRemoved(snapshotsInProgress); // For now we will be mostly dealing with a single snapshot at a time but might have multiple simultaneously running // snapshots in the future @@ -307,6 +292,26 @@ public void onAfter() { } } + private Map> cancelRemoved(SnapshotsInProgress snapshotsInProgress) { + Map> survivors = new HashMap<>(); + // First, remove snapshots that are no longer there + for (Map.Entry> entry : shardSnapshots.entrySet()) { + final Snapshot snapshot = entry.getKey(); + if (snapshotsInProgress != null && snapshotsInProgress.snapshot(snapshot) != null) { + survivors.put(entry.getKey(), entry.getValue()); + } else { + // abort any running snapshots of shards for the removed entry; + // this could happen if for some reason the cluster state update for aborting + // running shards is missed, then the snapshot is removed is a subsequent cluster + // state update, which is being processed here + for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().values()) { + snapshotStatus.abortIfNotCompleted("snapshot has been removed in cluster state, aborting"); + } + } + } + return survivors; + } + private Map> newSnapshots(SnapshotsInProgress snapshotsInProgress, Map> survivors, String localNodeId) { final Map> newSnapshots = new HashMap<>(); @@ -318,10 +323,11 @@ private Map> newSnapshots(Snaps Map snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap()); for (ObjectObjectCursor shard : entry.shards()) { // Add all new shards to start processing on + final ShardId shardId = shard.key; if (localNodeId.equals(shard.value.nodeId()) && shard.value.state() == State.INIT - && snapshotShards.containsKey(shard.key) == false) { - logger.trace("[{}] - Adding shard to the queue", shard.key); - startedShards.put(shard.key, IndexShardSnapshotStatus.newInitializing()); + && snapshotShards.containsKey(shardId) == false) { + logger.trace("[{}] - Adding shard to the queue", shardId); + startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing()); } } if (startedShards.isEmpty() == false) { From ea5cea51038871d30c6c2729d529f5dd17e359f9 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 30 Jan 2019 09:47:10 +0100 Subject: [PATCH 4/9] nicer --- .../snapshots/SnapshotShardsService.java | 165 ++++++++---------- 1 file changed, 69 insertions(+), 96 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index f58cc8d996cc8..62440cd965723 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -72,13 +72,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.stream.Collectors; @@ -106,11 +103,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final ThreadPool threadPool; - private final Lock shutdownLock = new ReentrantLock(); - - private final Condition shutdownCondition = shutdownLock.newCondition(); - - private volatile Map> shardSnapshots = emptyMap(); + private final Map> shardSnapshots = new HashMap<>(); private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; @@ -142,16 +135,6 @@ protected void doStart() { @Override protected void doStop() { - shutdownLock.lock(); - try { - while(!shardSnapshots.isEmpty() && shutdownCondition.await(5, TimeUnit.SECONDS)) { - // Wait for at most 5 second for locally running snapshots to finish - } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } finally { - shutdownLock.unlock(); - } } @Override @@ -166,7 +149,9 @@ public void clusterChanged(ClusterChangedEvent event) { SnapshotsInProgress currentSnapshots = event.state().custom(SnapshotsInProgress.TYPE); if ((previousSnapshots == null && currentSnapshots != null) || (previousSnapshots != null && previousSnapshots.equals(currentSnapshots) == false)) { - processIndexShardSnapshots(event); + synchronized (shardSnapshots) { + processIndexShardSnapshots(event); + } } String previousMasterNodeId = event.previousState().nodes().getMasterNodeId(); @@ -183,13 +168,14 @@ public void clusterChanged(ClusterChangedEvent event) { @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { // abort any snapshots occurring on the soon-to-be closed shard - Map> snapshotShardsMap = shardSnapshots; - for (Map.Entry> snapshotShards : snapshotShardsMap.entrySet()) { - Map shards = snapshotShards.getValue(); - if (shards.containsKey(shardId)) { - logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", - shardId, snapshotShards.getKey().getSnapshotId()); - shards.get(shardId).abortIfNotCompleted("shard is closing, aborting"); + synchronized (shardSnapshots) { + for (Map.Entry> snapshotShards : shardSnapshots.entrySet()) { + Map shards = snapshotShards.getValue(); + if (shards.containsKey(shardId)) { + logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", + shardId, snapshotShards.getKey().getSnapshotId()); + shards.get(shardId).abortIfNotCompleted("shard is closing, aborting"); + } } } } @@ -204,7 +190,10 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh * @return map of shard id to snapshot status */ public Map currentSnapshotShards(Snapshot snapshot) { - return shardSnapshots.get(snapshot); + synchronized (shardSnapshots) { + final Map current = shardSnapshots.get(snapshot); + return current == null ? null : new HashMap<>(current); + } } /** @@ -214,92 +203,77 @@ public Map currentSnapshotShards(Snapshot sna */ private void processIndexShardSnapshots(ClusterChangedEvent event) { final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - final Map> survivors = cancelRemoved(snapshotsInProgress); + + cancelRemoved(snapshotsInProgress); // For now we will be mostly dealing with a single snapshot at a time but might have multiple simultaneously running // snapshots in the future - Map> newSnapshots; // Now go through all snapshots and update existing or create missing final String localNodeId = event.state().nodes().getLocalNodeId(); - final Map> snapshotIndices = new HashMap<>(); if (snapshotsInProgress != null) { - newSnapshots = newSnapshots(snapshotsInProgress, survivors, localNodeId); + Map> newSnapshots = newSnapshots(snapshotsInProgress, localNodeId); if (newSnapshots.isEmpty() == false) { + final Map> snapshotIndices = new HashMap<>(); for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { snapshotIndices.put( entry.snapshot(), entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()))); } - } - } else { - newSnapshots = emptyMap(); - } - - // Update the list of snapshots that we saw and tried to started - // If startup of these shards fails later, we don't want to try starting these shards again - shutdownLock.lock(); - try { - shardSnapshots = unmodifiableMap(survivors); - if (shardSnapshots.isEmpty()) { - // Notify all waiting threads that no more snapshots - shutdownCondition.signalAll(); - } - } finally { - shutdownLock.unlock(); - } - - Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - for (final Map.Entry> entry : newSnapshots.entrySet()) { - final Snapshot snapshot = entry.getKey(); - final Map indicesMap = snapshotIndices.get(snapshot); - assert indicesMap != null; - - for (final Map.Entry shardEntry : entry.getValue().entrySet()) { - final ShardId shardId = shardEntry.getKey(); - final IndexId indexId = indicesMap.get(shardId.getIndexName()); - executor.execute(new AbstractRunnable() { - - private final SetOnce failure = new SetOnce<>(); - - @Override - public void doRun() { - final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); - assert indexId != null; - snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); - } + Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + for (final Map.Entry> entry : newSnapshots.entrySet()) { + final Snapshot snapshot = entry.getKey(); + final Map indicesMap = snapshotIndices.get(snapshot); + assert indicesMap != null; + + for (final Map.Entry shardEntry : entry.getValue().entrySet()) { + final ShardId shardId = shardEntry.getKey(); + final IndexId indexId = indicesMap.get(shardId.getIndexName()); + executor.execute(new AbstractRunnable() { + + private final SetOnce failure = new SetOnce<>(); + + @Override + public void doRun() { + final IndexShard indexShard = + indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + assert indexId != null; + snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); + } - @Override - public void onFailure(Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); - failure.set(e); - } + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + failure.set(e); + } - @Override - public void onRejection(Exception e) { - failure.set(e); - } + @Override + public void onRejection(Exception e) { + failure.set(e); + } - @Override - public void onAfter() { - final Exception exception = failure.get(); - if (exception != null) { - notifyFailedSnapshotShard(snapshot, shardId, localNodeId, ExceptionsHelper.detailedMessage(exception)); - } else { - notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId); - } + @Override + public void onAfter() { + final Exception exception = failure.get(); + if (exception != null) { + notifyFailedSnapshotShard(snapshot, shardId, localNodeId, ExceptionsHelper.detailedMessage(exception)); + } else { + notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId); + } + } + }); } - }); + } } } } - private Map> cancelRemoved(SnapshotsInProgress snapshotsInProgress) { - Map> survivors = new HashMap<>(); + private void cancelRemoved(SnapshotsInProgress snapshotsInProgress) { // First, remove snapshots that are no longer there - for (Map.Entry> entry : shardSnapshots.entrySet()) { + for (Iterator>> it = shardSnapshots.entrySet().iterator(); + it.hasNext(); ) { + final Map.Entry> entry = it.next(); final Snapshot snapshot = entry.getKey(); - if (snapshotsInProgress != null && snapshotsInProgress.snapshot(snapshot) != null) { - survivors.put(entry.getKey(), entry.getValue()); - } else { + if (snapshotsInProgress == null || snapshotsInProgress.snapshot(snapshot) == null) { + it.remove(); // abort any running snapshots of shards for the removed entry; // this could happen if for some reason the cluster state update for aborting // running shards is missed, then the snapshot is removed is a subsequent cluster @@ -309,11 +283,10 @@ private Map> cancelRemoved(Snap } } } - return survivors; } private Map> newSnapshots(SnapshotsInProgress snapshotsInProgress, - Map> survivors, String localNodeId) { + String localNodeId) { final Map> newSnapshots = new HashMap<>(); for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { final State entryState = entry.state(); @@ -337,7 +310,7 @@ private Map> newSnapshots(Snaps final Map shards = new HashMap<>(snapshotShards); // Put all newly started shards shards.putAll(startedShards); - survivors.put(snapshot, unmodifiableMap(shards)); + shardSnapshots.put(snapshot, unmodifiableMap(shards)); } } else if (entryState == State.ABORTED) { // Abort all running shards for this snapshot From 3a9134a31322657719b69c81e7d6dfae1f31d03c Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 30 Jan 2019 10:49:25 +0100 Subject: [PATCH 5/9] nicer --- .../snapshots/SnapshotShardsService.java | 93 +++++++++---------- 1 file changed, 42 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 62440cd965723..f2a706baf74cc 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -209,75 +209,66 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { // For now we will be mostly dealing with a single snapshot at a time but might have multiple simultaneously running // snapshots in the future // Now go through all snapshots and update existing or create missing - final String localNodeId = event.state().nodes().getLocalNodeId(); if (snapshotsInProgress != null) { - Map> newSnapshots = newSnapshots(snapshotsInProgress, localNodeId); - if (newSnapshots.isEmpty() == false) { - final Map> snapshotIndices = new HashMap<>(); - for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { - snapshotIndices.put( - entry.snapshot(), entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()))); - } - Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - for (final Map.Entry> entry : newSnapshots.entrySet()) { - final Snapshot snapshot = entry.getKey(); - final Map indicesMap = snapshotIndices.get(snapshot); - assert indicesMap != null; - - for (final Map.Entry shardEntry : entry.getValue().entrySet()) { - final ShardId shardId = shardEntry.getKey(); - final IndexId indexId = indicesMap.get(shardId.getIndexName()); - executor.execute(new AbstractRunnable() { - - private final SetOnce failure = new SetOnce<>(); - - @Override - public void doRun() { - final IndexShard indexShard = - indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); - assert indexId != null; - snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); - } + final String localNodeId = event.state().nodes().getLocalNodeId(); + final Map> newSnapshots = newSnapshots(snapshotsInProgress, localNodeId); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + for (final Map.Entry> entry : newSnapshots.entrySet()) { + final Snapshot snapshot = entry.getKey(); + final Map indicesMap = snapshotsInProgress.snapshot(snapshot).indices().stream() + .collect(Collectors.toMap(IndexId::getName, Function.identity())); + for (final Map.Entry shardEntry : entry.getValue().entrySet()) { + final ShardId shardId = shardEntry.getKey(); + final IndexId indexId = indicesMap.get(shardId.getIndexName()); + executor.execute(new AbstractRunnable() { + + private final SetOnce failure = new SetOnce<>(); + + @Override + public void doRun() { + final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + assert indexId != null; + snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); + } - @Override - public void onFailure(Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); - failure.set(e); - } + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + failure.set(e); + } - @Override - public void onRejection(Exception e) { - failure.set(e); - } + @Override + public void onRejection(Exception e) { + failure.set(e); + } - @Override - public void onAfter() { - final Exception exception = failure.get(); - if (exception != null) { - notifyFailedSnapshotShard(snapshot, shardId, localNodeId, ExceptionsHelper.detailedMessage(exception)); - } else { - notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId); - } + @Override + public void onAfter() { + final Exception exception = failure.get(); + if (exception != null) { + notifyFailedSnapshotShard(snapshot, shardId, localNodeId, ExceptionsHelper.detailedMessage(exception)); + } else { + notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId); } - }); - } + } + }); } } } } - private void cancelRemoved(SnapshotsInProgress snapshotsInProgress) { + private void cancelRemoved(@Nullable SnapshotsInProgress snapshotsInProgress) { // First, remove snapshots that are no longer there for (Iterator>> it = shardSnapshots.entrySet().iterator(); it.hasNext(); ) { final Map.Entry> entry = it.next(); final Snapshot snapshot = entry.getKey(); if (snapshotsInProgress == null || snapshotsInProgress.snapshot(snapshot) == null) { - it.remove(); // abort any running snapshots of shards for the removed entry; // this could happen if for some reason the cluster state update for aborting // running shards is missed, then the snapshot is removed is a subsequent cluster // state update, which is being processed here + it.remove(); for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().values()) { snapshotStatus.abortIfNotCompleted("snapshot has been removed in cluster state, aborting"); } @@ -316,11 +307,11 @@ private Map> newSnapshots(Snaps // Abort all running shards for this snapshot final Snapshot snapshot = entry.snapshot(); Map snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap()); - final String failure = "snapshot has been aborted"; for (ObjectObjectCursor shard : entry.shards()) { final IndexShardSnapshotStatus snapshotStatus = snapshotShards.get(shard.key); if (snapshotStatus != null) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.abortIfNotCompleted(failure); + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = + snapshotStatus.abortIfNotCompleted("snapshot has been aborted"); final Stage stage = lastSnapshotStatus.getStage(); if (stage == Stage.FINALIZE) { logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + From 228c75ebf5955140a4db22aa971383f26edaf3fa Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 30 Jan 2019 11:38:11 +0100 Subject: [PATCH 6/9] nicer --- .../snapshots/SnapshotShardsService.java | 133 ++++++++---------- 1 file changed, 62 insertions(+), 71 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index f2a706baf74cc..ad5b461744992 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -80,7 +80,6 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.cluster.SnapshotsInProgress.completed; import static org.elasticsearch.transport.EmptyTransportResponseHandler.INSTANCE_SAME; @@ -203,57 +202,9 @@ public Map currentSnapshotShards(Snapshot sna */ private void processIndexShardSnapshots(ClusterChangedEvent event) { final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - cancelRemoved(snapshotsInProgress); - - // For now we will be mostly dealing with a single snapshot at a time but might have multiple simultaneously running - // snapshots in the future - // Now go through all snapshots and update existing or create missing if (snapshotsInProgress != null) { - final String localNodeId = event.state().nodes().getLocalNodeId(); - final Map> newSnapshots = newSnapshots(snapshotsInProgress, localNodeId); - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - for (final Map.Entry> entry : newSnapshots.entrySet()) { - final Snapshot snapshot = entry.getKey(); - final Map indicesMap = snapshotsInProgress.snapshot(snapshot).indices().stream() - .collect(Collectors.toMap(IndexId::getName, Function.identity())); - for (final Map.Entry shardEntry : entry.getValue().entrySet()) { - final ShardId shardId = shardEntry.getKey(); - final IndexId indexId = indicesMap.get(shardId.getIndexName()); - executor.execute(new AbstractRunnable() { - - private final SetOnce failure = new SetOnce<>(); - - @Override - public void doRun() { - final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); - assert indexId != null; - snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); - } - - @Override - public void onFailure(Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); - failure.set(e); - } - - @Override - public void onRejection(Exception e) { - failure.set(e); - } - - @Override - public void onAfter() { - final Exception exception = failure.get(); - if (exception != null) { - notifyFailedSnapshotShard(snapshot, shardId, localNodeId, ExceptionsHelper.detailedMessage(exception)); - } else { - notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId); - } - } - }); - } - } + startNewSnapshots(snapshotsInProgress); } } @@ -276,9 +227,11 @@ private void cancelRemoved(@Nullable SnapshotsInProgress snapshotsInProgress) { } } - private Map> newSnapshots(SnapshotsInProgress snapshotsInProgress, - String localNodeId) { - final Map> newSnapshots = new HashMap<>(); + private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { + final String localNodeId = clusterService.localNode().getId(); + // For now we will be mostly dealing with a single snapshot at a time but might have multiple simultaneously running + // snapshots in the future + // Now go through all snapshots and update existing or create missing for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { final State entryState = entry.state(); if (entryState == State.STARTED) { @@ -288,20 +241,15 @@ private Map> newSnapshots(Snaps for (ObjectObjectCursor shard : entry.shards()) { // Add all new shards to start processing on final ShardId shardId = shard.key; - if (localNodeId.equals(shard.value.nodeId()) && shard.value.state() == State.INIT + final ShardSnapshotStatus shardSnapshotStatus = shard.value; + if (localNodeId.equals(shardSnapshotStatus.nodeId()) && shardSnapshotStatus.state() == State.INIT && snapshotShards.containsKey(shardId) == false) { logger.trace("[{}] - Adding shard to the queue", shardId); startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing()); } } if (startedShards.isEmpty() == false) { - newSnapshots.put(snapshot, startedShards); - // We already saw this snapshot but we need to add more started shards - // Put all shards that were already running on this node - final Map shards = new HashMap<>(snapshotShards); - // Put all newly started shards - shards.putAll(startedShards); - shardSnapshots.put(snapshot, unmodifiableMap(shards)); + startNewShards(snapshotsInProgress, startedShards, snapshot); } } else if (entryState == State.ABORTED) { // Abort all running shards for this snapshot @@ -319,17 +267,61 @@ private Map> newSnapshots(Snaps } else if (stage == Stage.DONE) { logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + "updating status on the master", snapshot, shard.key); - notifySuccessfulSnapshotShard(snapshot, shard.key, localNodeId); + notifySuccessfulSnapshotShard(snapshot, shard.key); } else if (stage == Stage.FAILURE) { logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + "updating status on the master", snapshot, shard.key); - notifyFailedSnapshotShard(snapshot, shard.key, localNodeId, lastSnapshotStatus.getFailure()); + notifyFailedSnapshotShard(snapshot, shard.key, lastSnapshotStatus.getFailure()); } } } } } - return newSnapshots; + } + + private void startNewShards( + SnapshotsInProgress snapshotsInProgress, Map startedShards, Snapshot snapshot) { + shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap<>()).putAll(startedShards); + final Map indicesMap = snapshotsInProgress.snapshot(snapshot).indices().stream() + .collect(Collectors.toMap(IndexId::getName, Function.identity())); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + for (final Map.Entry shardEntry : startedShards.entrySet()) { + final ShardId shardId = shardEntry.getKey(); + final IndexId indexId = indicesMap.get(shardId.getIndexName()); + executor.execute(new AbstractRunnable() { + + private final SetOnce failure = new SetOnce<>(); + + @Override + public void doRun() { + final IndexShard indexShard = + indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + assert indexId != null; + snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); + } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + failure.set(e); + } + + @Override + public void onRejection(Exception e) { + failure.set(e); + } + + @Override + public void onAfter() { + final Exception exception = failure.get(); + if (exception != null) { + notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.detailedMessage(exception)); + } else { + notifySuccessfulSnapshotShard(snapshot, shardId); + } + } + }); + } } /** @@ -382,7 +374,6 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { return; } - final String localNodeId = event.state().nodes().getLocalNodeId(); for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { Map localShards = currentSnapshotShards(snapshot.snapshot()); @@ -399,13 +390,13 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { // but we think the shard is done - we need to make new master know that the shard is done logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " + "updating status on the master", snapshot.snapshot(), shardId); - notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localNodeId); + notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId); } else if (stage == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " + "updating status on master", snapshot.snapshot(), shardId); - notifyFailedSnapshotShard(snapshot.snapshot(), shardId, localNodeId, indexShardSnapshotStatus.getFailure()); + notifyFailedSnapshotShard(snapshot.snapshot(), shardId, indexShardSnapshotStatus.getFailure()); } } } @@ -474,13 +465,13 @@ public String toString() { } /** Notify the master node that the given shard has been successfully snapshotted **/ - void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String localNodeId) { - sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.SUCCESS)); + void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) { + sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.SUCCESS)); } /** Notify the master node that the given shard failed to be snapshotted **/ - void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String localNodeId, final String failure) { - sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.FAILED, failure)); + void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) { + sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.FAILED, failure)); } /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ From 29a3952b3e9e9bb52f05d192fe4a15feba324d53 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 30 Jan 2019 12:00:13 +0100 Subject: [PATCH 7/9] nicer --- .../snapshots/SnapshotShardsService.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index ad5b461744992..f510064c99c92 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -210,8 +210,8 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { private void cancelRemoved(@Nullable SnapshotsInProgress snapshotsInProgress) { // First, remove snapshots that are no longer there - for (Iterator>> it = shardSnapshots.entrySet().iterator(); - it.hasNext(); ) { + Iterator>> it = shardSnapshots.entrySet().iterator(); + while (it.hasNext()) { final Map.Entry> entry = it.next(); final Snapshot snapshot = entry.getKey(); if (snapshotsInProgress == null || snapshotsInProgress.snapshot(snapshot) == null) { @@ -228,7 +228,6 @@ private void cancelRemoved(@Nullable SnapshotsInProgress snapshotsInProgress) { } private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { - final String localNodeId = clusterService.localNode().getId(); // For now we will be mostly dealing with a single snapshot at a time but might have multiple simultaneously running // snapshots in the future // Now go through all snapshots and update existing or create missing @@ -238,6 +237,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { Map startedShards = new HashMap<>(); final Snapshot snapshot = entry.snapshot(); Map snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap()); + final String localNodeId = clusterService.localNode().getId(); for (ObjectObjectCursor shard : entry.shards()) { // Add all new shards to start processing on final ShardId shardId = shard.key; @@ -249,7 +249,8 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { } } if (startedShards.isEmpty() == false) { - startNewShards(snapshotsInProgress, startedShards, snapshot); + shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap<>()).putAll(startedShards); + startNewShards(entry, startedShards); } } else if (entryState == State.ABORTED) { // Abort all running shards for this snapshot @@ -279,11 +280,9 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { } } - private void startNewShards( - SnapshotsInProgress snapshotsInProgress, Map startedShards, Snapshot snapshot) { - shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap<>()).putAll(startedShards); - final Map indicesMap = snapshotsInProgress.snapshot(snapshot).indices().stream() - .collect(Collectors.toMap(IndexId::getName, Function.identity())); + private void startNewShards(SnapshotsInProgress.Entry entry, Map startedShards) { + final Snapshot snapshot = entry.snapshot(); + final Map indicesMap = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); for (final Map.Entry shardEntry : startedShards.entrySet()) { final ShardId shardId = shardEntry.getKey(); @@ -465,17 +464,17 @@ public String toString() { } /** Notify the master node that the given shard has been successfully snapshotted **/ - void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) { + private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) { sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.SUCCESS)); } /** Notify the master node that the given shard failed to be snapshotted **/ - void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) { + private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) { sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.FAILED, failure)); } /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ - void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) { + private void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) { try { UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, INSTANCE_SAME); @@ -514,7 +513,7 @@ private class SnapshotStateExecutor implements ClusterStateTaskExecutor - execute(ClusterState currentState, List tasks) throws Exception { + execute(ClusterState currentState, List tasks) { final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); if (snapshots != null) { int changedCount = 0; @@ -587,7 +586,7 @@ protected UpdateIndexShardSnapshotStatusResponse newResponse() { @Override protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, - ActionListener listener) throws Exception { + ActionListener listener) { innerUpdateSnapshotState(request, listener); } From 2f15c75e70c9c5da53db4934fff407d33a2a1329 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 30 Jan 2019 12:04:15 +0100 Subject: [PATCH 8/9] safer --- .../org/elasticsearch/snapshots/SnapshotShardsService.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index f510064c99c92..8b6d0dca20dfe 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -80,6 +80,7 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyMap; +import static java.util.Collections.unmodifiableList; import static org.elasticsearch.cluster.SnapshotsInProgress.completed; import static org.elasticsearch.transport.EmptyTransportResponseHandler.INSTANCE_SAME; @@ -552,8 +553,9 @@ private class SnapshotStateExecutor implements ClusterStateTaskExecutor 0) { logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); - return ClusterTasksResult.builder().successes(tasks).build( - ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entries)).build()); + return ClusterTasksResult.builder().successes(tasks) + .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, + new SnapshotsInProgress(unmodifiableList(entries))).build()); } } return ClusterTasksResult.builder().successes(tasks).build(currentState); From 7bacf372de753657447ea2731ec0b5eac96e4f2e Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 1 Feb 2019 17:38:21 +0100 Subject: [PATCH 9/9] CR: simplifications --- .../snapshots/SnapshotShardsService.java | 18 ++++++++++-------- .../snapshots/SnapshotsService.java | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 85a5225cb94a2..fdcf22a080ecb 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -157,7 +157,7 @@ public void clusterChanged(ClusterChangedEvent event) { if ((previousSnapshots == null && currentSnapshots != null) || (previousSnapshots != null && previousSnapshots.equals(currentSnapshots) == false)) { synchronized (shardSnapshots) { - processIndexShardSnapshots(event); + processIndexShardSnapshots(currentSnapshots); } } @@ -206,10 +206,9 @@ public Map currentSnapshotShards(Snapshot sna /** * Checks if any new shards should be snapshotted on this node * - * @param event cluster state changed event + * @param snapshotsInProgress Current snapshots in progress in cluster state */ - private void processIndexShardSnapshots(ClusterChangedEvent event) { - final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + private void processIndexShardSnapshots(SnapshotsInProgress snapshotsInProgress) { cancelRemoved(snapshotsInProgress); if (snapshotsInProgress != null) { startNewSnapshots(snapshotsInProgress); @@ -239,13 +238,13 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { // For now we will be mostly dealing with a single snapshot at a time but might have multiple simultaneously running // snapshots in the future // Now go through all snapshots and update existing or create missing + final String localNodeId = clusterService.localNode().getId(); for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { final State entryState = entry.state(); if (entryState == State.STARTED) { - Map startedShards = new HashMap<>(); + Map startedShards = null; final Snapshot snapshot = entry.snapshot(); Map snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap()); - final String localNodeId = clusterService.localNode().getId(); for (ObjectObjectCursor shard : entry.shards()) { // Add all new shards to start processing on final ShardId shardId = shard.key; @@ -253,10 +252,13 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { if (localNodeId.equals(shardSnapshotStatus.nodeId()) && shardSnapshotStatus.state() == State.INIT && snapshotShards.containsKey(shardId) == false) { logger.trace("[{}] - Adding shard to the queue", shardId); + if (startedShards == null) { + startedShards = new HashMap<>(); + } startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing()); } } - if (startedShards.isEmpty() == false) { + if (startedShards != null && startedShards.isEmpty() == false) { shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap<>()).putAll(startedShards); startNewShards(entry, startedShards); } @@ -303,6 +305,7 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map shardEntry : startedShards.entrySet()) { final ShardId shardId = shardEntry.getKey(); final IndexId indexId = indicesMap.get(shardId.getIndexName()); + assert indexId != null; executor.execute(new AbstractRunnable() { private final SetOnce failure = new SetOnce<>(); @@ -311,7 +314,6 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes - * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method
  • + * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(SnapshotsInProgress)} method *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using * the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method
  • *
  • When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot