Skip to content

Commit

Permalink
running listener code in separate thread
Browse files Browse the repository at this point in the history
  • Loading branch information
l-trotta committed Jun 5, 2024
1 parent f3fbf3c commit 26d5542
Showing 1 changed file with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -58,10 +59,11 @@ public class BulkIngester<Context> implements AutoCloseable {
private final long maxSize;
private final int maxOperations;
private final @Nullable BulkListener<Context> listener;
private ExecutorService listenerExecutor = null; // Created only if listener is present
private final Long flushIntervalMillis;

private @Nullable ScheduledFuture<?> flushTask;
private @Nullable ScheduledExecutorService scheduler;
private @Nullable ScheduledExecutorService flushScheduler;

// Current state
private List<BulkOperation> operations = new ArrayList<>();
Expand Down Expand Up @@ -98,6 +100,14 @@ private BulkIngester(Builder<Context> builder) {
this.maxSize = builder.bulkSize < 0 ? Long.MAX_VALUE : builder.bulkSize;
this.maxOperations = builder.bulkOperations < 0 ? Integer.MAX_VALUE : builder.bulkOperations;
this.listener = builder.listener;
if(listener != null) {
this.listenerExecutor = Executors.newSingleThreadScheduledExecutor((r) -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("listener-executor#" + ingesterId);
t.setDaemon(true);
return t;
});
}
this.flushIntervalMillis = builder.flushIntervalMillis;

if (flushIntervalMillis != null) {
Expand All @@ -114,7 +124,7 @@ private BulkIngester(Builder<Context> builder) {
});

// Keep it, we'll have to close it.
this.scheduler = scheduler;
this.flushScheduler = scheduler;
} else {
// It's not ours, we will not close it.
scheduler = builder.scheduler;
Expand Down Expand Up @@ -291,7 +301,8 @@ public void flush() {
long id = sendRequestCondition.invocations();

if (listener != null) {
listener.beforeBulk(id, request, requestContexts);
BulkRequest finalRequest = request;
listenerExecutor.submit(() -> listener.beforeBulk(id, finalRequest, requestContexts));
}

CompletionStage<BulkResponse> result = client.bulk(request);
Expand All @@ -317,12 +328,12 @@ public void flush() {
if (resp != null) {
// Success
if (listener != null) {
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
listenerExecutor.submit(() -> listener.afterBulk(exec.id, exec.request, exec.contexts, resp));
}
} else {
// Failure
if (listener != null) {
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
listenerExecutor.submit(() -> listener.afterBulk(exec.id, exec.request, exec.contexts, thr));
}
}
return null;
Expand Down Expand Up @@ -389,8 +400,12 @@ public void close() {
flushTask.cancel(false);
}

if (scheduler != null) {
scheduler.shutdownNow();
if (flushScheduler != null) {
flushScheduler.shutdownNow();
}

if (listenerExecutor != null){
listenerExecutor.shutdownNow();
}
}

Expand Down

0 comments on commit 26d5542

Please sign in to comment.