@@ -62,6 +62,7 @@ public class BulkIngester<Context> implements AutoCloseable {
6262
6363 private @ Nullable ScheduledFuture <?> flushTask ;
6464 private @ Nullable ScheduledExecutorService scheduler ;
65+ private boolean isExternalScheduler = false ;
6566
6667 // Current state
6768 private List <BulkOperation > operations = new ArrayList <>();
@@ -82,7 +83,8 @@ private static class RequestExecution<Context> {
8283 public final List <Context > contexts ;
8384 public final CompletionStage <BulkResponse > futureResponse ;
8485
85- RequestExecution (long id , BulkRequest request , List <Context > contexts , CompletionStage <BulkResponse > futureResponse ) {
86+ RequestExecution (long id , BulkRequest request , List <Context > contexts ,
87+ CompletionStage <BulkResponse > futureResponse ) {
8688 this .id = id ;
8789 this .request = request ;
8890 this .contexts = contexts ;
@@ -99,27 +101,25 @@ private BulkIngester(Builder<Context> builder) {
99101 this .maxOperations = builder .bulkOperations < 0 ? Integer .MAX_VALUE : builder .bulkOperations ;
100102 this .listener = builder .listener ;
101103 this .flushIntervalMillis = builder .flushIntervalMillis ;
102-
103- if (flushIntervalMillis != null ) {
104- long flushInterval = flushIntervalMillis ;
105104
105+ if (flushIntervalMillis != null || listener != null ) {
106106 // Create a scheduler if needed
107- ScheduledExecutorService scheduler ;
108107 if (builder .scheduler == null ) {
109- scheduler = Executors .newSingleThreadScheduledExecutor ((r ) -> {
110- Thread t = Executors .defaultThreadFactory ().newThread (r );
111- t .setName ("bulk-ingester-flusher#" + ingesterId );
112- t .setDaemon (true );
113- return t ;
114- });
115-
116- // Keep it, we'll have to close it.
117- this .scheduler = scheduler ;
108+ this .scheduler = Executors .newScheduledThreadPool (maxRequests + 1 , (r ) -> {
109+ Thread t = Executors .defaultThreadFactory ().newThread (r );
110+ t .setName ("bulk-ingester-executor#" + ingesterId + "#" + t .getId ());
111+ t .setDaemon (true );
112+ return t ;
113+ });
118114 } else {
119115 // It's not ours, we will not close it.
120- scheduler = builder .scheduler ;
116+ this .scheduler = builder .scheduler ;
117+ this .isExternalScheduler = true ;
121118 }
122-
119+ }
120+
121+ if (flushIntervalMillis != null ) {
122+ long flushInterval = flushIntervalMillis ;
123123 this .flushTask = scheduler .scheduleWithFixedDelay (
124124 this ::failsafeFlush ,
125125 flushInterval , flushInterval ,
@@ -221,7 +221,7 @@ public long requestCount() {
221221 * @see Builder#maxConcurrentRequests
222222 */
223223 public long requestContentionsCount () {
224- return this .sendRequestCondition .contentions ();
224+ return this .sendRequestCondition .contentions ();
225225 }
226226
227227 //----- Predicates for the condition variables
@@ -265,7 +265,7 @@ private BulkRequest.Builder newRequest() {
265265 private void failsafeFlush () {
266266 try {
267267 flush ();
268- } catch (Throwable thr ) {
268+ } catch (Throwable thr ) {
269269 // Log the error and continue
270270 logger .error ("Error in background flush" , thr );
271271 }
@@ -280,7 +280,8 @@ public void flush() {
280280 () -> {
281281 // Build the request
282282 BulkRequest request = newRequest ().operations (operations ).build ();
283- List <Context > requestContexts = contexts == null ? Collections .nCopies (operations .size (), null ) : contexts ;
283+ List <Context > requestContexts = contexts == null ? Collections .nCopies (operations .size (),
284+ null ) : contexts ;
284285
285286 // Prepare for next round
286287 operations = new ArrayList <>();
@@ -291,7 +292,8 @@ public void flush() {
291292 long id = sendRequestCondition .invocations ();
292293
293294 if (listener != null ) {
294- listener .beforeBulk (id , request , requestContexts );
295+ BulkRequest finalRequest = request ;
296+ scheduler .submit (() -> listener .beforeBulk (id , finalRequest , requestContexts ));
295297 }
296298
297299 CompletionStage <BulkResponse > result = client .bulk (request );
@@ -303,7 +305,7 @@ public void flush() {
303305 }
304306
305307 return new RequestExecution <>(id , request , requestContexts , result );
306- });
308+ });
307309
308310 if (exec != null ) {
309311 // A request was actually sent
@@ -317,12 +319,14 @@ public void flush() {
317319 if (resp != null ) {
318320 // Success
319321 if (listener != null ) {
320- listener .afterBulk (exec .id , exec .request , exec .contexts , resp );
322+ scheduler .submit (() -> listener .afterBulk (exec .id , exec .request ,
323+ exec .contexts , resp ));
321324 }
322325 } else {
323326 // Failure
324327 if (listener != null ) {
325- listener .afterBulk (exec .id , exec .request , exec .contexts , thr );
328+ scheduler .submit (() -> listener .afterBulk (exec .id , exec .request ,
329+ exec .contexts , thr ));
326330 }
327331 }
328332 return null ;
@@ -383,13 +387,14 @@ public void close() {
383387 // Flush buffered operations
384388 flush ();
385389 // and wait for all requests to be completed
386- closeCondition .whenReady (() -> {});
390+ closeCondition .whenReady (() -> {
391+ });
387392
388393 if (flushTask != null ) {
389394 flushTask .cancel (false );
390395 }
391396
392- if (scheduler != null ) {
397+ if (scheduler != null && ! isExternalScheduler ) {
393398 scheduler .shutdownNow ();
394399 }
395400 }
@@ -404,7 +409,7 @@ public static class Builder<Context> implements ObjectBuilder<BulkIngester<Conte
404409 private ElasticsearchAsyncClient client ;
405410 private BulkRequest globalSettings ;
406411 private int bulkOperations = 1000 ;
407- private long bulkSize = 5 * 1024 * 1024 ;
412+ private long bulkSize = 5 * 1024 * 1024 ;
408413 private int maxConcurrentRequests = 1 ;
409414 private Long flushIntervalMillis ;
410415 private BulkListener <Context > listener ;
@@ -438,7 +443,8 @@ public Builder<Context> maxOperations(int count) {
438443 }
439444
440445 /**
441- * Sets when to flush a new bulk request based on the size in bytes of actions currently added. A request is sent
446+ * Sets when to flush a new bulk request based on the size in bytes of actions currently added. A
447+ * request is sent
442448 * once that size has been exceeded. Defaults to 5 megabytes. Can be set to {@code -1} to disable it.
443449 *
444450 * @throws IllegalArgumentException if less than -1.
@@ -452,7 +458,8 @@ public Builder<Context> maxSize(long bytes) {
452458 }
453459
454460 /**
455- * Sets the number of concurrent requests allowed to be executed. A value of 1 means 1 request is allowed to be executed
461+ * Sets the number of concurrent requests allowed to be executed. A value of 1 means 1 request is
462+ * allowed to be executed
456463 * while accumulating new bulk requests. Defaults to {@code 1}.
457464 *
458465 * @throws IllegalArgumentException if less than 1.
@@ -468,7 +475,8 @@ public Builder<Context> maxConcurrentRequests(int max) {
468475 /**
469476 * Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set.
470477 * <p>
471- * Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}.
478+ * Flushing is still subject to the maximum number of requests set with
479+ * {@link #maxConcurrentRequests}.
472480 *
473481 * @throws IllegalArgumentException if not a positive duration.
474482 */
@@ -483,13 +491,25 @@ public Builder<Context> flushInterval(long value, TimeUnit unit) {
483491 /**
484492 * Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set.
485493 * <p>
486- * Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}.
494+ * Flushing is still subject to the maximum number of requests set with
495+ * {@link #maxConcurrentRequests}.
496+ * @deprecated use {@link #scheduler(ScheduledExecutorService)}
487497 */
498+ @ Deprecated
488499 public Builder <Context > flushInterval (long value , TimeUnit unit , ScheduledExecutorService scheduler ) {
489500 this .scheduler = scheduler ;
490501 return flushInterval (value , unit );
491502 }
492503
504+ /**
505+ * Sets a custom scheduler to run the flush thread and the listener logic. A default one is used if
506+ * not set.
507+ */
508+ public Builder <Context > scheduler (ScheduledExecutorService scheduler ) {
509+ this .scheduler = scheduler ;
510+ return this ;
511+ }
512+
493513 public Builder <Context > listener (BulkListener <Context > listener ) {
494514 this .listener = listener ;
495515 return this ;
@@ -518,7 +538,8 @@ public Builder<Context> globalSettings(Function<BulkRequest.Builder, BulkRequest
518538 @ Override
519539 public BulkIngester <Context > build () {
520540 // Ensure some chunking criteria are defined
521- boolean hasCriteria = this .bulkOperations >= 0 || this .bulkSize >= 0 || this .flushIntervalMillis != null ;
541+ boolean hasCriteria =
542+ this .bulkOperations >= 0 || this .bulkSize >= 0 || this .flushIntervalMillis != null ;
522543
523544 if (!hasCriteria ) {
524545 throw new IllegalStateException ("No bulk operation chunking criteria have been set." );
0 commit comments