Skip to content

Commit 716f163

Browse files
committed
Ensure to generate identical NoOp for the same failure (#33141)
We generate slightly different NoOps in InternalEngine and TransportShardBulkAction for the same failure. 1. InternalEngine uses Exception#getFailure to generate a message without the class name: newOp [NoOp{seqNo=1, primaryTerm=1, reason='Contexts are mandatory in context enabled completion field [suggest_context]'}]. 2. TransportShardBulkAction uses Exception#toString to generate a message with the class name: NoOp{seqNo=1, primaryTerm=1, reason='java.lang.IllegalArgumentException: Contexts are mandatory in context enabled completion field [suggest_context]'}. If a write operation fails while a replica is recovering, that replica will possibly receive two different NoOps: one from recovery and one from replication. These two different NoOps will trip TranslogWriter#assertNoSeqNumberConflict assertion. This commit ensures that we generate the same Noop for the same failure. Closes #32986
1 parent ebcf838 commit 716f163

File tree

3 files changed

+81
-75
lines changed

3 files changed

+81
-75
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,7 @@ public IndexResult index(Index index) throws IOException {
856856
location = translog.add(new Translog.Index(index, indexResult));
857857
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
858858
// if we have document failure, record it as a no-op in the translog with the generated seq_no
859-
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
859+
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().toString()));
860860
} else {
861861
location = null;
862862
}
@@ -1175,7 +1175,7 @@ public DeleteResult delete(Delete delete) throws IOException {
11751175
location = translog.add(new Translog.Delete(delete, deleteResult));
11761176
} else if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
11771177
location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(),
1178-
delete.primaryTerm(), deleteResult.getFailure().getMessage()));
1178+
delete.primaryTerm(), deleteResult.getFailure().toString()));
11791179
} else {
11801180
location = null;
11811181
}

server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

Lines changed: 62 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
import org.elasticsearch.common.util.iterable.Iterables;
3636
import org.elasticsearch.common.xcontent.XContentType;
3737
import org.elasticsearch.index.IndexSettings;
38+
import org.elasticsearch.index.VersionType;
3839
import org.elasticsearch.index.engine.Engine;
39-
import org.elasticsearch.index.engine.EngineConfig;
4040
import org.elasticsearch.index.engine.EngineFactory;
4141
import org.elasticsearch.index.engine.InternalEngine;
4242
import org.elasticsearch.index.engine.InternalEngineTests;
@@ -47,13 +47,15 @@
4747
import org.elasticsearch.index.shard.IndexShard;
4848
import org.elasticsearch.index.shard.IndexShardTests;
4949
import org.elasticsearch.index.store.Store;
50+
import org.elasticsearch.index.translog.SnapshotMatchers;
5051
import org.elasticsearch.index.translog.Translog;
5152
import org.elasticsearch.indices.recovery.RecoveryTarget;
5253
import org.elasticsearch.threadpool.TestThreadPool;
5354
import org.elasticsearch.threadpool.ThreadPool;
5455
import org.hamcrest.Matcher;
5556

