Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BulkIngester - Running listener code in separate thread pool #830

Merged
merged 6 commits into from
Jun 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class BulkIngester<Context> implements AutoCloseable {

private @Nullable ScheduledFuture<?> flushTask;
private @Nullable ScheduledExecutorService scheduler;
private boolean isExternalScheduler = false;

// Current state
private List<BulkOperation> operations = new ArrayList<>();
Expand All @@ -82,7 +83,8 @@ private static class RequestExecution<Context> {
public final List<Context> contexts;
public final CompletionStage<BulkResponse> futureResponse;

RequestExecution(long id, BulkRequest request, List<Context> contexts, CompletionStage<BulkResponse> futureResponse) {
RequestExecution(long id, BulkRequest request, List<Context> contexts,
CompletionStage<BulkResponse> futureResponse) {
this.id = id;
this.request = request;
this.contexts = contexts;
Expand All @@ -99,27 +101,25 @@ private BulkIngester(Builder<Context> builder) {
this.maxOperations = builder.bulkOperations < 0 ? Integer.MAX_VALUE : builder.bulkOperations;
this.listener = builder.listener;
this.flushIntervalMillis = builder.flushIntervalMillis;

if (flushIntervalMillis != null) {
long flushInterval = flushIntervalMillis;

if (flushIntervalMillis != null || listener != null) {
// Create a scheduler if needed
ScheduledExecutorService scheduler;
if (builder.scheduler == null) {
scheduler = Executors.newSingleThreadScheduledExecutor((r) -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("bulk-ingester-flusher#" + ingesterId);
t.setDaemon(true);
return t;
});

// Keep it, we'll have to close it.
this.scheduler = scheduler;
this.scheduler = Executors.newScheduledThreadPool(maxRequests + 1, (r) -> {
l-trotta marked this conversation as resolved.
Show resolved Hide resolved
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("bulk-ingester-executor#" + ingesterId + "#" + t.getId());
t.setDaemon(true);
return t;
});
} else {
// It's not ours, we will not close it.
scheduler = builder.scheduler;
this.scheduler = builder.scheduler;
this.isExternalScheduler = true;
}

}

if (flushIntervalMillis != null) {
long flushInterval = flushIntervalMillis;
this.flushTask = scheduler.scheduleWithFixedDelay(
this::failsafeFlush,
flushInterval, flushInterval,
Expand Down Expand Up @@ -221,7 +221,7 @@ public long requestCount() {
* @see Builder#maxConcurrentRequests
*/
public long requestContentionsCount() {
return this.sendRequestCondition.contentions();
return this.sendRequestCondition.contentions();
}

//----- Predicates for the condition variables
Expand Down Expand Up @@ -265,7 +265,7 @@ private BulkRequest.Builder newRequest() {
private void failsafeFlush() {
try {
flush();
} catch(Throwable thr) {
} catch (Throwable thr) {
// Log the error and continue
logger.error("Error in background flush", thr);
}
Expand All @@ -280,7 +280,8 @@ public void flush() {
() -> {
// Build the request
BulkRequest request = newRequest().operations(operations).build();
List<Context> requestContexts = contexts == null ? Collections.nCopies(operations.size(), null) : contexts;
List<Context> requestContexts = contexts == null ? Collections.nCopies(operations.size(),
null) : contexts;

// Prepare for next round
operations = new ArrayList<>();
Expand All @@ -291,7 +292,8 @@ public void flush() {
long id = sendRequestCondition.invocations();

if (listener != null) {
listener.beforeBulk(id, request, requestContexts);
BulkRequest finalRequest = request;
scheduler.submit(() -> listener.beforeBulk(id, finalRequest, requestContexts));
}

CompletionStage<BulkResponse> result = client.bulk(request);
Expand All @@ -303,7 +305,7 @@ public void flush() {
}

return new RequestExecution<>(id, request, requestContexts, result);
});
});

if (exec != null) {
// A request was actually sent
Expand All @@ -317,12 +319,14 @@ public void flush() {
if (resp != null) {
// Success
if (listener != null) {
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
exec.contexts, resp));
}
} else {
// Failure
if (listener != null) {
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
exec.contexts, thr));
}
}
return null;
Expand Down Expand Up @@ -383,13 +387,14 @@ public void close() {
// Flush buffered operations
flush();
// and wait for all requests to be completed
closeCondition.whenReady(() -> {});
closeCondition.whenReady(() -> {
});

if (flushTask != null) {
flushTask.cancel(false);
}

if (scheduler != null) {
if (scheduler != null && !isExternalScheduler) {
scheduler.shutdownNow();
}
}
Expand All @@ -404,7 +409,7 @@ public static class Builder<Context> implements ObjectBuilder<BulkIngester<Conte
private ElasticsearchAsyncClient client;
private BulkRequest globalSettings;
private int bulkOperations = 1000;
private long bulkSize = 5*1024*1024;
private long bulkSize = 5 * 1024 * 1024;
private int maxConcurrentRequests = 1;
private Long flushIntervalMillis;
private BulkListener<Context> listener;
Expand Down Expand Up @@ -438,7 +443,8 @@ public Builder<Context> maxOperations(int count) {
}

/**
* Sets when to flush a new bulk request based on the size in bytes of actions currently added. A request is sent
* Sets when to flush a new bulk request based on the size in bytes of actions currently added. A
* request is sent
* once that size has been exceeded. Defaults to 5 megabytes. Can be set to {@code -1} to disable it.
*
* @throws IllegalArgumentException if less than -1.
Expand All @@ -452,7 +458,8 @@ public Builder<Context> maxSize(long bytes) {
}

/**
* Sets the number of concurrent requests allowed to be executed. A value of 1 means 1 request is allowed to be executed
* Sets the number of concurrent requests allowed to be executed. A value of 1 means 1 request is
* allowed to be executed
* while accumulating new bulk requests. Defaults to {@code 1}.
*
* @throws IllegalArgumentException if less than 1.
Expand All @@ -468,7 +475,8 @@ public Builder<Context> maxConcurrentRequests(int max) {
/**
* Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set.
* <p>
* Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}.
* Flushing is still subject to the maximum number of requests set with
* {@link #maxConcurrentRequests}.
*
* @throws IllegalArgumentException if not a positive duration.
*/
Expand All @@ -483,13 +491,25 @@ public Builder<Context> flushInterval(long value, TimeUnit unit) {
/**
* Sets an interval flushing any bulk actions pending if the interval passes, together with the
* scheduler that runs the flush task. Defaults to not set.
* <p>
* Flushing is still subject to the maximum number of requests set with
* {@link #maxConcurrentRequests}.
* <p>
* This overload stores the given scheduler on the builder and then delegates to
* {@link #flushInterval(long, TimeUnit)}, so the same validation of the interval applies.
*
* @param value the flush interval, expressed in {@code unit}
* @param unit the time unit of {@code value}
* @param scheduler the executor to use; same effect as calling
* {@link #scheduler(ScheduledExecutorService)}
* @return the result of {@link #flushInterval(long, TimeUnit)}
* @deprecated use {@link #scheduler(ScheduledExecutorService)} together with
* {@link #flushInterval(long, TimeUnit)}
*/
@Deprecated
public Builder<Context> flushInterval(long value, TimeUnit unit, ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
return flushInterval(value, unit);
}

/**
* Sets a custom scheduler used to run the periodic flush task and the listener callbacks.
* <p>
* If no scheduler is set, the ingester creates its own pool of daemon threads when a flush
* interval or a listener is configured, and shuts that pool down when the ingester is closed.
* A scheduler provided here is treated as external: the ingester does not shut it down on
* close, so the caller remains responsible for its lifecycle.
*
* @param scheduler the executor that will run flushes and listener notifications
* @return this builder
*/
public Builder<Context> scheduler(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
return this;
}

public Builder<Context> listener(BulkListener<Context> listener) {
this.listener = listener;
return this;
Expand Down Expand Up @@ -518,7 +538,8 @@ public Builder<Context> globalSettings(Function<BulkRequest.Builder, BulkRequest
@Override
public BulkIngester<Context> build() {
// Ensure some chunking criteria are defined
boolean hasCriteria = this.bulkOperations >= 0 || this.bulkSize >= 0 || this.flushIntervalMillis != null;
boolean hasCriteria =
this.bulkOperations >= 0 || this.bulkSize >= 0 || this.flushIntervalMillis != null;

if (!hasCriteria) {
throw new IllegalStateException("No bulk operation chunking criteria have been set.");
Expand Down
Loading