Skip to content

Commit

Permalink
OAK-11029: add workaround for elastic/elasticsearch-java#867
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziofortino committed Aug 20, 2024
1 parent e52d92d commit 9c588eb
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -85,6 +87,8 @@ class ElasticBulkProcessorHandler {

protected long totalOperations;

private final ScheduledExecutorService scheduler;

private ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection,
@NotNull String indexName,
@NotNull ElasticIndexDefinition indexDefinition,
Expand All @@ -95,6 +99,13 @@ private ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection
this.indexDefinition = indexDefinition;
this.definitionBuilder = definitionBuilder;
this.waitForESAcknowledgement = waitForESAcknowledgement;
// TODO: workaround for https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed
this.scheduler = Executors.newScheduledThreadPool(BULK_PROCESSOR_CONCURRENCY + 1, (r) -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("oak-bulk-ingester#");
t.setDaemon(true);
return t;
});
this.bulkIngester = initBulkIngester();
}

Expand Down Expand Up @@ -150,7 +161,7 @@ private BulkIngester<String> initBulkIngester() {
if (indexDefinition.bulkFlushIntervalMs > 0) {
b = b.flushInterval(indexDefinition.bulkFlushIntervalMs, TimeUnit.MILLISECONDS);
}
return b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
return b.scheduler(scheduler).maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
});
}

Expand Down Expand Up @@ -206,6 +217,8 @@ public boolean close() throws IOException {
}
}

scheduler.shutdownNow();

checkFailures();

if (LOG.isTraceEnabled()) {
Expand Down

0 comments on commit 9c588eb

Please sign in to comment.