From 7299894867469601879ea75465d38f1abca6541b Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Fri, 19 Nov 2021 08:48:01 +0100 Subject: [PATCH 1/2] Added simple conflict test. --- .../action/bulk/BulkIntegrationIT.java | 77 ++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java index cde1c2e19f794..7da9d8613027f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; @@ -19,10 +20,20 @@ import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.Tuple; import org.elasticsearch.ingest.IngestTestPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.StubbableTransport; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; @@ -32,6 +43,10 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -51,7 +66,7 @@ public class BulkIntegrationIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(IngestTestPlugin.class); + return Arrays.asList(IngestTestPlugin.class, MockTransportService.TestPlugin.class); } public void testBulkIndexCreatesMapping() throws Exception { @@ -197,4 +212,64 @@ public void testDeleteIndexWhileIndexing() throws Exception { } } + // tests that we abandon one of two conflicting transactions. + public void testPrepareConflict() throws Exception { + int shards = between(1, 5); + createIndex("test", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shards).build()); + String coordinating = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); + Iterable transportServiceIterable = internalCluster().getInstances( + TransportService.class + ); + CountDownLatch ready = new CountDownLatch(1); + Set txes = ConcurrentCollections.newConcurrentSet(); + // todo: only really need to do this on coordinator. + transportServiceIterable.forEach(ts -> ((MockTransportService) ts).addSendBehavior(new StubbableTransport.SendRequestBehavior() { + @Override + public void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException { + if (action.startsWith(ShardPrepareCommitAction.NAME)) { + txes.add(((ShardPrepareCommitRequest) request).txid()); + new Thread(() -> { + try { + ready.await(); + } catch (InterruptedException e) { + fail(); + } + try { + connection.sendRequest(requestId, action, request, options); + } catch (IOException e) { + fail(); + } + }).start(); + } else { + connection.sendRequest(requestId, action, request, options); + } + } + })); + + ActionFuture future1 = client(coordinating).prepareIndex("test") + .setId("1") + .setSource(Map.of("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON) + .execute(); + ActionFuture future2 = client(coordinating).prepareIndex("test") + .setId("1") + .setSource(Map.of("g" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON) + .execute(); + assertBusy(() -> assertThat(txes.size(), equalTo(2))); + + ready.countDown(); + Tuple response1 = resultOrException(future1); + Tuple response2 = resultOrException(future2); + + // one failure + assertThat(response1.v2() != null, is(response2.v2() == null)); + + } + + Tuple resultOrException(ActionFuture future) { + try { + return Tuple.tuple(future.actionGet(), null); + } catch (Exception e) { + return Tuple.tuple(null, e); + } + } } From 94e894fe4cd6169ed94ae341c7f7186ff1490758 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Fri, 19 Nov 2021 09:43:36 +0100 Subject: [PATCH 2/2] Fix prepare bug. --- .../org/elasticsearch/action/bulk/TransportBulkAction.java | 1 + .../main/java/org/elasticsearch/index/shard/IndexShard.java | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 4d541909691a7..5de78d9e03c07 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -779,6 +779,7 @@ private Void commitOrFail(Collection responses) { for (ShardPrepareCommitResponse response : responses) { for (Map.Entry conflict : response.conflicts().entrySet()) { if (conflict.getValue() == false) { + logger.info("aborting transaction due to other transaction " + conflict.getKey()); throw new ElasticsearchException("conflicting transaction [{}] on shard", conflict.getKey()); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 20c2d6d2f795a..3660793ce3573 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -4139,9 +4139,7 @@ public void registerTransaction(TxID id, Set keys) { } public Map prepareCommit(TxID txID) { - // todo: lookup in transaction table - transactionRegistry.prepare(txID); - return new HashMap(); + return transactionRegistry.prepare(txID); }