Skip to content

Commit 014b323

Browse files
authored
Ensure to generate identical NoOp for the same failure (#33141)
We generate slightly different NoOps in InternalEngine and TransportShardBulkAction for the same failure. 1. InternalEngine uses Exception#getFailure to generate a message without the class name: newOp [NoOp{seqNo=1, primaryTerm=1, reason='Contexts are mandatory in context enabled completion field [suggest_context]'}]. 2. TransportShardBulkAction uses Exception#toString to generate a message with the class name: NoOp{seqNo=1, primaryTerm=1, reason='java.lang.IllegalArgumentException: Contexts are mandatory in context enabled completion field [suggest_context]'}. If a write operation fails while a replica is recovering, that replica will possibly receive two different NoOps: one from recovery and one from replication. These two different NoOps will trip TranslogWriter#assertNoSeqNumberConflict assertion. This commit ensures that we generate the same Noop for the same failure. Closes #32986
1 parent ed0571e commit 014b323

File tree

3 files changed

+78
-75
lines changed

3 files changed

+78
-75
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -802,7 +802,7 @@ public IndexResult index(Index index) throws IOException {
802802
location = translog.add(new Translog.Index(index, indexResult));
803803
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
804804
// if we have document failure, record it as a no-op in the translog with the generated seq_no
805-
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
805+
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().toString()));
806806
} else {
807807
location = null;
808808
}
@@ -1111,7 +1111,7 @@ public DeleteResult delete(Delete delete) throws IOException {
11111111
location = translog.add(new Translog.Delete(delete, deleteResult));
11121112
} else if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
11131113
location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(),
1114-
delete.primaryTerm(), deleteResult.getFailure().getMessage()));
1114+
delete.primaryTerm(), deleteResult.getFailure().toString()));
11151115
} else {
11161116
location = null;
11171117
}

server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

Lines changed: 59 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.common.xcontent.XContentType;
3737
import org.elasticsearch.index.IndexSettings;
3838
import org.elasticsearch.index.engine.Engine;
39-
import org.elasticsearch.index.engine.EngineConfig;
4039
import org.elasticsearch.index.engine.EngineFactory;
4140
import org.elasticsearch.index.engine.InternalEngine;
4241
import org.elasticsearch.index.engine.InternalEngineTests;
@@ -47,13 +46,15 @@
4746
import org.elasticsearch.index.shard.IndexShard;
4847
import org.elasticsearch.index.shard.IndexShardTests;
4948
import org.elasticsearch.index.store.Store;
49+
import org.elasticsearch.index.translog.SnapshotMatchers;
5050
import org.elasticsearch.index.translog.Translog;
5151
import org.elasticsearch.indices.recovery.RecoveryTarget;
5252
import org.elasticsearch.threadpool.TestThreadPool;
5353
import org.elasticsearch.threadpool.ThreadPool;
5454
import org.hamcrest.Matcher;
5555

