Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotsService;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -249,27 +250,39 @@ public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {
public static class ShardSnapshotStatus {
private final ShardState state;
private final String nodeId;

@Nullable
private final String generation;

@Nullable
private final String reason;

public ShardSnapshotStatus(String nodeId) {
this(nodeId, ShardState.INIT);
public ShardSnapshotStatus(String nodeId, String generation) {
this(nodeId, ShardState.INIT, generation);
}

public ShardSnapshotStatus(String nodeId, ShardState state) {
this(nodeId, state, null);
public ShardSnapshotStatus(String nodeId, ShardState state, String generation) {
this(nodeId, state, null, generation);
}

public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
public ShardSnapshotStatus(String nodeId, ShardState state, String reason, String generation) {
this.nodeId = nodeId;
this.state = state;
this.reason = reason;
this.generation = generation;
// If the state is failed we have to have a reason for this failure
assert state.failed() == false || reason != null;
}

/**
 * Deserializes a shard snapshot status from {@code in}.
 * Fields are read in the same order that {@code writeTo} emits them: optional node id, state byte,
 * optional generation (version-gated), optional reason.
 *
 * @param in stream to read the status from
 * @throws IOException propagated from the underlying stream reads
 */
public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
// The state travels as a single byte and is mapped back through ShardState.fromValue.
state = ShardState.fromValue(in.readByte());
// The generation is only present on the wire for stream versions on or after
// SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION; for older versions it is left as null.
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
generation = in.readOptionalString();
// A status in state SUCCESS is expected to carry a generation; only other states may arrive without one.
assert generation != null || state != ShardState.SUCCESS : "Received null generation for shard state [" + state + "]";
} else {
generation = null;
}
reason = in.readOptionalString();
}

Expand All @@ -281,13 +294,20 @@ public String nodeId() {
return nodeId;
}

public String generation() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The generation is tracked here in addition to being passed through the String callback, so that it is still available when shard status updates are retried (e.g. on master failover) in SnapshotShardsService.

return this.generation;
}

/**
 * Returns the reason stored with this status.
 * The field is declared {@code @Nullable}; the constructor only asserts that it is present when the state is a failed one.
 *
 * @return the stored reason, possibly {@code null}
 */
public String reason() {
    return this.reason;
}

/**
 * Serializes this status to {@code out}: the optional node id, the state as a single byte,
 * the optional generation (version-gated) and finally the optional reason.
 *
 * @param out stream to write the status to
 * @throws IOException propagated from the underlying stream writes
 */
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
// The generation is only written for stream versions on or after
// SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION; the StreamInput constructor applies the same gate when reading.
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeOptionalString(generation);
}
out.writeOptionalString(reason);
}

Expand All @@ -296,21 +316,22 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardSnapshotStatus status = (ShardSnapshotStatus) o;
return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;

return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason)
&& Objects.equals(generation, status.generation) && state == status.state;
}

/**
 * Combines the hash codes of {@code state}, {@code nodeId}, {@code reason} and {@code generation}
 * (in that order) with the multiplier 31; a {@code null} field contributes 0.
 */
@Override
public int hashCode() {
    int hash = Objects.hashCode(state);
    hash = 31 * hash + Objects.hashCode(nodeId);
    hash = 31 * hash + Objects.hashCode(reason);
    hash = 31 * hash + Objects.hashCode(generation);
    return hash;
}

@Override
public String toString() {
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + ", generation=" + generation + "]";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public enum Stage {
}

private final AtomicReference<Stage> stage;
private final AtomicReference<String> generation;
private long startTime;
private long totalTime;
private int incrementalFileCount;
Expand All @@ -71,8 +72,10 @@ public enum Stage {

private IndexShardSnapshotStatus(final Stage stage, final long startTime, final long totalTime,
final int incrementalFileCount, final int totalFileCount, final int processedFileCount,
final long incrementalSize, final long totalSize, final long processedSize, final String failure) {
final long incrementalSize, final long totalSize, final long processedSize, final String failure,
final String generation) {
this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
this.generation = new AtomicReference<>(generation);
this.startTime = startTime;
this.totalTime = totalTime;
this.incrementalFileCount = incrementalFileCount;
Expand Down Expand Up @@ -109,9 +112,11 @@ public synchronized Copy moveToFinalize(final long indexVersion) {
return asCopy();
}

public synchronized void moveToDone(final long endTime) {
public synchronized void moveToDone(final long endTime, final String newGeneration) {
assert newGeneration != null;
if (stage.compareAndSet(Stage.FINALIZE, Stage.DONE)) {
this.totalTime = Math.max(0L, endTime - startTime);
this.generation.set(newGeneration);
} else {
throw new IllegalStateException("Unable to move the shard snapshot status to [DONE]: " +
"expecting [FINALIZE] but got [" + stage.get() + "]");
Expand All @@ -131,6 +136,10 @@ public synchronized void moveToFailed(final long endTime, final String failure)
}
}

/**
 * Returns the value currently held by the {@code generation} reference.
 * The reference is initialised from the constructor argument and replaced by {@code moveToDone};
 * it can be {@code null}, since {@code newFailed} constructs the status with a null generation.
 *
 * @return the current generation, possibly {@code null}
 */
public String generation() {
    final String current = generation.get();
    return current;
}

/**
 * Reports whether the stage currently held by this status is {@link Stage#ABORTED}.
 *
 * @return {@code true} if the current stage is {@code ABORTED}, {@code false} otherwise
 */
public boolean isAborted() {
    final Stage current = stage.get();
    return Stage.ABORTED == current;
}
Expand All @@ -156,24 +165,24 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() {
indexVersion, failure);
}

public static IndexShardSnapshotStatus newInitializing() {
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null);
public static IndexShardSnapshotStatus newInitializing(String generation) {
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation);
}

public static IndexShardSnapshotStatus newFailed(final String failure) {
assert failure != null : "expecting non null failure for a failed IndexShardSnapshotStatus";
if (failure == null) {
throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
}
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure);
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null);
}

