Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -200,7 +201,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
l -> blobStoreRepository.cleanup(
repositoryStateId, ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
repositoryStateId,
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
}

private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotsService;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -90,12 +91,14 @@ public static class Entry implements ToXContent {
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;
private final long repositoryStateId;
// see #useShardGenerations
private final boolean useShardGenerations;
@Nullable private final Map<String, Object> userMetadata;
@Nullable private final String failure;

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
String failure, Map<String, Object> userMetadata) {
String failure, Map<String, Object> userMetadata, boolean useShardGenerations) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
Expand All @@ -113,6 +116,7 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
this.repositoryStateId = repositoryStateId;
this.failure = failure;
this.userMetadata = userMetadata;
this.useShardGenerations = useShardGenerations;
}

private static boolean assertShardsConsistent(State state, List<IndexId> indices,
Expand All @@ -127,20 +131,22 @@ private static boolean assertShardsConsistent(State state, List<IndexId> indices
: "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
return true;
}

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata) {
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata);
Map<String, Object> userMetadata, boolean useShardGenerations) {
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata,
useShardGenerations);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, entry.failure, entry.userMetadata);
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, failure, entry.userMetadata);
entry.repositoryStateId, shards, failure, entry.userMetadata, entry.useShardGenerations);
}

public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
Expand Down Expand Up @@ -191,6 +197,16 @@ public String failure() {
return failure;
}

/**
* Whether to write to the repository in a format only understood by versions newer than
* {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
*
* @return true if writing to repository in new format
*/
public boolean useShardGenerations() {
return useShardGenerations;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -206,6 +222,7 @@ public boolean equals(Object o) {
if (!snapshot.equals(entry.snapshot)) return false;
if (state != entry.state) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
if (useShardGenerations != entry.useShardGenerations) return false;

return true;
}
Expand All @@ -220,6 +237,7 @@ public int hashCode() {
result = 31 * result + indices.hashCode();
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
result = 31 * result + (useShardGenerations ? 1 : 0);
return result;
}

Expand Down Expand Up @@ -307,27 +325,39 @@ public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {
public static class ShardSnapshotStatus {
private final ShardState state;
private final String nodeId;

@Nullable
private final String generation;

@Nullable
private final String reason;

public ShardSnapshotStatus(String nodeId) {
this(nodeId, ShardState.INIT);
public ShardSnapshotStatus(String nodeId, String generation) {
this(nodeId, ShardState.INIT, generation);
}

public ShardSnapshotStatus(String nodeId, ShardState state) {
this(nodeId, state, null);
public ShardSnapshotStatus(String nodeId, ShardState state, String generation) {
this(nodeId, state, null, generation);
}

public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
public ShardSnapshotStatus(String nodeId, ShardState state, String reason, String generation) {
this.nodeId = nodeId;
this.state = state;
this.reason = reason;
this.generation = generation;
// If the state is failed we have to have a reason for this failure
assert state.failed() == false || reason != null;
}

public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = ShardState.fromValue(in.readByte());
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
generation = in.readOptionalString();
assert generation != null || state != ShardState.SUCCESS : "Received null generation for shard state [" + state + "]";
} else {
generation = null;
}
reason = in.readOptionalString();
}

Expand All @@ -339,13 +369,20 @@ public String nodeId() {
return nodeId;
}

public String generation() {
return this.generation;
}

public String reason() {
return reason;
}

public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeOptionalString(generation);
}
out.writeOptionalString(reason);
}

Expand All @@ -354,21 +391,22 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardSnapshotStatus status = (ShardSnapshotStatus) o;
return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;

return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason)
&& Objects.equals(generation, status.generation) && state == status.state;
}

@Override
public int hashCode() {
int result = state != null ? state.hashCode() : 0;
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
result = 31 * result + (reason != null ? reason.hashCode() : 0);
result = 31 * result + (generation != null ? generation.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + ", generation=" + generation + "]";
}
}

