From b3fc73bd3a6dd75e7dfb4c096a05de57d6993297 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Thu, 31 Oct 2019 13:02:57 -0500 Subject: [PATCH] Prevent deadlock by using separate schedulers (#48697) Currently the BulkProcessor class uses a single scheduler to schedule flushes and retries. Functionally these are very different concerns but can result in a dead lock. Specifically, the single shared scheduler can kick off a flush task, which only finishes it's task when the bulk that is being flushed finishes. If (for what ever reason), any items in that bulk fails it will (by default) schedule a retry. However, that retry will never run it's task, since the flush task is consuming the 1 and only thread available from the shared scheduler. Since the BulkProcessor is mostly client based code, the client can provide their own scheduler. As-is the scheduler would require at minimum 2 worker threads to avoid the potential deadlock. Since the number of threads is a configuration option in the scheduler, the code can not enforce this 2 worker rule until runtime. For this reason this commit splits the single task scheduler into 2 schedulers. This eliminates the potential for the flush task to block the retry task and removes this deadlock scenario. This commit also deprecates the Java APIs that presume a single scheduler, and updates any internal code to no longer use those APIs. Fixes #47599 Note - #41451 fixed the general case where a bulk fails and is retried that can result in a deadlock. This fix should address that case as well as the case when a bulk failure *from the flush* needs to be retried. --- .../action/bulk/BulkProcessor.java | 71 ++++++++++++++++--- .../action/bulk/BulkProcessorIT.java | 10 +-- .../action/bulk/BulkProcessorRetryIT.java | 2 +- .../xpack/ccr/IndexFollowingIT.java | 2 +- .../elasticsearch/xpack/watcher/Watcher.java | 4 +- .../execution/TriggeredWatchStoreTests.java | 3 +- .../watcher/history/HistoryStoreTests.java | 2 +- 7 files changed, 72 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 68775b5af5cfc..0523c8535a47a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -82,7 +82,8 @@ public static class Builder { private final BiConsumer> consumer; private final Listener listener; - private final Scheduler scheduler; + private final Scheduler flushScheduler; + private final Scheduler retryScheduler; private final Runnable onClose; private int concurrentRequests = 1; private int bulkActions = 1000; @@ -95,10 +96,11 @@ public static class Builder { private String globalPipeline; private Builder(BiConsumer> consumer, Listener listener, - Scheduler scheduler, Runnable onClose) { + Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) { this.consumer = consumer; this.listener = listener; - this.scheduler = scheduler; + this.flushScheduler = flushScheduler; + this.retryScheduler = retryScheduler; this.onClose = onClose; } @@ -182,7 +184,7 @@ public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) { */ public BulkProcessor build() { return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, - bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults()); + bulkSize, flushInterval, flushScheduler, retryScheduler, onClose, createBulkRequestWithGlobalDefaults()); } private Supplier createBulkRequestWithGlobalDefaults() { @@ -192,19 +194,55 @@ private Supplier createBulkRequestWithGlobalDefaults() { } } + /** + * @param client The client that executes the bulk operations + * @param listener The BulkProcessor listener that gets called on bulk events + * @param flushScheduler The scheduler that is used to flush + * @param retryScheduler The scheduler that is used for retries + * @param onClose The runnable instance that is executed on close. Consumers are required to clean up the schedulers. + * @return the builder for BulkProcessor + */ + public static Builder builder(Client client, Listener listener, Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose) { + Objects.requireNonNull(client, "client"); + Objects.requireNonNull(listener, "listener"); + return new Builder(client::bulk, listener, flushScheduler, retryScheduler, onClose); + } + + + /** + * @param client The client that executes the bulk operations + * @param listener The BulkProcessor listener that gets called on bulk events + * @return the builder for BulkProcessor + * @deprecated Use {@link #builder(java.util.function.BiConsumer, org.elasticsearch.action.bulk.BulkProcessor.Listener)} + * with client::bulk as the first argument, or {@link #builder(org.elasticsearch.client.Client, + * org.elasticsearch.action.bulk.BulkProcessor.Listener, org.elasticsearch.threadpool.Scheduler, + * org.elasticsearch.threadpool.Scheduler, java.lang.Runnable)} and manage the flush and retry schedulers explicitly + */ + @Deprecated public static Builder builder(Client client, Listener listener) { Objects.requireNonNull(client, "client"); Objects.requireNonNull(listener, "listener"); - return new Builder(client::bulk, listener, client.threadPool(), () -> {}); + return new Builder(client::bulk, listener, client.threadPool(), client.threadPool(), () -> {}); } + /** + * @param consumer The consumer that is called to fulfil bulk operations + * @param listener The BulkProcessor listener that gets called on bulk events + * @return the builder for BulkProcessor + */ public static Builder builder(BiConsumer> consumer, Listener listener) { Objects.requireNonNull(consumer, "consumer"); Objects.requireNonNull(listener, "listener"); - final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor flushScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor retryScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); return new Builder(consumer, listener, - buildScheduler(scheduledThreadPoolExecutor), - () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS)); + buildScheduler(flushScheduledThreadPoolExecutor), + buildScheduler(retryScheduledThreadPoolExecutor), + () -> + { + Scheduler.terminate(flushScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS); + Scheduler.terminate(retryScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS); + }); } private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) { @@ -229,18 +267,29 @@ private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThr BulkProcessor(BiConsumer> consumer, BackoffPolicy backoffPolicy, Listener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, - Scheduler scheduler, Runnable onClose, Supplier bulkRequestSupplier) { + Scheduler flushScheduler, Scheduler retryScheduler, Runnable onClose, Supplier bulkRequestSupplier) { this.bulkActions = bulkActions; this.bulkSize = bulkSize.getBytes(); this.scheduler = scheduler; this.bulkRequest = bulkRequestSupplier.get(); this.bulkRequestSupplier = bulkRequestSupplier; - this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests); + this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, retryScheduler, concurrentRequests); // Start period flushing task after everything is setup - this.cancellableFlushTask = startFlushTask(flushInterval, scheduler); + this.cancellableFlushTask = startFlushTask(flushInterval, flushScheduler); this.onClose = onClose; } + /** + * @deprecated use the {@link BulkProcessor} constructor which uses separate schedulers for flush and retry + */ + @Deprecated + BulkProcessor(BiConsumer> consumer, BackoffPolicy backoffPolicy, Listener listener, + int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, + Scheduler scheduler, Runnable onClose, Supplier bulkRequestSupplier) { + this(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, + scheduler, scheduler, onClose, bulkRequestSupplier ); + } + /** * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. */ diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java index 76a99994e04ee..fe784fe07b804 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java @@ -63,7 +63,7 @@ public void testThatBulkProcessorCountIsCorrect() throws Exception { BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); int numDocs = randomIntBetween(10, 100); - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) //let's make sure that the bulk action limit trips, one single execution will index all the documents .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) @@ -87,7 +87,7 @@ public void testBulkProcessorFlush() throws Exception { int numDocs = randomIntBetween(10, 100); - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) //let's make sure that this bulk won't be automatically flushed .setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100)) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { @@ -122,7 +122,7 @@ public void testBulkProcessorConcurrentRequests() throws Exception { MultiGetRequestBuilder multiGetRequestBuilder; - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) //set interval and size to high values .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { @@ -204,7 +204,7 @@ public void testBulkProcessorWaitOnClose() throws Exception { BulkProcessorTestListener listener = new BulkProcessorTestListener(); int numDocs = randomIntBetween(10, 100); - BulkProcessor processor = BulkProcessor.builder(client(), listener) + BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) //let's make sure that the bulk action limit trips, one single execution will index all the documents .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10), @@ -251,7 +251,7 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet(); BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch); - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) //set interval and size to high values .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index f1731083ae376..ce3835a96ec3d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -77,7 +77,7 @@ private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejec assertAcked(prepareCreate(INDEX_NAME)); ensureGreen(); - BulkProcessor bulkProcessor = BulkProcessor.builder(client(), new BulkProcessor.Listener() { + BulkProcessor bulkProcessor = BulkProcessor.builder(client()::bulk, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { // no op diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index ae5be24c3410c..c592d6e910098 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -438,7 +438,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} }; int bulkSize = between(1, 20); - BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient(), listener) + BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener) .setBulkActions(bulkSize) .setConcurrentRequests(4) .build(); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 7af26953ff68d..bd820fc508961 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -57,7 +58,6 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ssl.SSLService; @@ -356,7 +356,7 @@ public Collection createComponents(Client client, ClusterService cluster final InputRegistry inputRegistry = new InputRegistry(inputFactories); inputFactories.put(ChainInput.TYPE, new ChainInputFactory(inputRegistry)); - bulkProcessor = BulkProcessor.builder(ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN), new BulkProcessor.Listener() { + bulkProcessor = BulkProcessor.builder(new OriginSettingClient(client, WATCHER_ORIGIN)::bulk, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java index 428ec96df97e0..e55ad626271cd 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java @@ -128,7 +128,8 @@ public void init() { when(client.settings()).thenReturn(settings); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); parser = mock(TriggeredWatch.Parser.class); - BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build(); + BulkProcessor bulkProcessor = BulkProcessor. + builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build(); triggeredWatchStore = new TriggeredWatchStore(settings, client, parser, bulkProcessor); } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java index 2ea364de18b4e..072854d7cd467 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java @@ -70,7 +70,7 @@ public void init() { when(client.settings()).thenReturn(settings); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings)); BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class); - BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build(); + BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build(); historyStore = new HistoryStore(bulkProcessor); }