5656
import java.io.IOException;
57+
import java.nio.charset.StandardCharsets;
5758
import java.util.ArrayList;
5859
import java.util.Collections;
5960
import java.util.List;
@@ -338,38 +339,73 @@ public void testReplicaOperationWithConcurrentPrimaryPromotion() throws Exceptio
338339
* for primary and replica shards
339340
*/
340341
public void testDocumentFailureReplication() throws Exception {
341-
final String failureMessage = "simulated document failure";
342-
final ThrowingDocumentFailureEngineFactory throwingDocumentFailureEngineFactory =
343-
new ThrowingDocumentFailureEngineFactory(failureMessage);
342+
final IOException indexException = new IOException("simulated indexing failure");
343+
final IOException deleteException = new IOException("simulated deleting failure");
344+
final EngineFactory engineFactory = config -> InternalEngineTests.createInternalEngine((dir, iwc) ->
345+
new IndexWriter(dir, iwc) {
346+
final AtomicBoolean throwAfterIndexedOneDoc = new AtomicBoolean(); // need one document to trigger delete in IW.
347+
@Override
348+
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
349+
if (throwAfterIndexedOneDoc.getAndSet(true)) {
350+
throw indexException;
351+
} else {
352+
return super.addDocument(doc);
353+
}
354+
}
355+
@Override
356+
public long deleteDocuments(Term... terms) throws IOException {
357+
throw deleteException;
358+
}
359+
}, null, null, config);
344360
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
345361
@Override
346-
protected EngineFactory getEngineFactory(ShardRouting routing) {
347-
return throwingDocumentFailureEngineFactory;
348-
}}) {
362+
protected EngineFactory getEngineFactory(ShardRouting routing) { return engineFactory; }}) {
349363

350-
// test only primary
364+
// start with the primary only so two first failures are replicated to replicas via recovery from the translog of the primary.
351365
shards.startPrimary();
352-
BulkItemResponse response = shards.index(
353-
new IndexRequest(index.getName(), "type", "1")
354-
.source("{}", XContentType.JSON)
355-
);
356-
assertTrue(response.isFailed());
357-
assertNoOpTranslogOperationForDocumentFailure(shards, 1, shards.getPrimary().getPendingPrimaryTerm(), failureMessage);
358-
shards.assertAllEqual(0);
366+
long primaryTerm = shards.getPrimary().getPendingPrimaryTerm();
367+
List<Translog.Operation> expectedTranslogOps = new ArrayList<>();
368+
BulkItemResponse indexResp = shards.index(new IndexRequest(index.getName(), "type", "1").source("{}", XContentType.JSON));
369+
assertThat(indexResp.isFailed(), equalTo(false));
370+
expectedTranslogOps.add(new Translog.Index("type", "1", 0, primaryTerm, 1, "{}".getBytes(StandardCharsets.UTF_8), null, -1));
371+
try (Translog.Snapshot snapshot = getTranslog(shards.getPrimary()).newSnapshot()) {
372+
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
373+
}
374+
375+
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
376+
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
377+
expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString()));
378+
379+
BulkItemResponse deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
380+
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
381+
expectedTranslogOps.add(new Translog.NoOp(2, primaryTerm, deleteException.toString()));
382+
shards.assertAllEqual(1);
359383

360-
// add some replicas
361384
int nReplica = randomIntBetween(1, 3);
362385
for (int i = 0; i < nReplica; i++) {
363386
shards.addReplica();
364387
}
365388
shards.startReplicas(nReplica);
366-
response = shards.index(
367-
new IndexRequest(index.getName(), "type", "1")
368-
.source("{}", XContentType.JSON)
369-
);
370-
assertTrue(response.isFailed());
371-
assertNoOpTranslogOperationForDocumentFailure(shards, 2, shards.getPrimary().getPendingPrimaryTerm(), failureMessage);
372-
shards.assertAllEqual(0);
389+
for (IndexShard shard : shards) {
390+
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
391+
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
392+
}
393+
}
394+
// unlike previous failures, these two failures replicated directly from the replication channel.
395+
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
396+
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
397+
expectedTranslogOps.add(new Translog.NoOp(3, primaryTerm, indexException.toString()));
398+
399+
deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
400+
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
401+
expectedTranslogOps.add(new Translog.NoOp(4, primaryTerm, deleteException.toString()));
402+
403+
for (IndexShard shard : shards) {
404+
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
405+
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
406+
}
407+
}
408+
shards.assertAllEqual(1);
373409
}
374410
}
375411

