Skip to content

Commit 4a537ef

Browse files
authored
Bulk operation fail to replicate operations when a mapping update times out (#30244)
Starting with the refactoring in #22778 (released in 5.3) we may fail to properly replicate operation when a mapping update on master fails. If a bulk operations needs a mapping update half way, it will send a request to the master before continuing to index the operations. If that request times out or isn't acked (i.e., even one node in the cluster didn't process it within 30s), we end up throwing the exception and aborting the entire bulk. This is a problem because all operations that were processed so far are not replicated any more to the replicas. Although these operations were never "acked" to the user (we threw an error) it cause the local checkpoint on the replicas to lag (on 6.x) and the primary and replica to diverge. This PR does a couple of things: 1) Most importantly, treat *any* mapping update failure as a document level failure, meaning only the relevant indexing operation will fail. 2) Removes the mapping update callbacks from `IndexShard.applyIndexOperationOnPrimary` and similar methods for simpler execution. We don't use exceptions any more when a mapping update was successful. I think we need to do more work here (the fact that a single slow node can prevent those mappings updates from being acked and thus fail operations is bad), but I want to keep this as small as I can (it is already too big).
1 parent 65e5868 commit 4a537ef

28 files changed

+662
-469
lines changed

server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,4 @@ public interface MappingUpdatePerformer {
2929
*/
3030
void updateMappings(Mapping update, ShardId shardId, String type);
3131

32-
/**
33-
* Throws a {@code ReplicationOperation.RetryOnPrimaryException} if the operation needs to be
34-
* retried on the primary due to the mappings not being present yet, or a different exception if
35-
* updating the mappings on the master failed.
36-
*/
37-
void verifyMappings(Mapping update, ShardId shardId);
38-
3932
}

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 88 additions & 61 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,11 @@ public static Translog.Location performOnReplica(ResyncReplicationRequest reques
122122
Translog.Location location = null;
123123
for (Translog.Operation operation : request.getOperations()) {
124124
try {
125-
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA,
126-
update -> {
127-
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
128-
"Mappings are not available on the replica yet, triggered update: " + update);
129-
});
125+
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
126+
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
127+
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
128+
"Mappings are not available on the replica yet, triggered update: " + operationResult.getRequiredMappingUpdate());
129+
}
130130
location = syncOperationResultOrThrow(operationResult, location);
131131
} catch (Exception e) {
132132
// if its not a failure to be ignored, let it bubble up

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,17 @@
3333
import org.elasticsearch.cluster.service.ClusterService;
3434
import org.elasticsearch.common.Nullable;
3535
import org.elasticsearch.common.settings.Settings;
36-
import org.elasticsearch.index.VersionType;
3736
import org.elasticsearch.index.engine.Engine;
3837
import org.elasticsearch.index.mapper.MapperParsingException;
39-
import org.elasticsearch.index.mapper.Mapping;
40-
import org.elasticsearch.index.mapper.SourceToParse;
4138
import org.elasticsearch.index.shard.IndexShard;
4239
import org.elasticsearch.index.shard.ShardId;
4340
import org.elasticsearch.index.translog.Translog;
4441
import org.elasticsearch.index.translog.Translog.Location;
4542
import org.elasticsearch.indices.IndicesService;
46-
import org.elasticsearch.node.NodeClosedException;
4743
import org.elasticsearch.threadpool.ThreadPool;
48-
import org.elasticsearch.transport.TransportException;
4944
import org.elasticsearch.transport.TransportResponse;
5045
import org.elasticsearch.transport.TransportService;
5146

52-
import java.io.IOException;
5347
import java.util.concurrent.atomic.AtomicBoolean;
5448
import java.util.concurrent.atomic.AtomicInteger;
5549
import java.util.concurrent.atomic.AtomicReference;
@@ -78,7 +72,7 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
7872
protected static Location syncOperationResultOrThrow(final Engine.Result operationResult,
7973
final Location currentLocation) throws Exception {
8074
final Location location;
81-
if (operationResult.hasFailure()) {
75+
if (operationResult.getFailure() != null) {
8276
// check if any transient write operation failures should be bubbled up
8377
Exception failure = operationResult.getFailure();
8478
assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;

server/src/main/java/org/elasticsearch/index/IndexingSlowLog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ private void setReformat(boolean reformat) {
144144

145145
@Override
146146
public void postIndex(ShardId shardId, Engine.Index indexOperation, Engine.IndexResult result) {
147-
if (result.hasFailure() == false) {
147+
if (result.getResultType() == Engine.Result.Type.SUCCESS) {
148148
final ParsedDocument doc = indexOperation.parsedDoc();
149149
final long tookInNanos = result.getTook();
150150
if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) {

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.elasticsearch.common.unit.TimeValue;
5959
import org.elasticsearch.common.util.concurrent.ReleasableLock;
6060
import org.elasticsearch.index.VersionType;
61+
import org.elasticsearch.index.mapper.Mapping;
6162
import org.elasticsearch.index.mapper.ParseContext.Document;
6263
import org.elasticsearch.index.mapper.ParsedDocument;
6364
import org.elasticsearch.index.merge.MergeStats;
@@ -295,27 +296,45 @@ public Condition newCondition() {
295296
**/
296297
public abstract static class Result {
297298
private final Operation.TYPE operationType;
299+
private final Result.Type resultType;
298300
private final long version;
299301
private final long seqNo;
300302
private final Exception failure;
301303
private final SetOnce<Boolean> freeze = new SetOnce<>();
304+
private final Mapping requiredMappingUpdate;
302305
private Translog.Location translogLocation;
303306
private long took;
304307

305308
protected Result(Operation.TYPE operationType, Exception failure, long version, long seqNo) {
306309
this.operationType = operationType;
307-
this.failure = failure;
310+
this.failure = Objects.requireNonNull(failure);
308311
this.version = version;
309312
this.seqNo = seqNo;
313+
this.requiredMappingUpdate = null;
314+
this.resultType = Type.FAILURE;
310315
}
311316

312317
protected Result(Operation.TYPE operationType, long version, long seqNo) {
313-
this(operationType, null, version, seqNo);
318+
this.operationType = operationType;
319+
this.version = version;
320+
this.seqNo = seqNo;
321+
this.failure = null;
322+
this.requiredMappingUpdate = null;
323+
this.resultType = Type.SUCCESS;
324+
}
325+
326+
protected Result(Operation.TYPE operationType, Mapping requiredMappingUpdate) {
327+
this.operationType = operationType;
328+
this.version = Versions.NOT_FOUND;
329+
this.seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
330+
this.failure = null;
331+
this.requiredMappingUpdate = requiredMappingUpdate;
332+
this.resultType = Type.MAPPING_UPDATE_REQUIRED;
314333
}
315334

316-
/** whether the operation had failure */
317-
public boolean hasFailure() {
318-
return failure != null;
335+
/** whether the operation was successful, has failed or was aborted due to a mapping update */
336+
public Type getResultType() {
337+
return resultType;
319338
}
320339

321340
/** get the updated document version */
@@ -332,6 +351,14 @@ public long getSeqNo() {
332351
return seqNo;
333352
}
334353

354+
/**
355+
* If the operation was aborted due to missing mappings, this method will return the mappings
356+
* that are required to complete the operation.
357+
*/
358+
public Mapping getRequiredMappingUpdate() {
359+
return requiredMappingUpdate;
360+
}
361+
335362
/** get the translog location after executing the operation */
336363
public Translog.Location getTranslogLocation() {
337364
return translogLocation;
@@ -371,6 +398,11 @@ void freeze() {
371398
freeze.set(true);
372399
}
373400

401+
public enum Type {
402+
SUCCESS,
403+
FAILURE,
404+
MAPPING_UPDATE_REQUIRED
405+
}
374406
}
375407

376408
public static class IndexResult extends Result {
@@ -383,9 +415,8 @@ public IndexResult(long version, long seqNo, boolean created) {
383415
}
384416

385417
/**
386-
* use in case of index operation failed before getting to internal engine
387-
* (e.g while preparing operation or updating mappings)
388-
* */
418+
* use in case of the index operation failed before getting to internal engine
419+
**/
389420
public IndexResult(Exception failure, long version) {
390421
this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO);
391422
}
@@ -395,6 +426,11 @@ public IndexResult(Exception failure, long version, long seqNo) {
395426
this.created = false;
396427
}
397428

429+
public IndexResult(Mapping requiredMappingUpdate) {
430+
super(Operation.TYPE.INDEX, requiredMappingUpdate);
431+
this.created = false;
432+
}
433+
398434
public boolean isCreated() {
399435
return created;
400436
}
@@ -410,11 +446,23 @@ public DeleteResult(long version, long seqNo, boolean found) {
410446
this.found = found;
411447
}
412448

449+
/**
450+
* use in case of the delete operation failed before getting to internal engine
451+
**/
452+
public DeleteResult(Exception failure, long version) {
453+
this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
454+
}
455+
413456
public DeleteResult(Exception failure, long version, long seqNo, boolean found) {
414457
super(Operation.TYPE.DELETE, failure, version, seqNo);
415458
this.found = found;
416459
}
417460

461+
public DeleteResult(Mapping requiredMappingUpdate) {
462+
super(Operation.TYPE.DELETE, requiredMappingUpdate);
463+
this.found = false;
464+
}
465+
418466
public boolean isFound() {
419467
return found;
420468
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@ public IndexResult index(Index index) throws IOException {
765765
final IndexResult indexResult;
766766
if (plan.earlyResultOnPreFlightError.isPresent()) {
767767
indexResult = plan.earlyResultOnPreFlightError.get();
768-
assert indexResult.hasFailure();
768+
assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
769769
} else if (plan.indexIntoLucene) {
770770
indexResult = indexIntoLucene(index, plan);
771771
} else {
@@ -774,7 +774,7 @@ public IndexResult index(Index index) throws IOException {
774774
}
775775
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
776776
final Translog.Location location;
777-
if (indexResult.hasFailure() == false) {
777+
if (indexResult.getResultType() == Result.Type.SUCCESS) {
778778
location = translog.add(new Translog.Index(index, indexResult));
779779
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
780780
// if we have document failure, record it as a no-op in the translog with the generated seq_no
@@ -784,7 +784,7 @@ public IndexResult index(Index index) throws IOException {
784784
}
785785
indexResult.setTranslogLocation(location);
786786
}
787-
if (plan.indexIntoLucene && indexResult.hasFailure() == false) {
787+
if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
788788
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
789789
versionMap.maybePutIndexUnderLock(index.uid().bytes(),
790790
new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm()));
@@ -1087,7 +1087,7 @@ public DeleteResult delete(Delete delete) throws IOException {
10871087
}
10881088
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
10891089
final Translog.Location location;
1090-
if (deleteResult.hasFailure() == false) {
1090+
if (deleteResult.getResultType() == Result.Type.SUCCESS) {
10911091
location = translog.add(new Translog.Delete(delete, deleteResult));
10921092
} else if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
10931093
location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(),

0 commit comments

Comments
 (0)