Skip to content

Commit 29c5391

Browse files
Fix Snapshot Corruption in Edge Case (#47552) (#47636)
This fixes missing to marking shard snapshots as failures when multiple data-nodes are lost during the snapshot process or shard snapshot failures have occured before a node left the cluster. The problem was that we were simply not adding any shard entries for completed shards on node-left events. This has no effect for a successful shard, but for a failed shard would lead to that shard not being marked as failed during snapshot finalization. Fixed by corectly keeping track of all previous completed shard states as well in this case. Also, added an assertion that without this fix would trip on almost every run of the resiliency tests and adjusted the serialization of SnapshotsInProgress.Entry so we have a proper assertion message. Closes #47550
1 parent e0c5fa3 commit 29c5391

File tree

4 files changed

+120
-21
lines changed

4 files changed

+120
-21
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -121,22 +121,24 @@ protected void masterOperation(final SnapshotsStatusRequest request,
121121
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()]))
122122
.snapshots(snapshots).timeout(request.masterNodeTimeout());
123123
transportNodesSnapshotsStatus.execute(nodesRequest, new ActionListener<TransportNodesSnapshotsStatus.NodesSnapshotStatus>() {
124-
@Override
125-
public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
126-
try {
127-
List<SnapshotsInProgress.Entry> currentSnapshots =
128-
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
129-
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
130-
} catch (Exception e) {
131-
listener.onFailure(e);
132-
}
133-
}
134-
135-
@Override
136-
public void onFailure(Exception e) {
124+
@Override
125+
public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
126+
threadPool.generic().execute(() -> {
127+
try {
128+
List<SnapshotsInProgress.Entry> currentSnapshots =
129+
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
130+
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
131+
} catch (Exception e) {
137132
listener.onFailure(e);
138133
}
139134
});
135+
}
136+
137+
@Override
138+
public void onFailure(Exception e) {
139+
listener.onFailure(e);
140+
}
141+
});
140142
} else {
141143
// We don't have any in-progress shards, just return current stats
142144
listener.onResponse(buildResponse(request, currentSnapshots, null));

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.Version;
2626
import org.elasticsearch.cluster.ClusterState.Custom;
2727
import org.elasticsearch.common.Nullable;
28+
import org.elasticsearch.common.Strings;
2829
import org.elasticsearch.common.collect.ImmutableOpenMap;
2930
import org.elasticsearch.common.io.stream.StreamInput;
3031
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -84,7 +85,7 @@ public String toString() {
8485
return builder.append("]").toString();
8586
}
8687

87-
public static class Entry {
88+
public static class Entry implements ToXContent {
8889
private final State state;
8990
private final Snapshot snapshot;
9091
private final boolean includeGlobalState;
@@ -211,7 +212,50 @@ public int hashCode() {
211212

212213
@Override
213214
public String toString() {
214-
return snapshot.toString();
215+
return Strings.toString(this);
216+
}
217+
218+
@Override
219+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
220+
builder.startObject();
221+
builder.field(REPOSITORY, snapshot.getRepository());
222+
builder.field(SNAPSHOT, snapshot.getSnapshotId().getName());
223+
builder.field(UUID, snapshot.getSnapshotId().getUUID());
224+
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState());
225+
builder.field(PARTIAL, partial);
226+
builder.field(STATE, state);
227+
builder.startArray(INDICES);
228+
{
229+
for (IndexId index : indices) {
230+
index.toXContent(builder, params);
231+
}
232+
}
233+
builder.endArray();
234+
builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(startTime));
235+
builder.field(REPOSITORY_STATE_ID, repositoryStateId);
236+
builder.startArray(SHARDS);
237+
{
238+
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : shards) {
239+
ShardId shardId = shardEntry.key;
240+
ShardSnapshotStatus status = shardEntry.value;
241+
builder.startObject();
242+
{
243+
builder.field(INDEX, shardId.getIndex());
244+
builder.field(SHARD, shardId.getId());
245+
builder.field(STATE, status.state());
246+
builder.field(NODE, status.nodeId());
247+
}
248+
builder.endObject();
249+
}
250+
}
251+
builder.endArray();
252+
builder.endObject();
253+
return builder;
254+
}
255+
256+
@Override
257+
public boolean isFragment() {
258+
return false;
215259
}
216260

