Skip to content

Commit 007ffd5

Browse files
committed
Peer recovery should not indefinitely wait for mapping
1 parent ccbed3d commit 007ffd5

File tree

11 files changed

+135
-33
lines changed

11 files changed

+135
-33
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -26,13 +26,15 @@
2626
import org.apache.lucene.index.IndexCommit;
2727
import org.apache.lucene.store.AlreadyClosedException;
2828
import org.apache.lucene.store.RateLimiter;
29+
import org.elasticsearch.Assertions;
2930
import org.elasticsearch.ElasticsearchException;
3031
import org.elasticsearch.ElasticsearchTimeoutException;
3132
import org.elasticsearch.ExceptionsHelper;
3233
import org.elasticsearch.action.ActionListener;
3334
import org.elasticsearch.action.support.ChannelActionListener;
3435
import org.elasticsearch.cluster.ClusterState;
3536
import org.elasticsearch.cluster.ClusterStateObserver;
37+
import org.elasticsearch.cluster.metadata.IndexMetaData;
3638
import org.elasticsearch.cluster.node.DiscoveryNode;
3739
import org.elasticsearch.cluster.service.ClusterService;
3840
import org.elasticsearch.common.Nullable;
@@ -119,8 +121,8 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
119121
RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
120122
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
121123
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
122-
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
123-
new TranslogOperationsRequestHandler());
124+
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, ThreadPool.Names.GENERIC, RecoveryTranslogOperationsRequest::new,
125+
new TranslogOperationsRequestHandler());
124126
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
125127
FinalizeRecoveryRequestHandler());
126128
transportService.registerRequestHandler(
@@ -501,16 +503,24 @@ public void onTimeout(TimeValue timeout) {
501503
}
502504
});
503505
};
506+
final IndexMetaData indexMetaData = clusterService.state().metaData().index(request.shardId().getIndex());
507+
final long mappingVersionOnTarget = indexMetaData != null ? indexMetaData.getMappingVersion() : 0L;
504508
recoveryTarget.indexTranslogOperations(
505509
request.operations(),
506510
request.totalTranslogOps(),
507511
request.maxSeenAutoIdTimestampOnPrimary(),
508512
request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
509513
request.retentionLeases(),
514+
request.mappingVersion(),
510515
ActionListener.wrap(
511516
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
512517
e -> {
513-
if (e instanceof MapperException) {
518+
if (Assertions.ENABLED) {
519+
if (e instanceof MapperException == false) {
520+
throw new AssertionError("unexpected failure while replicating translog entry", e);
521+
}
522+
}
523+
if (mappingVersionOnTarget < request.mappingVersion() && e instanceof MapperException) {
514524
retryOnMappingException.accept(e);
515525
} else {
516526
listener.onFailure(e);

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -219,8 +219,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
219219
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
220220
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
221221
final RetentionLeases retentionLeases = shard.getRetentionLeases();
222+
final long mappingVersion = shard.indexSettings().getIndexMetaData().getMappingVersion();
222223
phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
223-
retentionLeases, sendSnapshotStep);
224+
retentionLeases, mappingVersion, sendSnapshotStep);
224225
sendSnapshotStep.whenComplete(
225226
r -> IOUtils.close(phase2Snapshot),
226227
e -> {
@@ -511,6 +512,7 @@ void phase2(
511512
final long maxSeenAutoIdTimestamp,
512513
final long maxSeqNoOfUpdatesOrDeletes,
513514
final RetentionLeases retentionLeases,
515+
final long mappingVersion,
514516
final ActionListener<SendSnapshotResult> listener) throws IOException {
515517
if (shard.state() == IndexShardState.CLOSED) {
516518
throw new IndexShardClosedException(request.shardId());
@@ -572,6 +574,7 @@ void phase2(
572574
maxSeenAutoIdTimestamp,
573575
maxSeqNoOfUpdatesOrDeletes,
574576
retentionLeases,
577+
mappingVersion,
575578
batchedListener);
576579
}
577580

@@ -583,6 +586,7 @@ private void sendBatch(
583586
final long maxSeenAutoIdTimestamp,
584587
final long maxSeqNoOfUpdatesOrDeletes,
585588
final RetentionLeases retentionLeases,
589+
final long mappingVersion,
586590
final ActionListener<Long> listener) throws IOException {
587591
assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
588592
final List<Translog.Operation> operations = nextBatch.get();
@@ -595,6 +599,7 @@ private void sendBatch(
595599
maxSeenAutoIdTimestamp,
596600
maxSeqNoOfUpdatesOrDeletes,
597601
retentionLeases,
602+
mappingVersion,
598603
ActionListener.wrap(
599604
newCheckpoint -> {
600605
sendBatch(
@@ -605,6 +610,7 @@ private void sendBatch(
605610
maxSeenAutoIdTimestamp,
606611
maxSeqNoOfUpdatesOrDeletes,
607612
retentionLeases,
613+
mappingVersion,
608614
listener);
609615
},
610616
listener::onFailure

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,6 @@
2323
import org.apache.lucene.index.CorruptIndexException;
2424
import org.apache.lucene.index.IndexFormatTooNewException;
2525
import org.apache.lucene.index.IndexFormatTooOldException;
26-
import org.elasticsearch.Assertions;
2726
import org.elasticsearch.ElasticsearchException;
2827
import org.elasticsearch.ExceptionsHelper;
2928
import org.elasticsearch.Version;
@@ -320,6 +319,7 @@ public void indexTranslogOperations(
320319
final long maxSeenAutoIdTimestampOnPrimary,
321320
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
322321
final RetentionLeases retentionLeases,
322+
final long mappingVersion,
323323
final ActionListener<Long> listener) {
324324
ActionListener.completeWith(listener, () -> {
325325
final RecoveryState.Translog translog = state().getTranslog();
@@ -351,9 +351,6 @@ public void indexTranslogOperations(
351351
throw new MapperException("mapping updates are not allowed [" + operation + "]");
352352
}
353353
if (result.getFailure() != null) {
354-
if (Assertions.ENABLED) {
355-
throw new AssertionError("unexpected failure while replicating translog entry", result.getFailure());
356-
}
357354
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
358355
}
359356
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,7 @@ void indexTranslogOperations(
7474
long maxSeenAutoIdTimestampOnPrimary,
7575
long maxSeqNoOfUpdatesOrDeletesOnPrimary,
7676
RetentionLeases retentionLeases,
77+
long mappingVersion,
7778
ActionListener<Long> listener);
7879

7980
/**

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java

Lines changed: 25 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.indices.recovery;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
2425
import org.elasticsearch.index.seqno.RetentionLeases;
@@ -31,16 +32,14 @@
3132

3233
public class RecoveryTranslogOperationsRequest extends TransportRequest {
3334

34-
private long recoveryId;
35-
private ShardId shardId;
36-
private List<Translog.Operation> operations;
37-
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
38-
private long maxSeenAutoIdTimestampOnPrimary;
39-
private long maxSeqNoOfUpdatesOrDeletesOnPrimary;
40-
private RetentionLeases retentionLeases;
41-
42-
public RecoveryTranslogOperationsRequest() {
43-
}
35+
private final long recoveryId;
36+
private final ShardId shardId;
37+
private final List<Translog.Operation> operations;
38+
private final int totalTranslogOps;
39+
private final long maxSeenAutoIdTimestampOnPrimary;
40+
private final long maxSeqNoOfUpdatesOrDeletesOnPrimary;
41+
private final RetentionLeases retentionLeases;
42+
private final long mappingVersion;
4443

4544
RecoveryTranslogOperationsRequest(
4645
final long recoveryId,
@@ -49,14 +48,16 @@ public RecoveryTranslogOperationsRequest() {
4948
final int totalTranslogOps,
5049
final long maxSeenAutoIdTimestampOnPrimary,
5150
final long maxSeqNoOfUpdatesOrDeletesOnPrimary,
52-
final RetentionLeases retentionLeases) {
51+
final RetentionLeases retentionLeases,
52+
final long mappingVersion) {
5353
this.recoveryId = recoveryId;
5454
this.shardId = shardId;
5555
this.operations = operations;
5656
this.totalTranslogOps = totalTranslogOps;
5757
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
5858
this.maxSeqNoOfUpdatesOrDeletesOnPrimary = maxSeqNoOfUpdatesOrDeletesOnPrimary;
5959
this.retentionLeases = retentionLeases;
60+
this.mappingVersion = mappingVersion;
6061
}
6162

6263
public long recoveryId() {
@@ -87,8 +88,11 @@ public RetentionLeases retentionLeases() {
8788
return retentionLeases;
8889
}
8990

90-
@Override
91-
public void readFrom(StreamInput in) throws IOException {
91+
public long mappingVersion() {
92+
return mappingVersion;
93+
}
94+
95+
RecoveryTranslogOperationsRequest(StreamInput in) throws IOException {
9296
super.readFrom(in);
9397
recoveryId = in.readLong();
9498
shardId = ShardId.readShardId(in);
@@ -97,6 +101,11 @@ public void readFrom(StreamInput in) throws IOException {
97101
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
98102
maxSeqNoOfUpdatesOrDeletesOnPrimary = in.readZLong();
99103
retentionLeases = new RetentionLeases(in);
104+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
105+
mappingVersion = in.readVLong();
106+
} else {
107+
mappingVersion = Long.MAX_VALUE;
108+
}
100109
}
101110

102111
@Override
@@ -109,5 +118,8 @@ public void writeTo(StreamOutput out) throws IOException {
109118
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
110119
out.writeZLong(maxSeqNoOfUpdatesOrDeletesOnPrimary);
111120
retentionLeases.writeTo(out);
121+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
122+
out.writeVLong(mappingVersion);
123+
}
112124
}
113125
}

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -112,6 +112,7 @@ public void indexTranslogOperations(
112112
final long maxSeenAutoIdTimestampOnPrimary,
113113
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
114114
final RetentionLeases retentionLeases,
115+
final long mappingVersion,
115116
final ActionListener<Long> listener) {
116117
final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest(
117118
recoveryId,
@@ -120,7 +121,8 @@ public void indexTranslogOperations(
120121
totalTranslogOps,
121122
maxSeenAutoIdTimestampOnPrimary,
122123
maxSeqNoOfDeletesOrUpdatesOnPrimary,
123-
retentionLeases);
124+
retentionLeases,
125+
mappingVersion);
124126
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions,
125127
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> r.localCheckpoint),
126128
RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC));

server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -564,6 +564,7 @@ public void indexTranslogOperations(
564564
final long maxAutoIdTimestamp,
565565
final long maxSeqNoOfUpdates,
566566
final RetentionLeases retentionLeases,
567+
final long mappingVersion,
567568
final ActionListener<Long> listener) {
568569
// index a doc which is not part of the snapshot, but also does not complete on replica
569570
replicaEngineFactory.latchIndexers(1);
@@ -597,6 +598,7 @@ public void indexTranslogOperations(
597598
maxAutoIdTimestamp,
598599
maxSeqNoOfUpdates,
599600
retentionLeases,
601+
mappingVersion,
600602
listener);
601603
}
602604
});
@@ -845,11 +847,13 @@ public void indexTranslogOperations(
845847
final long maxAutoIdTimestamp,
846848
final long maxSeqNoOfUpdates,
847849
final RetentionLeases retentionLeases,
850+
final long mappingVersion,
848851
final ActionListener<Long> listener) {
849852
if (hasBlocked() == false) {
850853
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
851854
}
852-
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, retentionLeases, listener);
855+
super.indexTranslogOperations(
856+
operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, retentionLeases, mappingVersion, listener);
853857
}
854858

855859
@Override

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2469,13 +2469,15 @@ public void indexTranslogOperations(
24692469
final long maxSeenAutoIdTimestamp,
24702470
final long maxSeqNoOfUpdatesOrDeletes,
24712471
final RetentionLeases retentionLeases,
2472+
final long mappingVersion,
24722473
final ActionListener<Long> listener){
24732474
super.indexTranslogOperations(
24742475
operations,
24752476
totalTranslogOps,
24762477
maxSeenAutoIdTimestamp,
24772478
maxSeqNoOfUpdatesOrDeletes,
24782479
retentionLeases,
2480+
mappingVersion,
24792481
ActionListener.wrap(
24802482
r -> {
24812483
assertFalse(replica.isSyncNeeded());
@@ -2591,13 +2593,15 @@ public void indexTranslogOperations(
25912593
final long maxAutoIdTimestamp,
25922594
final long maxSeqNoOfUpdatesOrDeletes,
25932595
final RetentionLeases retentionLeases,
2596+
final long mappingVersion,
25942597
final ActionListener<Long> listener){
25952598
super.indexTranslogOperations(
25962599
operations,
25972600
totalTranslogOps,
25982601
maxAutoIdTimestamp,
25992602
maxSeqNoOfUpdatesOrDeletes,
26002603
retentionLeases,
2604+
mappingVersion,
26012605
ActionListener.wrap(
26022606
checkpoint -> {
26032607
listener.onResponse(checkpoint);
@@ -2656,13 +2660,15 @@ public void indexTranslogOperations(
26562660
final long maxAutoIdTimestamp,
26572661
final long maxSeqNoOfUpdatesOrDeletes,
26582662
final RetentionLeases retentionLeases,
2663+
final long mappingVersion,
26592664
final ActionListener<Long> listener) {
26602665
super.indexTranslogOperations(
26612666
operations,
26622667
totalTranslogOps,
26632668
maxAutoIdTimestamp,
26642669
maxSeqNoOfUpdatesOrDeletes,
26652670
retentionLeases,
2671+
mappingVersion,
26662672
ActionListener.wrap(
26672673
r -> {
26682674
assertListenerCalled.accept(replica);

0 commit comments

Comments
 (0)