5657
import java.io.IOException;
58+
import java.nio.charset.StandardCharsets;
5759
import java.util.ArrayList;
5860
import java.util.Collections;
5961
import java.util.List;
@@ -338,38 +340,75 @@ public void testReplicaOperationWithConcurrentPrimaryPromotion() throws Exceptio
338340
* for primary and replica shards
339341
*/
340342
public void testDocumentFailureReplication() throws Exception {
341-
final String failureMessage = "simulated document failure";
342-
final ThrowingDocumentFailureEngineFactory throwingDocumentFailureEngineFactory =
343-
new ThrowingDocumentFailureEngineFactory(failureMessage);
343+
final IOException indexException = new IOException("simulated indexing failure");
344+
final IOException deleteException = new IOException("simulated deleting failure");
345+
final EngineFactory engineFactory = config -> InternalEngineTests.createInternalEngine((dir, iwc) ->
346+
new IndexWriter(dir, iwc) {
347+
final AtomicBoolean throwAfterIndexedOneDoc = new AtomicBoolean(); // need one document to trigger delete in IW.
348+
@Override
349+
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
350+
if (throwAfterIndexedOneDoc.getAndSet(true)) {
351+
throw indexException;
352+
} else {
353+
return super.addDocument(doc);
354+
}
355+
}
356+
@Override
357+
public long deleteDocuments(Term... terms) throws IOException {
358+
throw deleteException;
359+
}
360+
}, null, null, config);
344361
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
345362
@Override
346-
protected EngineFactory getEngineFactory(ShardRouting routing) {
347-
return throwingDocumentFailureEngineFactory;
348-
}}) {
363+
protected EngineFactory getEngineFactory(ShardRouting routing) { return engineFactory; }}) {
349364

350-
// test only primary
365+
// start with the primary only so two first failures are replicated to replicas via recovery from the translog of the primary.
351366
shards.startPrimary();
352-
BulkItemResponse response = shards.index(
353-
new IndexRequest(index.getName(), "type", "1")
354-
.source("{}", XContentType.JSON)
355-
);
356-
assertTrue(response.isFailed());
357-
assertNoOpTranslogOperationForDocumentFailure(shards, 1, shards.getPrimary().getPendingPrimaryTerm(), failureMessage);
358-
shards.assertAllEqual(0);
367+
long primaryTerm = shards.getPrimary().getPendingPrimaryTerm();
368+
List<Translog.Operation> expectedTranslogOps = new ArrayList<>();
369+
BulkItemResponse indexResp = shards.index(new IndexRequest(index.getName(), "type", "1")
370+
.source("{}", XContentType.JSON).version(1).versionType(VersionType.EXTERNAL));
371+
assertThat(indexResp.isFailed(), equalTo(false));
372+
expectedTranslogOps.add(new Translog.Index("type", "1", 0, primaryTerm, 1, VersionType.EXTERNAL,
373+
"{}".getBytes(StandardCharsets.UTF_8), null, null, -1));
374+
try (Translog.Snapshot snapshot = getTranslog(shards.getPrimary()).newSnapshot()) {
375+
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
376+
}
377+
378+
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
379+
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
380+
expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString()));
381+
382+
BulkItemResponse deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
383+
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
384+
expectedTranslogOps.add(new Translog.NoOp(2, primaryTerm, deleteException.toString()));
385+
shards.assertAllEqual(1);
359386

360-
// add some replicas
361387
int nReplica = randomIntBetween(1, 3);
362388
for (int i = 0; i < nReplica; i++) {
363389
shards.addReplica();
364390
}
365391
shards.startReplicas(nReplica);
366-
response = shards.index(
367-
new IndexRequest(index.getName(), "type", "1")
368-
.source("{}", XContentType.JSON)
369-
);
370-
assertTrue(response.isFailed());
371-
assertNoOpTranslogOperationForDocumentFailure(shards, 2, shards.getPrimary().getPendingPrimaryTerm(), failureMessage);
372-
shards.assertAllEqual(0);
392+
for (IndexShard shard : shards) {
393+
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
394+
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
395+
}
396+
}
397+
// unlike previous failures, these two failures replicated directly from the replication channel.
398+
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
399+
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
400+
expectedTranslogOps.add(new Translog.NoOp(3, primaryTerm, indexException.toString()));
401+
402+
deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
403+
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
404+
expectedTranslogOps.add(new Translog.NoOp(4, primaryTerm, deleteException.toString()));
405+
406+
for (IndexShard shard : shards) {
407+
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
408+
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
409+
}
410+
}
411+
shards.assertAllEqual(1);
373412
}
374413
}
375414

