Skip to content

Commit 615a021

Browse files
committed
Recovery should not indefinitely retry on mapping error (#41099)
A stuck peer recovery in #40913 reveals that we indefinitely retry on new cluster states if indexing translog operations hits a mapper exception. We should not wait and retry if the mapping on the target is at least as recent as the mapping that the primary used to index the operations being replayed. Relates #40913
1 parent 7528329 commit 615a021

File tree

12 files changed

+148
-31
lines changed

12 files changed

+148
-31
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3148,7 +3148,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
31483148
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
31493149
*
31503150
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
3151-
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, ActionListener)
3151+
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, long, ActionListener)
31523152
*/
31533153
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
31543154
assert seqNo != UNASSIGNED_SEQ_NO

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.action.support.ChannelActionListener;
3434
import org.elasticsearch.cluster.ClusterState;
3535
import org.elasticsearch.cluster.ClusterStateObserver;
36+
import org.elasticsearch.cluster.metadata.IndexMetaData;
3637
import org.elasticsearch.cluster.node.DiscoveryNode;
3738
import org.elasticsearch.cluster.service.ClusterService;
3839
import org.elasticsearch.common.Nullable;
@@ -119,8 +120,8 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
119120
RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
120121
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
121122
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
122-
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
123-
new TranslogOperationsRequestHandler());
123+
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, ThreadPool.Names.GENERIC, RecoveryTranslogOperationsRequest::new,
124+
new TranslogOperationsRequestHandler());
124125
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
125126
FinalizeRecoveryRequestHandler());
126127
transportService.registerRequestHandler(
@@ -501,16 +502,21 @@ public void onTimeout(TimeValue timeout) {
501502
}
502503
});
503504
};
505+
final IndexMetaData indexMetaData = clusterService.state().metaData().index(request.shardId().getIndex());
506+
final long mappingVersionOnTarget = indexMetaData != null ? indexMetaData.getMappingVersion() : 0L;
504507
recoveryTarget.indexTranslogOperations(
505508
request.operations(),
506509
request.totalTranslogOps(),
507510
request.maxSeenAutoIdTimestampOnPrimary(),
508511
request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
509512
request.retentionLeases(),
513+
request.mappingVersionOnPrimary(),
510514
ActionListener.wrap(
511515
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
512516
e -> {
513-
if (e instanceof MapperException) {
517+
// do not retry if the mapping on replica is at least as recent as the mapping
518+
// that the primary used to index the operations in the request.
519+
if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
514520
retryOnMappingException.accept(e);
515521
} else {
516522
listener.onFailure(e);

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
219219
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
220220
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
221221
final RetentionLeases retentionLeases = shard.getRetentionLeases();
222+
final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetaData().getMappingVersion();
222223
phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
223-
retentionLeases, sendSnapshotStep);
224+
retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);
224225
sendSnapshotStep.whenComplete(
225226
r -> IOUtils.close(phase2Snapshot),
226227
e -> {
@@ -511,6 +512,7 @@ void phase2(
511512
final long maxSeenAutoIdTimestamp,
512513
final long maxSeqNoOfUpdatesOrDeletes,
513514
final RetentionLeases retentionLeases,
515+
final long mappingVersion,
514516
final ActionListener<SendSnapshotResult> listener) throws IOException {
515517
if (shard.state() == IndexShardState.CLOSED) {
516518
throw new IndexShardClosedException(request.shardId());
@@ -572,6 +574,7 @@ void phase2(
572574
maxSeenAutoIdTimestamp,
573575
maxSeqNoOfUpdatesOrDeletes,
574576
retentionLeases,
577+
mappingVersion,
575578
batchedListener);
576579
}
577580

@@ -583,6 +586,7 @@ private void sendBatch(
583586
final long maxSeenAutoIdTimestamp,
584587
final long maxSeqNoOfUpdatesOrDeletes,
585588
final RetentionLeases retentionLeases,
589+
final long mappingVersionOnPrimary,
586590
final ActionListener<Long> listener) throws IOException {
587591
assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
588592
final List<Translog.Operation> operations = nextBatch.get();
@@ -595,6 +599,7 @@ private void sendBatch(
595599
maxSeenAutoIdTimestamp,
596600
maxSeqNoOfUpdatesOrDeletes,
597601
retentionLeases,
602+
mappingVersionOnPrimary,
598603
ActionListener.wrap(
599604
newCheckpoint -> {
600605
sendBatch(
@@ -605,6 +610,7 @@ private void sendBatch(
605610
maxSeenAutoIdTimestamp,
606611
maxSeqNoOfUpdatesOrDeletes,
607612
retentionLeases,
613+
mappingVersionOnPrimary,
608614
listener);
609615
},
610616
listener::onFailure

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ public void indexTranslogOperations(
320320
final long maxSeenAutoIdTimestampOnPrimary,
321321
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
322322
final RetentionLeases retentionLeases,
323+
final long mappingVersionOnPrimary,
323324
final ActionListener<Long> listener) {
324325
ActionListener.completeWith(listener, () -> {
325326
final RecoveryState.Translog translog = state().getTranslog();
@@ -351,7 +352,7 @@ public void indexTranslogOperations(
351352
throw new MapperException("mapping updates are not allowed [" + operation + "]");
352353
}
353354
if (result.getFailure() != null) {
354-
if (Assertions.ENABLED) {
355+
if (Assertions.ENABLED && result.getFailure() instanceof MapperException == false) {
355356
throw new AssertionError("unexpected failure while replicating translog entry", result.getFailure());
356357
}
357358
ExceptionsHelper.reThrowIfNotNull(result.getFailure());

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ public interface RecoveryTargetHandler {
6565
* the primary shard when capturing these operations. This value is at least as high as the
6666
* max_seq_no_of_updates on the primary was when any of these ops were processed on it.
6767
* @param retentionLeases the retention leases on the primary
68+
* @param mappingVersionOnPrimary the mapping version which is at least as up to date as the mapping version that the
69+
* primary used to index translog {@code operations} in this request.
70+
* If the mapping version on the replica is not older than this version, we should not retry on
71+
* {@link org.elasticsearch.index.mapper.MapperException}; otherwise we should wait for a
72+
* new mapping then retry.
6873
* @param listener a listener which will be notified with the local checkpoint on the target
6974
* after these operations are successfully indexed on the target.
7075
*/
@@ -74,6 +79,7 @@ void indexTranslogOperations(
7479
long maxSeenAutoIdTimestampOnPrimary,
7580
long maxSeqNoOfUpdatesOrDeletesOnPrimary,
7681
RetentionLeases retentionLeases,
82+
long mappingVersionOnPrimary,
7783
ActionListener<Long> listener);
7884

7985
/**

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,14 @@
3434

3535
public class RecoveryTranslogOperationsRequest extends TransportRequest {
3636

37-
private long recoveryId;
38-
private ShardId shardId;
39-
private List<Translog.Operation> operations;
40-
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
41-
private long maxSeenAutoIdTimestampOnPrimary;
42-
private long maxSeqNoOfUpdatesOrDeletesOnPrimary;
43-
private RetentionLeases retentionLeases;
44-
45-
public RecoveryTranslogOperationsRequest() {
46-
}
37+
private final long recoveryId;
38+
private final ShardId shardId;
39+
private final List<Translog.Operation> operations;
40+
private final int totalTranslogOps;
41+
private final long maxSeenAutoIdTimestampOnPrimary;
42+
private final long maxSeqNoOfUpdatesOrDeletesOnPrimary;
43+
private final RetentionLeases retentionLeases;
44+
private final long mappingVersionOnPrimary;
4745

4846
RecoveryTranslogOperationsRequest(
4947
final long recoveryId,
@@ -52,14 +50,16 @@ public RecoveryTranslogOperationsRequest() {
5250
final int totalTranslogOps,
5351
final long maxSeenAutoIdTimestampOnPrimary,
5452
final long maxSeqNoOfUpdatesOrDeletesOnPrimary,
55-
final RetentionLeases retentionLeases) {
53+
final RetentionLeases retentionLeases,
54+
final long mappingVersionOnPrimary) {
5655
this.recoveryId = recoveryId;
5756
this.shardId = shardId;
5857
this.operations = operations;
5958
this.totalTranslogOps = totalTranslogOps;
6059
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
6160
this.maxSeqNoOfUpdatesOrDeletesOnPrimary = maxSeqNoOfUpdatesOrDeletesOnPrimary;
6261
this.retentionLeases = retentionLeases;
62+
this.mappingVersionOnPrimary = mappingVersionOnPrimary;
6363
}
6464

6565
public long recoveryId() {
@@ -90,8 +90,16 @@ public RetentionLeases retentionLeases() {
9090
return retentionLeases;
9191
}
9292

93-
@Override
94-
public void readFrom(StreamInput in) throws IOException {
93+
/**
94+
* Returns the mapping version which is at least as up to date as the mapping version that the primary used to index
95+
* the translog operations in this request. If the mapping version on the replica is not older than this version, we should not
96+
* retry on {@link org.elasticsearch.index.mapper.MapperException}; otherwise we should wait for a new mapping then retry.
97+
*/
98+
long mappingVersionOnPrimary() {
99+
return mappingVersionOnPrimary;
100+
}
101+
102+
RecoveryTranslogOperationsRequest(StreamInput in) throws IOException {
95103
super.readFrom(in);
96104
recoveryId = in.readLong();
97105
shardId = ShardId.readShardId(in);
@@ -113,6 +121,11 @@ public void readFrom(StreamInput in) throws IOException {
113121
} else {
114122
retentionLeases = RetentionLeases.EMPTY;
115123
}
124+
if (in.getVersion().onOrAfter(Version.V_7_1_0)) {
125+
mappingVersionOnPrimary = in.readVLong();
126+
} else {
127+
mappingVersionOnPrimary = Long.MAX_VALUE;
128+
}
116129
}
117130

118131
@Override
@@ -131,5 +144,13 @@ public void writeTo(StreamOutput out) throws IOException {
131144
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
132145
retentionLeases.writeTo(out);
133146
}
147+
if (out.getVersion().onOrAfter(Version.V_7_1_0)) {
148+
out.writeVLong(mappingVersionOnPrimary);
149+
}
150+
}
151+
152+
@Override
153+
public void readFrom(StreamInput in) throws IOException {
154+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
134155
}
135156
}

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public void indexTranslogOperations(
112112
final long maxSeenAutoIdTimestampOnPrimary,
113113
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
114114
final RetentionLeases retentionLeases,
115+
final long mappingVersionOnPrimary,
115116
final ActionListener<Long> listener) {
116117
final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest(
117118
recoveryId,
@@ -120,7 +121,8 @@ public void indexTranslogOperations(
120121
totalTranslogOps,
121122
maxSeenAutoIdTimestampOnPrimary,
122123
maxSeqNoOfDeletesOrUpdatesOnPrimary,
123-
retentionLeases);
124+
retentionLeases,
125+
mappingVersionOnPrimary);
124126
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions,
125127
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> r.localCheckpoint),
126128
RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC));

server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,7 @@ public void indexTranslogOperations(
564564
final long maxAutoIdTimestamp,
565565
final long maxSeqNoOfUpdates,
566566
final RetentionLeases retentionLeases,
567+
final long mappingVersion,
567568
final ActionListener<Long> listener) {
568569
// index a doc which is not part of the snapshot, but also does not complete on replica
569570
replicaEngineFactory.latchIndexers(1);
@@ -597,6 +598,7 @@ public void indexTranslogOperations(
597598
maxAutoIdTimestamp,
598599
maxSeqNoOfUpdates,
599600
retentionLeases,
601+
mappingVersion,
600602
listener);
601603
}
602604
});
@@ -845,11 +847,13 @@ public void indexTranslogOperations(
845847
final long maxAutoIdTimestamp,
846848
final long maxSeqNoOfUpdates,
847849
final RetentionLeases retentionLeases,
850+
final long mappingVersion,
848851
final ActionListener<Long> listener) {
849852
if (hasBlocked() == false) {
850853
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
851854
}
852-
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, retentionLeases, listener);
855+
super.indexTranslogOperations(
856+
operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, retentionLeases, mappingVersion, listener);
853857
}
854858

855859
@Override

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2472,13 +2472,15 @@ public void indexTranslogOperations(
24722472
final long maxSeenAutoIdTimestamp,
24732473
final long maxSeqNoOfUpdatesOrDeletes,
24742474
final RetentionLeases retentionLeases,
2475+
final long mappingVersion,
24752476
final ActionListener<Long> listener){
24762477
super.indexTranslogOperations(
24772478
operations,
24782479
totalTranslogOps,
24792480
maxSeenAutoIdTimestamp,
24802481
maxSeqNoOfUpdatesOrDeletes,
24812482
retentionLeases,
2483+
mappingVersion,
24822484
ActionListener.wrap(
24832485
r -> {
24842486
assertFalse(replica.isSyncNeeded());
@@ -2594,13 +2596,15 @@ public void indexTranslogOperations(
25942596
final long maxAutoIdTimestamp,
25952597
final long maxSeqNoOfUpdatesOrDeletes,
25962598
final RetentionLeases retentionLeases,
2599+
final long mappingVersion,
25972600
final ActionListener<Long> listener){
25982601
super.indexTranslogOperations(
25992602
operations,
26002603
totalTranslogOps,
26012604
maxAutoIdTimestamp,
26022605
maxSeqNoOfUpdatesOrDeletes,
26032606
retentionLeases,
2607+
mappingVersion,
26042608
ActionListener.wrap(
26052609
checkpoint -> {
26062610
listener.onResponse(checkpoint);
@@ -2659,13 +2663,15 @@ public void indexTranslogOperations(
26592663
final long maxAutoIdTimestamp,
26602664
final long maxSeqNoOfUpdatesOrDeletes,
26612665
final RetentionLeases retentionLeases,
2666+
final long mappingVersion,
26622667
final ActionListener<Long> listener) {
26632668
super.indexTranslogOperations(
26642669
operations,
26652670
totalTranslogOps,
26662671
maxAutoIdTimestamp,
26672672
maxSeqNoOfUpdatesOrDeletes,
26682673
retentionLeases,
2674+
mappingVersion,
26692675
ActionListener.wrap(
26702676
r -> {
26712677
assertListenerCalled.accept(replica);

0 commit comments

Comments
 (0)