Skip to content

Commit

Permalink
OAK-11042: bump elastic 8.15.0 / lucene 9.11.1 (#1658)
Browse files Browse the repository at this point in the history
* OAK-11042: bump elastic 8.15.0 / lucene 9.11.1

* OAK-11029: add workaround for elastic/elasticsearch-java#867

* OAK-11029: better doc for workaround

* OAK-11029: improve bulkIngester#close

* OAK-11029: missing static modifier for ElasticBulkProcessorHandler#FAILED_DOC_COUNT_FOR_STATUS_NODE
  • Loading branch information
fabriziofortino authored Aug 22, 2024
1 parent be5b9e6 commit 12f787f
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 36 deletions.
4 changes: 2 additions & 2 deletions oak-search-elastic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
<description>Oak Elasticsearch integration subproject</description>

<properties>
<elasticsearch.java.client.version>8.13.2</elasticsearch.java.client.version>
<lucene.version>9.10.0</lucene.version>
<elasticsearch.java.client.version>8.15.0</elasticsearch.java.client.version>
<lucene.version>9.11.1</lucene.version>
</properties>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
Expand All @@ -42,15 +43,17 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

class ElasticBulkProcessorHandler {

private static final Logger LOG = LoggerFactory.getLogger(ElasticBulkProcessorHandler.class);
private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = Integer.getInteger("oak.failedDocStatusLimit", 10000);
private static final int FAILED_DOC_COUNT_FOR_STATUS_NODE = Integer.getInteger("oak.failedDocStatusLimit", 10000);

private static final int BULK_PROCESSOR_CONCURRENCY =
Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency", 1);
Expand Down Expand Up @@ -85,6 +88,9 @@ class ElasticBulkProcessorHandler {

protected long totalOperations;

// TODO: workaround for https://github.com/elastic/elasticsearch-java/pull/867; remove once fixed upstream
private final ScheduledExecutorService scheduler;

private ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection,
@NotNull String indexName,
@NotNull ElasticIndexDefinition indexDefinition,
Expand All @@ -95,6 +101,13 @@ private ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection
this.indexDefinition = indexDefinition;
this.definitionBuilder = definitionBuilder;
this.waitForESAcknowledgement = waitForESAcknowledgement;
// TODO: workaround for https://github.com/elastic/elasticsearch-java/pull/867; remove once fixed upstream
this.scheduler = Executors.newScheduledThreadPool(BULK_PROCESSOR_CONCURRENCY + 1, (r) -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("oak-bulk-ingester#");
t.setDaemon(true);
return t;
});
this.bulkIngester = initBulkIngester();
}

Expand Down Expand Up @@ -150,6 +163,10 @@ private BulkIngester<String> initBulkIngester() {
if (indexDefinition.bulkFlushIntervalMs > 0) {
b = b.flushInterval(indexDefinition.bulkFlushIntervalMs, TimeUnit.MILLISECONDS);
}

// TODO: workaround for https://github.com/elastic/elasticsearch-java/pull/867; remove once fixed upstream
b = b.scheduler(scheduler);

return b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
});
}
Expand Down Expand Up @@ -183,35 +200,40 @@ private void add(BulkOperation operation, String context) throws IOException {
* @throws IOException if an error happened while processing the bulk requests
*/
public boolean close() throws IOException {
LOG.trace("Calling close on bulk ingester {}", bulkIngester);
bulkIngester.close();
LOG.trace("Bulk Ingester {} closed", bulkIngester);
try {
LOG.trace("Calling close on bulk ingester {}", bulkIngester);
bulkIngester.close();
LOG.trace("Bulk Ingester {} closed", bulkIngester);

// de-register main controller
int phase = phaser.arriveAndDeregister();
// de-register main controller
int phase = phaser.arriveAndDeregister();

if (totalOperations == 0) { // no need to invoke phaser await if we already know there were no operations
LOG.debug("No operations executed in this processor. Close immediately");
return false;
}
if (totalOperations == 0) { // no need to invoke phaser await if we already know there were no operations
LOG.debug("No operations executed in this processor. Close immediately");
return false;
}

if (waitForESAcknowledgement) {
try {
phaser.awaitAdvanceInterruptibly(phase, indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOG.error("Error waiting for bulk requests to return", e);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for bulk processor to close", e);
Thread.currentThread().interrupt(); // restore interrupt status
if (waitForESAcknowledgement) {
try {
phaser.awaitAdvanceInterruptibly(phase, indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOG.error("Error waiting for bulk requests to return", e);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for bulk processor to close", e);
Thread.currentThread().interrupt(); // restore interrupt status
}
}
}

checkFailures();
checkFailures();

if (LOG.isTraceEnabled()) {
LOG.trace("Bulk identifier -> update status = {}", updatesMap);
if (LOG.isTraceEnabled()) {
LOG.trace("Bulk identifier -> update status = {}", updatesMap);
}
return updatesMap.containsValue(Boolean.TRUE);
} finally {
// TODO: workaround for https://github.com/elastic/elasticsearch-java/pull/867; remove once fixed upstream
new ExecutorCloser(scheduler).close();
}
return updatesMap.containsValue(Boolean.TRUE);
}

private class OakBulkListener implements BulkListener<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,27 @@ public static Query newDepthQuery(String path, FulltextIndexPlanner.PlanResult p
int depth = PathUtils.getDepth(path) + planResult.getParentDepth() + 1;
return Query.of(q -> q.term(t -> t.field(PATH_DEPTH).value(v->v.longValue(depth))));
}

private static <R> Query newRangeQuery(String field, R first, R last, boolean firstIncluding,
boolean lastIncluding) {
boolean lastIncluding) {

return Query.of(fn -> fn.range(fnr -> {
return Query.of(fn -> fn.range(fnr -> fnr.date(date -> {
if (first != null) {
if (firstIncluding) {
fnr.gte(JsonData.of(first));
date.gte(first.toString());
} else {
fnr.gt(JsonData.of(first));
date.gt(first.toString());
}
}
if (last != null) {
if (lastIncluding) {
fnr.lte(JsonData.of(last));
date.lte(last.toString());
} else {
fnr.lt(JsonData.of(last));
date.lt(last.toString());
}
}
return fnr.field(field);
}));
return date.field(field);
})));
}

private static <R> FieldValue toFieldValue(R value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public class ElasticTestServer implements AutoCloseable {
"8.7.1.0", "80c8d34334b0cf4def79835ea6dab78b59ba9ee54c8f5f3cba0bde53123d7820",
"8.10.4.0", "b2ae8faf1e272319594b4d47a72580fa4f61a5c11cbc8d3f13453fd34b153441",
"8.11.0.0", "8d4d80b850c4da4da6dfe2d675b2e2355d2014307f8bdc54cc1b34323c81c7ae",
"8.11.1.0", "a00a920d4bc29f0deacde7c2ef3d3f70692b00b62bf7fb82b0fe18eeb1dafee9",
"8.11.3.0", "1f14b496baf973fb5c64e77fc458d9814dd6905170d7b15350f9f1a009824f41",
"8.13.2.0", "586f553b109266d7996265f3f34a20914b569d494b49da2c0534428770e551f0");
"8.13.2.0", "586f553b109266d7996265f3f34a20914b569d494b49da2c0534428770e551f0",
"8.15.0.0", "6cbb54d68d654a3476df0b730856cfa3194bce5c6e1050a35e7a86ffec8a3e20");

private static final ElasticTestServer SERVER = new ElasticTestServer();
private static volatile ElasticsearchContainer CONTAINER;
Expand Down

0 comments on commit 12f787f

Please sign in to comment.