@@ -541,47 +580,4 @@ public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception {
541580
shards.assertAllEqual(0);
542581
}
543582
}
544-
545-
/** Throws <code>documentFailure</code> on every indexing operation */
546-
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
547-
final String documentFailureMessage;
548-
549-
ThrowingDocumentFailureEngineFactory(String documentFailureMessage) {
550-
this.documentFailureMessage = documentFailureMessage;
551-
}
552-
553-
@Override
554-
public Engine newReadWriteEngine(EngineConfig config) {
555-
return InternalEngineTests.createInternalEngine((directory, writerConfig) ->
556-
new IndexWriter(directory, writerConfig) {
557-
@Override
558-
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
559-
assert documentFailureMessage != null;
560-
throw new IOException(documentFailureMessage);
561-
}
562-
}, null, null, config);
563-
}
564-
}
565-
566-
private static void assertNoOpTranslogOperationForDocumentFailure(
567-
Iterable<IndexShard> replicationGroup,
568-
int expectedOperation,
569-
long expectedPrimaryTerm,
570-
String failureMessage) throws IOException {
571-
for (IndexShard indexShard : replicationGroup) {
572-
try(Translog.Snapshot snapshot = getTranslog(indexShard).newSnapshot()) {
573-
assertThat(snapshot.totalOperations(), equalTo(expectedOperation));
574-
long expectedSeqNo = 0L;
575-
Translog.Operation op = snapshot.next();
576-
do {
577-
assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP));
578-
assertThat(op.seqNo(), equalTo(expectedSeqNo));
579-
assertThat(op.primaryTerm(), equalTo(expectedPrimaryTerm));
580-
assertThat(((Translog.NoOp) op).reason(), containsString(failureMessage));
581-
op = snapshot.next();
582-
expectedSeqNo++;
583-
} while (op != null);
584-
}
585-
}
586-
}
587583
}

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.action.resync.ResyncReplicationResponse;
3838
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
3939
import org.elasticsearch.action.support.PlainActionFuture;
40+
import org.elasticsearch.action.support.WriteRequest;
4041
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
4142
import org.elasticsearch.action.support.replication.ReplicationOperation;
4243
import org.elasticsearch.action.support.replication.ReplicationRequest;
@@ -193,14 +194,23 @@ public int appendDocs(final int numOfDoc) throws Exception {
193194
}
194195

195196
public BulkItemResponse index(IndexRequest indexRequest) throws Exception {
197+
return executeWriteRequest(indexRequest, indexRequest.getRefreshPolicy());
198+
}
199+
200+
public BulkItemResponse delete(DeleteRequest deleteRequest) throws Exception {
201+
return executeWriteRequest(deleteRequest, deleteRequest.getRefreshPolicy());
202+
}
203+
204+
private BulkItemResponse executeWriteRequest(
205+
DocWriteRequest<?> writeRequest, WriteRequest.RefreshPolicy refreshPolicy) throws Exception {
196206
PlainActionFuture<BulkItemResponse> listener = new PlainActionFuture<>();
197207
final ActionListener<BulkShardResponse> wrapBulkListener = ActionListener.wrap(
198-
bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
199-
listener::onFailure);
208+
bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
209+
listener::onFailure);
200210
BulkItemRequest[] items = new BulkItemRequest[1];
201-
items[0] = new BulkItemRequest(0, indexRequest);
202-
BulkShardRequest request = new BulkShardRequest(shardId, indexRequest.getRefreshPolicy(), items);
203-
new IndexingAction(request, wrapBulkListener, this).execute();
211+
items[0] = new BulkItemRequest(0, writeRequest);
212+
BulkShardRequest request = new BulkShardRequest(shardId, refreshPolicy, items);
213+
new WriteReplicationAction(request, wrapBulkListener, this).execute();
204214
return listener.get();
205215
}
206216

@@ -598,9 +608,9 @@ public void respond(ActionListener<Response> listener) {
598608

599609
}
600610

601-
class IndexingAction extends ReplicationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
611+
class WriteReplicationAction extends ReplicationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
602612

603-
IndexingAction(BulkShardRequest request, ActionListener<BulkShardResponse> listener, ReplicationGroup replicationGroup) {
613+
WriteReplicationAction(BulkShardRequest request, ActionListener<BulkShardResponse> listener, ReplicationGroup replicationGroup) {
604614
super(request, listener, replicationGroup, "indexing");
605615
}
606616

0 commit comments

Comments
 (0)