Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix BulkProcessor hangs for threads deadlocked (#44556) #46790

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkRespons
Objects.requireNonNull(consumer, "consumer");
Objects.requireNonNull(listener, "listener");
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
scheduledThreadPoolExecutor.setCorePoolSize(2);
return new Builder(consumer, listener,
buildScheduler(scheduledThreadPoolExecutor),
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,46 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.ExpectedException;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

public class BulkProcessorTests extends ESTestCase {

private ThreadPool threadPool;
private ExecutorService asyncExec = Executors.newFixedThreadPool(1);
private ExecutorService userExec = Executors.newFixedThreadPool(1);
private ScheduledThreadPoolExecutor scheduleThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);

@Rule
public ExpectedException exception = ExpectedException.none();

@Before
public void startThreadPool() {
threadPool = new TestThreadPool("BulkProcessorTests");
Expand All @@ -48,6 +67,75 @@ public void startThreadPool() {
@After
public void stopThreadPool() throws InterruptedException {
terminate(threadPool);
asyncExec.shutdown();
userExec.shutdownNow();
scheduleThreadPoolExecutor.shutdownNow();
}

public void testBulkProcessorThreadBlocked() throws Exception {
exception.expect(TimeoutException.class);
Future<?> future = buildAndExecuteBulkProcessor(initScheduler(1));
future.get(8, TimeUnit.SECONDS);// thread has been blocked, the IndexRequest cannot be successfully executed.
}

public void testBulkProcessorThread() throws Exception {
Future<?> future = buildAndExecuteBulkProcessor(initScheduler(2));
assertNull(future.get(4, TimeUnit.SECONDS));//the IndexRequest executed successfully.
}

private Scheduler initScheduler(int corePoolSize) {
scheduleThreadPoolExecutor.setCorePoolSize(corePoolSize);
return (command, delay, executor) ->
Scheduler.wrapAsScheduledCancellable(scheduleThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS));
}

private Future<?> buildAndExecuteBulkProcessor(Scheduler scheduler) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final int concurrentRequests = 0;
final int bulkActions = 3;
final TimeValue flushInterval = TimeValue.timeValueMillis(1000L);
final BackoffPolicy backoff = BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100L), 1);
final ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
BulkProcessor bulkProcessor = new BulkProcessor(bulkAsync(latch), backoff, emptyListener(),
concurrentRequests, bulkActions, bulkSize, flushInterval,
scheduler, null, BulkRequest::new);
Future<?> future = userExec.submit(() -> {
bulkProcessor.add(new IndexRequest());
bulkProcessor.add(new IndexRequest());
bulkProcessor.add(new IndexRequest());// Step-1: execute `BulkRequestHandler` and locked 'BulkProcessor'
});
Thread.sleep(2000L);// Step-2: wait and ensure IntervalFlush is called
latch.countDown(); // Due to step-1, the scheduling thread state is BLOCKED (on object monitor)
return future;
}

private BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkAsync(CountDownLatch latch) {
return (request, listener) ->
{
// Step-3: retry of bulk request by using scheduler thread
// Due to step-2, scheduler thread is already occupied, causing the retry policy to not be executed
// Causes the lock and latch in step-1 not to be released
asyncExec.execute(() -> {
try {
latch.await();
listener.onResponse(createBulkResponse());
} catch (InterruptedException e) {
throw ExceptionsHelper.convertToRuntime(e);
}
});
};
}

private BulkResponse createBulkResponse() {
EsRejectedExecutionException exception =new EsRejectedExecutionException();
Failure failure = new Failure("", "", "", exception, ExceptionsHelper.status(exception));
BulkItemResponse[] bulkActionResponses = new BulkItemResponse[] {
new BulkItemResponse(),
new BulkItemResponse(),
new BulkItemResponse(3, OpType.INDEX, failure)
};
BulkResponse bulkResponse = new BulkResponse(bulkActionResponses, 3000L);
return bulkResponse;
}

public void testBulkProcessorFlushPreservesContext() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import static org.hamcrest.Matchers.nullValue;

public class RetryTests extends ESTestCase {
// no need to wait fof a long time in tests
// no need to wait for a long time in tests
private static final TimeValue DELAY = TimeValue.timeValueMillis(1L);
private static final int CALLS_TO_FAIL = 5;

Expand Down