217261
// package private for testing
@@ -527,7 +571,7 @@ public void writeTo(StreamOutput out) throws IOException {
527571
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
528572
builder.startArray(SNAPSHOTS);
529573
for (Entry entry : entries) {
530-
toXContent(entry, builder, params);
574+
entry.toXContent(builder, params);
531575
}
532576
builder.endArray();
533577
return builder;

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -765,18 +765,20 @@ public ClusterState execute(ClusterState currentState) {
765765
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
766766
boolean snapshotChanged = false;
767767
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards()) {
768-
ShardSnapshotStatus shardStatus = shardEntry.value;
768+
final ShardSnapshotStatus shardStatus = shardEntry.value;
769+
final ShardId shardId = shardEntry.key;
769770
if (!shardStatus.state().completed() && shardStatus.nodeId() != null) {
770771
if (nodes.nodeExists(shardStatus.nodeId())) {
771-
shards.put(shardEntry.key, shardEntry.value);
772+
shards.put(shardId, shardStatus);
772773
} else {
773774
// TODO: Restart snapshot on another node?
774775
snapshotChanged = true;
775776
logger.warn("failing snapshot of shard [{}] on closed node [{}]",
776-
shardEntry.key, shardStatus.nodeId());
777-
shards.put(shardEntry.key,
778-
new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
777+
shardId, shardStatus.nodeId());
778+
shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
779779
}
780+
} else {
781+
shards.put(shardId, shardStatus);
780782
}
781783
}
782784
if (snapshotChanged) {
@@ -808,6 +810,8 @@ public void onFailure(Exception e) {
808810
}
809811
}, updatedSnapshot.getRepositoryStateId(), false);
810812
}
813+
assert updatedSnapshot.shards().size() == snapshot.shards().size()
814+
: "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]";
811815
}
812816
if (changed) {
813817
return ClusterState.builder(currentState)

server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1229,6 +1229,55 @@ public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
12291229
}, 60L, TimeUnit.SECONDS);
12301230
}
12311231

1232+
public void testDataNodeRestartAfterShardSnapshotFailure() throws Exception {
1233+
logger.info("--> starting a master node and two data nodes");
1234+
internalCluster().startMasterOnlyNode();
1235+
final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
1236+
logger.info("--> creating repository");
1237+
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
1238+
.setType("mock").setSettings(Settings.builder()
1239+
.put("location", randomRepoPath())
1240+
.put("compress", randomBoolean())
1241+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
1242+
assertAcked(prepareCreate("test-idx", 0, Settings.builder()
1243+
.put("number_of_shards", 2).put("number_of_replicas", 0)));
1244+
ensureGreen();
1245+
logger.info("--> indexing some data");
1246+
final int numdocs = randomIntBetween(50, 100);
1247+
IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
1248+
for (int i = 0; i < builders.length; i++) {
1249+
builders[i] = client().prepareIndex("test-idx", "type1",
1250+
Integer.toString(i)).setSource("field1", "bar " + i);
1251+
}
1252+
indexRandom(true, builders);
1253+
flushAndRefresh();
1254+
blockAllDataNodes("test-repo");
1255+
logger.info("--> snapshot");
1256+
client(internalCluster().getMasterName()).admin().cluster()
1257+
.prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
1258+
logger.info("--> restarting first data node, which should cause the primary shard on it to be failed");
1259+
internalCluster().restartNode(dataNodes.get(0), InternalTestCluster.EMPTY_CALLBACK);
1260+
1261+
logger.info("--> wait for shard snapshot of first primary to show as failed");
1262+
assertBusy(() -> assertThat(
1263+
client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots()
1264+
.get(0).getShardsStats().getFailedShards(), is(1)), 60L, TimeUnit.SECONDS);
1265+
1266+
logger.info("--> restarting second data node, which should cause the primary shard on it to be failed");
1267+
internalCluster().restartNode(dataNodes.get(1), InternalTestCluster.EMPTY_CALLBACK);
1268+
1269+
// check that snapshot completes with both failed shards being accounted for in the snapshot result
1270+
assertBusy(() -> {
1271+
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
1272+
.prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get();
1273+
assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
1274+
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
1275+
assertTrue(snapshotInfo.state().toString(), snapshotInfo.state().completed());
1276+
assertThat(snapshotInfo.totalShards(), is(2));
1277+
assertThat(snapshotInfo.shardFailures(), hasSize(2));
1278+
}, 60L, TimeUnit.SECONDS);
1279+
}
1280+
12321281
private long calculateTotalFilesSize(List<Path> files) {
12331282
return files.stream().mapToLong(f -> {
12341283
try {

0 commit comments

Comments
 (0)