diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index a6e61367e..e5b3af038 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -58,10 +59,11 @@ public class BulkIngester implements AutoCloseable { private final long maxSize; private final int maxOperations; private final @Nullable BulkListener listener; + private ExecutorService listenerExecutor = null; // Created only if listener is present private final Long flushIntervalMillis; private @Nullable ScheduledFuture flushTask; - private @Nullable ScheduledExecutorService scheduler; + private @Nullable ScheduledExecutorService flushScheduler; // Current state private List operations = new ArrayList<>(); @@ -98,6 +100,14 @@ private BulkIngester(Builder builder) { this.maxSize = builder.bulkSize < 0 ? Long.MAX_VALUE : builder.bulkSize; this.maxOperations = builder.bulkOperations < 0 ? Integer.MAX_VALUE : builder.bulkOperations; this.listener = builder.listener; + if(listener != null) { + this.listenerExecutor = Executors.newSingleThreadScheduledExecutor((r) -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("listener-executor#" + ingesterId); + t.setDaemon(true); + return t; + }); + } this.flushIntervalMillis = builder.flushIntervalMillis; if (flushIntervalMillis != null) { @@ -114,7 +124,7 @@ private BulkIngester(Builder builder) { }); // Keep it, we'll have to close it. - this.scheduler = scheduler; + this.flushScheduler = scheduler; } else { // It's not ours, we will not close it. scheduler = builder.scheduler; @@ -291,7 +301,8 @@ public void flush() { long id = sendRequestCondition.invocations(); if (listener != null) { - listener.beforeBulk(id, request, requestContexts); + BulkRequest finalRequest = request; + listenerExecutor.submit(() -> listener.beforeBulk(id, finalRequest, requestContexts)); } CompletionStage result = client.bulk(request); @@ -317,12 +328,12 @@ public void flush() { if (resp != null) { // Success if (listener != null) { - listener.afterBulk(exec.id, exec.request, exec.contexts, resp); + listenerExecutor.submit(() -> listener.afterBulk(exec.id, exec.request, exec.contexts, resp)); } } else { // Failure if (listener != null) { - listener.afterBulk(exec.id, exec.request, exec.contexts, thr); + listenerExecutor.submit(() -> listener.afterBulk(exec.id, exec.request, exec.contexts, thr)); } } return null; @@ -389,8 +400,12 @@ public void close() { flushTask.cancel(false); } - if (scheduler != null) { - scheduler.shutdownNow(); + if (flushScheduler != null) { + flushScheduler.shutdownNow(); + } + + if (listenerExecutor != null){ + listenerExecutor.shutdownNow(); } }