From e0daea6828431b88fa61aa01fd78b820a977a18f Mon Sep 17 00:00:00 2001 From: SuXingLee <913910636@qq.com> Date: Wed, 18 Sep 2019 00:25:08 +0800 Subject: [PATCH 1/2] Fix BulkProcessor hangs for threads deadlocked --- .../action/bulk/BulkProcessor.java | 1 + .../action/bulk/BulkProcessorTests.java | 87 +++++++++++++++++++ .../elasticsearch/action/bulk/RetryTests.java | 2 +- 3 files changed, 89 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 68775b5af5cfc..9fbc9f062dd1c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -202,6 +202,7 @@ public static Builder builder(BiConsumer Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS)); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index e2527397a780a..de9d226816193 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -19,26 +19,41 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import org.junit.rules.ExpectedException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; public class BulkProcessorTests extends ESTestCase { private ThreadPool threadPool; + private ExecutorService asyncExec = Executors.newFixedThreadPool(1); + private ExecutorService userExec = Executors.newFixedThreadPool(1); + private ScheduledThreadPoolExecutor scheduleThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); @Before public void startThreadPool() { @@ -48,6 +63,78 @@ public void startThreadPool() { @After public void stopThreadPool() throws InterruptedException { terminate(threadPool); + asyncExec.shutdown(); + userExec.shutdownNow(); + scheduleThreadPoolExecutor.shutdownNow(); + } + + public void testBulkProcessorThreadBlocked() throws Exception { + Future future = buildAndExecuteBulkProcessor(initScheduler(1)); + try { + future.get(8, TimeUnit.SECONDS);// thread has been blocked, the IndexRequest cannot be successfully executed. + } catch (Exception e) { + ExpectedException.none().expect(TimeoutException.class); + } + } + + public void testBulkProcessorThread() throws Exception { + Future future = buildAndExecuteBulkProcessor(initScheduler(2)); + assertNull(future.get(8, TimeUnit.SECONDS));//the IndexRequest executed successfully. + } + + private Scheduler initScheduler(int corePoolSize) { + scheduleThreadPoolExecutor.setCorePoolSize(corePoolSize); + return (command, delay, executor) -> + Scheduler.wrapAsScheduledCancellable(scheduleThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); + } + + private Future buildAndExecuteBulkProcessor(Scheduler scheduler) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + final int concurrentRequests = 0; + final int bulkActions = 3; + final TimeValue flushInterval = TimeValue.timeValueMillis(1500L); + final BackoffPolicy backoff = BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100L), 1); + final ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB); + BulkProcessor bulkProcessor = new BulkProcessor(bulkAsync(latch), backoff, emptyListener(), + concurrentRequests, bulkActions, bulkSize, flushInterval, + scheduler, null, BulkRequest::new); + Future future = userExec.submit(() -> { + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest()); + bulkProcessor.add(new IndexRequest());// Step-1: execute `BulkRequestHandler` and locked 'BulkProcessor' + }); + Thread.sleep(3000L);// Step-2: wait and ensure IntervalFlush is called + latch.countDown(); // Due to step-1, the scheduling thread state is BLOCKED (on object monitor) + return future; + } + + private BiConsumer> bulkAsync(CountDownLatch latch) { + return (request, listener) -> + { + // Step-3: retry of bulk request by using scheduler thread + // Due to step-2, scheduler thread is already occupied, causing the retry policy to not be executed + // Causes the lock and latch in step-1 not to be released + asyncExec.execute(() -> { + try { + latch.await(); + listener.onResponse(createBulkResponse()); + } catch (InterruptedException e) { + throw ExceptionsHelper.convertToRuntime(e); + } + }); + }; + } + + private BulkResponse createBulkResponse() { + EsRejectedExecutionException exception =new EsRejectedExecutionException(); + Failure failure = new Failure("", "", "", exception, ExceptionsHelper.status(exception)); + BulkItemResponse[] bulkActionResponses = new BulkItemResponse[] { + new BulkItemResponse(), + new BulkItemResponse(), + new BulkItemResponse(3, OpType.INDEX, failure) + }; + BulkResponse bulkResponse = new BulkResponse(bulkActionResponses, 3000L); + return bulkResponse; } public void testBulkProcessorFlushPreservesContext() throws InterruptedException { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java index decee8ceab714..52a153c6f1d6d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java @@ -41,7 +41,7 @@ import static org.hamcrest.Matchers.nullValue; public class RetryTests extends ESTestCase { - // no need to wait fof a long time in tests + // no need to wait for a long time in tests private static final TimeValue DELAY = TimeValue.timeValueMillis(1L); private static final int CALLS_TO_FAIL = 5; From e9c8458dffbbe666c6203cbb2a6ebe4dbcf71d22 Mon Sep 17 00:00:00 2001 From: SuXingLee Date: Thu, 19 Sep 2019 21:44:24 +0800 Subject: [PATCH 2/2] test bulk expect TimeoutException --- .../action/bulk/BulkProcessorTests.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index de9d226816193..82363f10a6e95 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.rules.ExpectedException; import java.util.concurrent.CountDownLatch; @@ -55,6 +56,9 @@ public class BulkProcessorTests extends ESTestCase { private ExecutorService userExec = Executors.newFixedThreadPool(1); private ScheduledThreadPoolExecutor scheduleThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); + @Rule + public ExpectedException exception = ExpectedException.none(); + @Before public void startThreadPool() { threadPool = new TestThreadPool("BulkProcessorTests"); @@ -69,17 +73,14 @@ public void stopThreadPool() throws InterruptedException { } public void testBulkProcessorThreadBlocked() throws Exception { + exception.expect(TimeoutException.class); Future future = buildAndExecuteBulkProcessor(initScheduler(1)); - try { - future.get(8, TimeUnit.SECONDS);// thread has been blocked, the IndexRequest cannot be successfully executed. - } catch (Exception e) { - ExpectedException.none().expect(TimeoutException.class); - } + future.get(8, TimeUnit.SECONDS);// thread has been blocked, the IndexRequest cannot be successfully executed. } public void testBulkProcessorThread() throws Exception { Future future = buildAndExecuteBulkProcessor(initScheduler(2)); - assertNull(future.get(8, TimeUnit.SECONDS));//the IndexRequest executed successfully. + assertNull(future.get(4, TimeUnit.SECONDS));//the IndexRequest executed successfully. } private Scheduler initScheduler(int corePoolSize) { @@ -92,7 +93,7 @@ private Future buildAndExecuteBulkProcessor(Scheduler scheduler) throws Inter CountDownLatch latch = new CountDownLatch(1); final int concurrentRequests = 0; final int bulkActions = 3; - final TimeValue flushInterval = TimeValue.timeValueMillis(1500L); + final TimeValue flushInterval = TimeValue.timeValueMillis(1000L); final BackoffPolicy backoff = BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100L), 1); final ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB); BulkProcessor bulkProcessor = new BulkProcessor(bulkAsync(latch), backoff, emptyListener(), @@ -103,7 +104,7 @@ private Future buildAndExecuteBulkProcessor(Scheduler scheduler) throws Inter bulkProcessor.add(new IndexRequest()); bulkProcessor.add(new IndexRequest());// Step-1: execute `BulkRequestHandler` and locked 'BulkProcessor' }); - Thread.sleep(3000L);// Step-2: wait and ensure IntervalFlush is called + Thread.sleep(2000L);// Step-2: wait and ensure IntervalFlush is called latch.countDown(); // Due to step-1, the scheduling thread state is BLOCKED (on object monitor) return future; }