Skip to content

Commit d2e3319

Browse files
committed
Track Shard Snapshot Generation in Cluster State
Backport of elastic/elasticsearch#46864
1 parent 7744029 commit d2e3319

File tree

8 files changed

+224 additions, -156 deletions (lines changed)

8 files changed

+224 additions, -156 deletions (lines changed)

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.index.shard.ShardId;
3434
import org.elasticsearch.repositories.IndexId;
3535
import org.elasticsearch.snapshots.Snapshot;
36+
import org.elasticsearch.snapshots.SnapshotsService;
3637

3738
import javax.annotation.Nullable;
3839
import java.io.IOException;
@@ -262,26 +263,37 @@ public static class ShardSnapshotStatus {
262263
private final String nodeId;
263264
private final String reason;
264265

265-
public ShardSnapshotStatus(String nodeId) {
266-
this(nodeId, ShardState.INIT);
266+
@Nullable
267+
private final String generation;
268+
269+
public ShardSnapshotStatus(String nodeId, String generation) {
270+
this(nodeId, ShardState.INIT, generation);
267271
}
268272

269-
public ShardSnapshotStatus(String nodeId, ShardState state) {
270-
this(nodeId, state, null);
273+
public ShardSnapshotStatus(String nodeId, ShardState state, String generation) {
274+
this(nodeId, state, null, generation);
271275
}
272276

273-
public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
277+
public ShardSnapshotStatus(String nodeId, ShardState state, String reason, String generation) {
274278
this.nodeId = nodeId;
275279
this.state = state;
276280
this.reason = reason;
281+
this.generation = generation;
277282
// If the state is failed we have to have a reason for this failure
278283
assert state.failed() == false || reason != null;
279284
}
280285

281286
public ShardSnapshotStatus(StreamInput in) throws IOException {
282287
nodeId = in.readOptionalString();
283288
state = ShardState.fromValue(in.readByte());
289+
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
290+
generation = in.readOptionalString();
291+
assert generation != null || state != ShardState.SUCCESS : "Received null generation for shard state [" + state + "]";
292+
} else {
293+
generation = null;
294+
}
284295
reason = in.readOptionalString();
296+
285297
}
286298

287299
public ShardState state() {
@@ -296,9 +308,17 @@ public String reason() {
296308
return reason;
297309
}
298310

311+
@Nullable
312+
public String generation() {
313+
return generation;
314+
}
315+
299316
public void writeTo(StreamOutput out) throws IOException {
300317
out.writeOptionalString(nodeId);
301318
out.writeByte(state.value);
319+
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
320+
out.writeOptionalString(generation);
321+
}
302322
out.writeOptionalString(reason);
303323
}
304324

@@ -307,21 +327,24 @@ public boolean equals(Object o) {
307327
if (this == o) return true;
308328
if (o == null || getClass() != o.getClass()) return false;
309329
ShardSnapshotStatus status = (ShardSnapshotStatus) o;
310-
return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;
311-
330+
return Objects.equals(nodeId, status.nodeId) &&
331+
Objects.equals(reason, status.reason) &&
332+
Objects.equals(generation, status.generation) &&
333+
state == status.state;
312334
}
313335

314336
@Override
315337
public int hashCode() {
316338
int result = state != null ? state.hashCode() : 0;
317339
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
318340
result = 31 * result + (reason != null ? reason.hashCode() : 0);
341+
result = 31 * result + (generation != null ? generation.hashCode() : 0);
319342
return result;
320343
}
321344

322345
@Override
323346
public String toString() {
324-
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
347+
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + ", generation=" + generation + "]";
325348
}
326349
}
327350

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.util.set.Sets;
3939
import org.elasticsearch.index.Index;
4040
import org.elasticsearch.snapshots.RestoreService;
41+
import org.elasticsearch.snapshots.SnapshotInProgressException;
4142
import org.elasticsearch.snapshots.SnapshotsService;
4243

4344
import java.util.Arrays;
@@ -93,9 +94,15 @@ public ClusterState execute(final ClusterState currentState) {
9394
*/
9495
public ClusterState deleteIndices(ClusterState currentState, Set<Index> indices) {
9596
final MetaData meta = currentState.metaData();
96-
final Set<IndexMetaData> metaDatas = indices.stream().map(i -> meta.getIndexSafe(i)).collect(toSet());
97+
final Set<Index> indicesToDelete = indices.stream().map(i -> meta.getIndexSafe(i).getIndex()).collect(toSet());
98+
9799
// Check if index deletion conflicts with any running snapshots
98-
SnapshotsService.checkIndexDeletion(currentState, metaDatas);
100+
Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, indicesToDelete);
101+
if (snapshottingIndices.isEmpty() == false) {
102+
throw new SnapshotInProgressException("Cannot delete indices that are being snapshotted: " + snapshottingIndices +
103+
". Try again after snapshot finishes or cancel the currently running snapshot.");
104+
}
105+
99106
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
100107
MetaData.Builder metaDataBuilder = MetaData.builder(meta);
101108
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());

server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public enum Stage {
5858
}
5959

6060
private final AtomicReference<Stage> stage;
61+
private final AtomicReference<String> generation;
6162
private long startTime;
6263
private long totalTime;
6364
private int incrementalFileCount;
@@ -69,10 +70,19 @@ public enum Stage {
6970
private long indexVersion;
7071
private String failure;
7172

72-
private IndexShardSnapshotStatus(final Stage stage, final long startTime, final long totalTime,
73-
final int incrementalFileCount, final int totalFileCount, final int processedFileCount,
74-
final long incrementalSize, final long totalSize, final long processedSize, final String failure) {
73+
private IndexShardSnapshotStatus(final Stage stage,
74+
final long startTime,
75+
final long totalTime,
76+
final int incrementalFileCount,
77+
final int totalFileCount,
78+
final int processedFileCount,
79+
final long incrementalSize,
80+
final long totalSize,
81+
final long processedSize,
82+
final String failure,
83+
final String generation) {
7584
this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
85+
this.generation = new AtomicReference<>(generation);
7686
this.startTime = startTime;
7787
this.totalTime = totalTime;
7888
this.incrementalFileCount = incrementalFileCount;
@@ -109,21 +119,21 @@ public synchronized Copy moveToFinalize(final long indexVersion) {
109119
return asCopy();
110120
}
111121

112-
public synchronized Copy moveToDone(final long endTime) {
122+
public synchronized void moveToDone(final long endTime, final String newGeneration) {
123+
assert newGeneration != null;
113124
if (stage.compareAndSet(Stage.FINALIZE, Stage.DONE)) {
114125
this.totalTime = Math.max(0L, endTime - startTime);
126+
this.generation.set(newGeneration);
115127
} else {
116128
throw new IllegalStateException("Unable to move the shard snapshot status to [DONE]: " +
117129
"expecting [FINALIZE] but got [" + stage.get() + "]");
118130
}
119-
return asCopy();
120131
}
121132

122-
public synchronized Copy abortIfNotCompleted(final String failure) {
133+
public synchronized void abortIfNotCompleted(final String failure) {
123134
if (stage.compareAndSet(Stage.INIT, Stage.ABORTED) || stage.compareAndSet(Stage.STARTED, Stage.ABORTED)) {
124135
this.failure = failure;
125136
}
126-
return asCopy();
127137
}
128138

129139
public synchronized void moveToFailed(final long endTime, final String failure) {
@@ -133,6 +143,10 @@ public synchronized void moveToFailed(final long endTime, final String failure)
133143
}
134144
}
135145

146+
public String generation() {
147+
return generation.get();
148+
}
149+
136150
public boolean isAborted() {
137151
return stage.get() == Stage.ABORTED;
138152
}
@@ -158,24 +172,24 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() {
158172
indexVersion, failure);
159173
}
160174

161-
public static IndexShardSnapshotStatus newInitializing() {
162-
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null);
175+
public static IndexShardSnapshotStatus newInitializing(String generation) {
176+
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation);
163177
}
164178

165179
public static IndexShardSnapshotStatus newFailed(final String failure) {
166180
assert failure != null : "expecting non null failure for a failed IndexShardSnapshotStatus";
167181
if (failure == null) {
168182
throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
169183
}
170-
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure);
184+
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null);
171185
}
172186

173187
public static IndexShardSnapshotStatus newDone(final long startTime, final long totalTime,
174188
final int incrementalFileCount, final int fileCount,
175-
final long incrementalSize, final long size) {
189+
final long incrementalSize, final long size, String generation) {
176190
// The snapshot is done which means the number of processed files is the same as total
177191
return new IndexShardSnapshotStatus(Stage.DONE, startTime, totalTime, incrementalFileCount, fileCount, incrementalFileCount,
178-
incrementalSize, size, incrementalSize, null);
192+
incrementalSize, size, incrementalSize, null, generation);
179193
}
180194

181195
/**

server/src/main/java/org/elasticsearch/repositories/Repository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ void finalizeSnapshot(SnapshotId snapshotId,
224224
* @param listener listener invoked on completion
225225
*/
226226
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
227-
IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener);
227+
IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener);
228228

229229
/**
230230
* Restores snapshot of the shard.

0 commit comments

Comments
 (0)