ShardBulkAction ignore primary response on primary (#38901)
Previously, if a version conflict occurred and a previous primary response
was present, the original primary response would be used both for sending
to the replica and for replying to the client. This was originally done to
cope with conflicts after relocations, where a bulk request could hit a
closed shard halfway through, have to retry on the new primary, and then
fail on a conflict with its own earlier update.

With sequence numbers this leads to a problem: if a primary is demoted
(for instance during a network partition), it sends the original response
along in the retried request. On a conflict at the new primary, that old
response is then sent to the replica. Its data can be stale, leading to
inconsistency between primary and replica.

Relocations now perform an explicit hand-off from the old primary to the
new one and ensure that no operations are in flight while doing so. The
special handling above is therefore no longer necessary. This change
removes it and ignores any pre-existing primary response when executing
shard bulk requests on the primary.
henningandersen authored Feb 15, 2019
1 parent 3f1125f commit dacb0df
Showing 4 changed files with 56 additions and 21 deletions.
BulkPrimaryExecutionContext.java
@@ -172,11 +172,6 @@ public String getConcreteIndex() {
return getCurrentItem().index();
}

- /** returns any primary response that was set by a previous primary */
- public BulkItemResponse getPreviousPrimaryResponse() {
-     return getCurrentItem().getPrimaryResponse();
- }
-
/** returns a translog location that is needed to be synced in order to persist all operations executed so far */
public Translog.Location getLocationToSync() {
assert hasMoreOperationsToExecute() == false;
TransportShardBulkAction.java
@@ -261,16 +261,7 @@ private static void finalizePrimaryOperationOnCompletion(BulkPrimaryExecutionCon
context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure);
}

- final BulkItemResponse primaryResponse;
- // if it's a conflict failure, and we already executed the request on a primary (and we execute it
- // again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
- // then just use the response we got from the failed execution
- if (TransportShardBulkAction.isConflictException(failure) && context.getPreviousPrimaryResponse() != null) {
-     primaryResponse = context.getPreviousPrimaryResponse();
- } else {
-     primaryResponse = executionResult;
- }
- context.markAsCompleted(primaryResponse);
+ context.markAsCompleted(executionResult);
} else {
context.markAsCompleted(executionResult);
}
TransportShardBulkActionTests.java
@@ -144,6 +144,8 @@ public void testExecuteBulkIndexRequest() throws Exception {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

+ randomlySetIgnoredPrimaryResponse(primaryRequest);
+
UpdateHelper updateHelper = null;
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -169,6 +171,8 @@ public void testExecuteBulkIndexRequest() throws Exception {
items[0] = primaryRequest;
bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

+ randomlySetIgnoredPrimaryResponse(primaryRequest);
+
BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper,
threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {});
@@ -271,6 +275,8 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(mappingUpdate);

+ randomlySetIgnoredPrimaryResponse(items[0]);
+
// Pretend the mappings haven't made it to the node yet
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
AtomicInteger updateCalled = new AtomicInteger();
@@ -326,6 +332,8 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex

boolean errorOnWait = randomBoolean();

+ randomlySetIgnoredPrimaryResponse(items[0]);
+
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(),
@@ -365,6 +373,8 @@ public void testExecuteBulkDeleteRequest() throws Exception {
Translog.Location location = new Translog.Location(0, 0, 0);
UpdateHelper updateHelper = null;

+ randomlySetIgnoredPrimaryResponse(items[0]);
+
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
new NoopMappingUpdatePerformer(), () -> {});
@@ -405,6 +415,8 @@ public void testExecuteBulkDeleteRequest() throws Exception {

location = context.getLocationToSync();

+ randomlySetIgnoredPrimaryResponse(items[0]);
+
context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
new NoopMappingUpdatePerformer(), () -> {});
@@ -459,6 +471,8 @@ public void testNoopUpdateRequest() throws Exception {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

+ randomlySetIgnoredPrimaryResponse(primaryRequest);
+
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
new NoopMappingUpdatePerformer(), () -> {});
@@ -503,6 +517,7 @@ public void testUpdateRequestWithFailure() throws Exception {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

+ randomlySetIgnoredPrimaryResponse(primaryRequest);

BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -552,6 +567,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

+ randomlySetIgnoredPrimaryResponse(primaryRequest);

BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -598,6 +614,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

+ randomlySetIgnoredPrimaryResponse(primaryRequest);

BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -643,6 +660,7 @@ public void testUpdateWithDelete() throws Exception {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

+ randomlySetIgnoredPrimaryResponse(primaryRequest);

BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -676,6 +694,7 @@ public void testFailureDuringUpdateProcessing() throws Exception {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

+ randomlySetIgnoredPrimaryResponse(primaryRequest);

BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -809,6 +828,14 @@ public void testRetries() throws Exception {
assertThat(response.getSeqNo(), equalTo(13L));
}

+ private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
+     if (randomBoolean()) {
+         // add a response to the request and thereby check that it is ignored for the primary.
+         primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
+             new IndexResponse(null, "_doc", "ignore-primary-response-on-primary", 42, 42, 42, false)));
+     }
+ }
+
/**
* Fake IndexResult that has a settable translog location
*/
ClusterDisruptionIT.java
@@ -25,6 +25,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetResponse;
+ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
@@ -37,6 +38,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
+ import org.elasticsearch.index.VersionType;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption;
@@ -75,6 +77,18 @@
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {

+ private enum ConflictMode {
+     none,
+     external,
+     create;
+
+     static ConflictMode randomMode() {
+         ConflictMode[] values = values();
+         return values[randomInt(values.length - 1)];
+     }
+ }
+
/**
* Test that we do not lose documents whose indexing requests were successful, under a randomly selected disruption scheme
* We also collect & report the type of indexing failures that occur.
@@ -111,7 +125,9 @@ public void testAckedIndexing() throws Exception {
final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
final List<Exception> exceptedExceptions = new CopyOnWriteArrayList<>();

logger.info("starting indexers");
final ConflictMode conflictMode = ConflictMode.randomMode();

logger.info("starting indexers using conflict mode " + conflictMode);
try {
for (final String node : nodes) {
final Semaphore semaphore = new Semaphore(0);
@@ -131,11 +147,17 @@
id = Integer.toString(idGenerator.incrementAndGet());
int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
- IndexResponse response =
-     client.prepareIndex("test", "type", id)
-         .setSource("{}", XContentType.JSON)
-         .setTimeout(timeout)
-         .get(timeout);
+ IndexRequestBuilder indexRequestBuilder = client.prepareIndex("test", "type", id)
+     .setSource("{}", XContentType.JSON)
+     .setTimeout(timeout);
+
+ if (conflictMode == ConflictMode.external) {
+     indexRequestBuilder.setVersion(randomIntBetween(1, 10)).setVersionType(VersionType.EXTERNAL);
+ } else if (conflictMode == ConflictMode.create) {
+     indexRequestBuilder.setCreate(true);
+ }
+
+ IndexResponse response = indexRequestBuilder.get(timeout);
assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
ackedDocs.put(id, node);
logger.trace("[{}] indexed id [{}] through node [{}], response [{}]", name, id, node, response);