@@ -541,47 +577,4 @@ public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception {
541577
shards.assertAllEqual(0);
542578
}
543579
}
544-
545-
/** Throws <code>documentFailure</code> on every indexing operation */
546-
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
547-
final String documentFailureMessage;
548-
549-
ThrowingDocumentFailureEngineFactory(String documentFailureMessage) {
550-
this.documentFailureMessage = documentFailureMessage;
551-
}
552-
553-
@Override
554-
public Engine newReadWriteEngine(EngineConfig config) {
555-
return InternalEngineTests.createInternalEngine((directory, writerConfig) ->
556-
new IndexWriter(directory, writerConfig) {
557-
@Override
558-
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
559-
assert documentFailureMessage != null;
560-
throw new IOException(documentFailureMessage);
561-
}
562-
}, null, null, config);
563-
}
564-
}
565-
566-
private static void assertNoOpTranslogOperationForDocumentFailure(
567-
Iterable<IndexShard> replicationGroup,
568-
int expectedOperation,
569-
long expectedPrimaryTerm,
570-
String failureMessage) throws IOException {
571-
for (IndexShard indexShard : replicationGroup) {
572-
try(Translog.Snapshot snapshot = getTranslog(indexShard).newSnapshot()) {
573-
assertThat(snapshot.totalOperations(), equalTo(expectedOperation));
574-
long expectedSeqNo = 0L;
575-
Translog.Operation op = snapshot.next();
576-
do {
577-
assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP));
578-
assertThat(op.seqNo(), equalTo(expectedSeqNo));
579-
assertThat(op.primaryTerm(), equalTo(expectedPrimaryTerm));
580-
assertThat(((Translog.NoOp) op).reason(), containsString(failureMessage));
581-
op = snapshot.next();
582-
expectedSeqNo++;
583-
} while (op != null);
584-
}
585-
}
586-
}
587580
}

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.action.resync.ResyncReplicationResponse;
3838
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
3939
import org.elasticsearch.action.support.PlainActionFuture;
40+
import org.elasticsearch.action.support.WriteRequest;
4041
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
4142
import org.elasticsearch.action.support.replication.ReplicationOperation;
4243
import org.elasticsearch.action.support.replication.ReplicationRequest;
@@ -193,14 +194,23 @@ public int appendDocs(final int numOfDoc) throws Exception {
193194
}
194195

195196
public BulkItemResponse index(IndexRequest indexRequest) throws Exception {
197+
return executeWriteRequest(indexRequest, indexRequest.getRefreshPolicy());
198+
}
199+
200+
public BulkItemResponse delete(DeleteRequest deleteRequest) throws Exception {
201+
return executeWriteRequest(deleteRequest, deleteRequest.getRefreshPolicy());
202+
}
203+
204+
private BulkItemResponse executeWriteRequest(
205+
DocWriteRequest<?> writeRequest, WriteRequest.RefreshPolicy refreshPolicy) throws Exception {
196206
PlainActionFuture<BulkItemResponse> listener = new PlainActionFuture<>();
197207
final ActionListener<BulkShardResponse> wrapBulkListener = ActionListener.wrap(
198-
bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
199-
listener::onFailure);
208+
bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
209+
listener::onFailure);
200210
BulkItemRequest[] items = new BulkItemRequest[1];
201-
items[0] = new BulkItemRequest(0, indexRequest);
202-
BulkShardRequest request = new BulkShardRequest(shardId, indexRequest.getRefreshPolicy(), items);
203-
new IndexingAction(request, wrapBulkListener, this).execute();
211+
items[0] = new BulkItemRequest(0, writeRequest);
212+
BulkShardRequest request = new BulkShardRequest(shardId, refreshPolicy, items);
213+
new WriteReplicationAction(request, wrapBulkListener, this).execute();
204214
return listener.get();
205215
}
206216

@@ -598,9 +608,9 @@ public void respond(ActionListener<Response> listener) {
598608

599609
}
600610

601-
class IndexingAction extends ReplicationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
611+
class WriteReplicationAction extends ReplicationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
602612

603-
IndexingAction(BulkShardRequest request, ActionListener<BulkShardResponse> listener, ReplicationGroup replicationGroup) {
613+
WriteReplicationAction(BulkShardRequest request, ActionListener<BulkShardResponse> listener, ReplicationGroup replicationGroup) {
604614
super(request, listener, replicationGroup, "indexing");
605615
}
606616

0 commit comments

Comments
 (0)