Expand Down Expand Up @@ -497,6 +535,12 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
userMetadata = in.readMap();
}
final boolean useShardGenerations;
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
useShardGenerations = in.readBoolean();
} else {
useShardGenerations = false;
}
entries[i] = new Entry(snapshot,
includeGlobalState,
partial,
Expand All @@ -506,7 +550,8 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
repositoryStateId,
builder.build(),
failure,
userMetadata
userMetadata,
useShardGenerations
);
}
this.entries = Arrays.asList(entries);
Expand Down Expand Up @@ -542,6 +587,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
out.writeMap(entry.userMetadata);
}
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeBoolean(entry.useShardGenerations);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public enum Stage {
}

private final AtomicReference<Stage> stage;
private final AtomicReference<String> generation;
private long startTime;
private long totalTime;
private int incrementalFileCount;
Expand All @@ -71,9 +72,10 @@ public enum Stage {

private IndexShardSnapshotStatus(final Stage stage, final long startTime, final long totalTime,
final int incrementalFileCount, final int totalFileCount, final int processedFileCount,
final long incrementalSize, final long totalSize, final long processedSize,
final long indexVersion, final String failure) {
final long incrementalSize, final long totalSize, final long processedSize, final String failure,
final String generation) {
this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
this.generation = new AtomicReference<>(generation);
this.startTime = startTime;
this.totalTime = totalTime;
this.incrementalFileCount = incrementalFileCount;
Expand All @@ -82,7 +84,6 @@ private IndexShardSnapshotStatus(final Stage stage, final long startTime, final
this.totalSize = totalSize;
this.processedSize = processedSize;
this.incrementalSize = incrementalSize;
this.indexVersion = indexVersion;
this.failure = failure;
}

Expand Down Expand Up @@ -111,9 +112,11 @@ public synchronized Copy moveToFinalize(final long indexVersion) {
return asCopy();
}

public synchronized void moveToDone(final long endTime) {
public synchronized void moveToDone(final long endTime, final String newGeneration) {
assert newGeneration != null;
if (stage.compareAndSet(Stage.FINALIZE, Stage.DONE)) {
this.totalTime = Math.max(0L, endTime - startTime);
this.generation.set(newGeneration);
} else {
throw new IllegalStateException("Unable to move the shard snapshot status to [DONE]: " +
"expecting [FINALIZE] but got [" + stage.get() + "]");
Expand All @@ -133,6 +136,10 @@ public synchronized void moveToFailed(final long endTime, final String failure)
}
}

public String generation() {
return generation.get();
}

public boolean isAborted() {
return stage.get() == Stage.ABORTED;
}
Expand All @@ -158,24 +165,24 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() {
indexVersion, failure);
}

public static IndexShardSnapshotStatus newInitializing() {
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, 0, null);
public static IndexShardSnapshotStatus newInitializing(String generation) {
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation);
}

public static IndexShardSnapshotStatus newFailed(final String failure) {
assert failure != null : "expecting non null failure for a failed IndexShardSnapshotStatus";
if (failure == null) {
throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
}
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, 0, failure);
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null);
}

public static IndexShardSnapshotStatus newDone(final long startTime, final long totalTime,
final int incrementalFileCount, final int fileCount,
final long incrementalSize, final long size) {
final long incrementalSize, final long size, String generation) {
// The snapshot is done which means the number of processed files is the same as total
return new IndexShardSnapshotStatus(Stage.DONE, startTime, totalTime, incrementalFileCount, fileCount, incrementalFileCount,
incrementalSize, size, incrementalSize, 0, null);
incrementalSize, size, incrementalSize, null, generation);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,17 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
}

@Override
public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
MetaData metaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener) {
in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metaData, userMetadata, listener);
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metaData, userMetadata, writeShardGens, listener);
}

@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
in.deleteSnapshot(snapshotId, repositoryStateId, listener);
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
in.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
}

@Override
Expand Down Expand Up @@ -122,8 +123,9 @@ public boolean isReadOnly() {

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
ActionListener<String> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
Expand Down
Loading