public static IndexShardSnapshotStatus newDone(final long startTime, final long totalTime,
final int incrementalFileCount, final int fileCount,
final long incrementalSize, final long size) {
final long incrementalSize, final long size, String generation) {
// The snapshot is done which means the number of processed files is the same as total
return new IndexShardSnapshotStatus(Stage.DONE, startTime, totalTime, incrementalFileCount, fileCount, incrementalFileCount,
incrementalSize, size, incrementalSize, null);
incrementalSize, size, incrementalSize, null, generation);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public boolean isReadOnly() {

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
}
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
* @param listener listener invoked on completion
*/
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener);
IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener);

/**
* Restores snapshot of the shard.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ private void cleanupStaleBlobs(Map<String, BlobContainer> foundIndices, Map<Stri
* <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
* </ul>
* @param repositoryStateId Current repository state id
* @param listener Lister to complete when done
* @param listener Listener to complete when done
*/
public void cleanup(long repositoryStateId, ActionListener<RepositoryCleanupResult> listener) {
try {
Expand Down Expand Up @@ -942,10 +942,10 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
final StepListener<Void> snapshotDoneListener = new StepListener<>();
final StepListener<String> snapshotDoneListener = new StepListener<>();
snapshotDoneListener.whenComplete(listener::onResponse, e -> {
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.stackTrace(e));
listener.onFailure(e instanceof IndexShardSnapshotFailedException ? (IndexShardSnapshotFailedException) e
Expand Down Expand Up @@ -1084,8 +1084,8 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
snapshotId, shardId), e);
}
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis());
snapshotDoneListener.onResponse(null);
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), indexGeneration);
snapshotDoneListener.onResponse(indexGeneration);
}, snapshotDoneListener::onFailure);
if (indexIncrementalFileCount == 0) {
allFilesUploadedListener.onResponse(Collections.emptyList());
Expand Down Expand Up @@ -1153,7 +1153,7 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId);
return IndexShardSnapshotStatus.newDone(snapshot.startTime(), snapshot.time(),
snapshot.incrementalFileCount(), snapshot.totalFileCount(),
snapshot.incrementalSize(), snapshot.totalSize());
snapshot.incrementalSize(), snapshot.totalSize(), null); // Not adding a real generation here as it doesn't matter to callers
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
if (startedShards == null) {
startedShards = new HashMap<>();
}
startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing());
startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing(shardSnapshotStatus.generation()));
}
}
if (startedShards != null && startedShards.isEmpty() == false) {
Expand Down Expand Up @@ -283,12 +283,15 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map<ShardId, IndexS
assert indexId != null;
snapshot(shardId, snapshot, indexId, snapshotStatus, new ActionListener<>() {
@Override
public void onResponse(final Void aVoid) {
public void onResponse(String newGeneration) {
assert newGeneration != null;
assert newGeneration.equals(snapshotStatus.generation());
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug("snapshot ({}) completed to {} with {}", snapshot, snapshot.getRepository(), lastSnapshotStatus);
logger.debug("snapshot [{}] completed to [{}] with [{}] at generation [{}]",
snapshot, snapshot.getRepository(), lastSnapshotStatus, snapshotStatus.generation());
}
notifySuccessfulSnapshotShard(snapshot, shardId);
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
}

@Override
Expand All @@ -308,7 +311,7 @@ public void onFailure(Exception e) {
* @param snapshotStatus snapshot status
*/
private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId,
final IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
final IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
if (indexShard.routingEntry().primary() == false) {
Expand Down Expand Up @@ -366,7 +369,7 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
// but we think the shard is done - we need to make new master know that the shard is done
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " +
"updating status on the master", snapshot.snapshot(), shardId);
notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId);
notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localShard.getValue().generation());

} else if (stage == Stage.FAILURE) {
// but we think the shard failed - we need to make new master know that the shard failed
Expand Down Expand Up @@ -436,15 +439,16 @@ public String toString() {
}

/** Notify the master node that the given shard has been successfully snapshotted **/
private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) {
private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, String generation) {
assert generation != null;
sendSnapshotShardUpdate(snapshot, shardId,
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS));
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS, generation));
}

/** Notify the master node that the given shard failed to be snapshotted **/
private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) {
sendSnapshotShardUpdate(snapshot, shardId,
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure));
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, null));
}

/** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */
Expand Down
Loading