Skip to content

Commit

Permalink
Upgrade Elasticsearch 8.x to version 8.15.0. [DOX-412]
Browse files Browse the repository at this point in the history
Incompatible change in elastic/elasticsearch-java#830.

(Totally meaningless release notes entry: "Fixed bug in BulkIngester")

https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.15/release-highlights.html
  • Loading branch information
blackwinter committed Aug 12, 2024
1 parent ca09bd4 commit a49f1c4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ gradle.rootProject {
'commons-cli' : '1.3.1',
'commons-io' : '2.7',
'elasticsearch2' : '2.2.1',
'elasticsearch8' : '8.14.1',
'elasticsearch8' : '8.15.0',
'htsjdk' : '4.0.1',
'jackson' : '2.13.4.2',
'jdk' : '17',
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/hbz/limetrans/ElasticsearchClientV8.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -64,6 +67,7 @@ public class ElasticsearchClientV8 extends ElasticsearchClient { // checkstyle-d

private BulkIngester<Void> mBulkIngester;
private RestClientTransport mTransport;
private ScheduledExecutorService mBulkScheduler;
private co.elastic.clients.elasticsearch.ElasticsearchClient mClient;

public ElasticsearchClientV8(final Settings aSettings) {
Expand All @@ -75,6 +79,7 @@ public ElasticsearchClientV8(final Settings aSettings) {
@Override
public void reset() {
mBulkIngester = null;
mBulkScheduler = null;

super.reset();
}
Expand Down Expand Up @@ -269,8 +274,16 @@ private void addBulk(final Function<BulkOperation.Builder, ObjectBuilder<BulkOpe

@Override
protected void createBulk(final int aBulkActions, final int aBulkRequests) {
mBulkScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("limetrans-bulk-ingester-executor#");
t.setDaemon(true);
return t;
});

mBulkIngester = BulkIngester.of(b -> b
.client(mClient)
.scheduler(mBulkScheduler)
.listener(new ElasticsearchBulkListener(this))
.maxOperations(aBulkActions)
.maxSize(mBulkSizeValue)
Expand All @@ -287,10 +300,13 @@ protected boolean isBulkClosed() {
protected boolean closeBulk() throws InterruptedException {
try {
mBulkIngester.close();
mBulkScheduler.shutdown();
mBulkScheduler.awaitTermination(2, TimeUnit.MINUTES);
return mBulkIngester.pendingRequests() == 0;
}
finally {
mBulkIngester = null;
mBulkScheduler = null;
}
}

Expand Down

0 comments on commit a49f1c4

Please sign in to comment.