diff --git a/oak-search-elastic/pom.xml b/oak-search-elastic/pom.xml index 3c6b1d3489e..293faf6ce14 100644 --- a/oak-search-elastic/pom.xml +++ b/oak-search-elastic/pom.xml @@ -33,8 +33,8 @@ Oak Elasticsearch integration subproject - 8.13.2 - 9.10.0 + 8.15.0 + 9.11.1 diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java index 53523403041..02897b558c2 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java @@ -25,6 +25,7 @@ import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection; import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition; import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition; @@ -42,7 +43,9 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; import java.util.concurrent.Phaser; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -50,7 +53,7 @@ class ElasticBulkProcessorHandler { private static final Logger LOG = LoggerFactory.getLogger(ElasticBulkProcessorHandler.class); - private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = Integer.getInteger("oak.failedDocStatusLimit", 10000); + private static final int FAILED_DOC_COUNT_FOR_STATUS_NODE = Integer.getInteger("oak.failedDocStatusLimit", 10000); private static final int BULK_PROCESSOR_CONCURRENCY = Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency", 1); @@ -85,6 +88,9 @@ class ElasticBulkProcessorHandler { protected long totalOperations; + // TODO: workaround for https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed + private final ScheduledExecutorService scheduler; + private ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection, @NotNull String indexName, @NotNull ElasticIndexDefinition indexDefinition, @@ -95,6 +101,13 @@ private ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection this.indexDefinition = indexDefinition; this.definitionBuilder = definitionBuilder; this.waitForESAcknowledgement = waitForESAcknowledgement; + // TODO: workaround for https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed + this.scheduler = Executors.newScheduledThreadPool(BULK_PROCESSOR_CONCURRENCY + 1, (r) -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("oak-bulk-ingester#"); + t.setDaemon(true); + return t; + }); this.bulkIngester = initBulkIngester(); } @@ -150,6 +163,10 @@ private BulkIngester initBulkIngester() { if (indexDefinition.bulkFlushIntervalMs > 0) { b = b.flushInterval(indexDefinition.bulkFlushIntervalMs, TimeUnit.MILLISECONDS); } + + // TODO: workaround for https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed + b = b.scheduler(scheduler); + return b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY); }); } @@ -183,35 +200,40 @@ private void add(BulkOperation operation, String context) throws IOException { * @throws IOException if an error happened while processing the bulk requests */ public boolean close() throws IOException { - LOG.trace("Calling close on bulk ingester {}", bulkIngester); - bulkIngester.close(); - LOG.trace("Bulk Ingester {} closed", bulkIngester); + try { + LOG.trace("Calling close on bulk ingester {}", bulkIngester); + bulkIngester.close(); + LOG.trace("Bulk Ingester {} closed", bulkIngester); - // de-register main controller - int phase = phaser.arriveAndDeregister(); + // de-register main controller + int phase = phaser.arriveAndDeregister(); - if (totalOperations == 0) { // no need to invoke phaser await if we already know there were no operations - LOG.debug("No operations executed in this processor. Close immediately"); - return false; - } + if (totalOperations == 0) { // no need to invoke phaser await if we already know there were no operations + LOG.debug("No operations executed in this processor. Close immediately"); + return false; + } - if (waitForESAcknowledgement) { - try { - phaser.awaitAdvanceInterruptibly(phase, indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - LOG.error("Error waiting for bulk requests to return", e); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for bulk processor to close", e); - Thread.currentThread().interrupt(); // restore interrupt status + if (waitForESAcknowledgement) { + try { + phaser.awaitAdvanceInterruptibly(phase, indexDefinition.bulkFlushIntervalMs * 5, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.error("Error waiting for bulk requests to return", e); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for bulk processor to close", e); + Thread.currentThread().interrupt(); // restore interrupt status + } } - } - checkFailures(); + checkFailures(); - if (LOG.isTraceEnabled()) { - LOG.trace("Bulk identifier -> update status = {}", updatesMap); + if (LOG.isTraceEnabled()) { + LOG.trace("Bulk identifier -> update status = {}", updatesMap); + } + return updatesMap.containsValue(Boolean.TRUE); + } finally { + // TODO: workaround for https://github.com/elastic/elasticsearch-java/pull/867 remove when fixed + new ExecutorCloser(scheduler).close(); } - return updatesMap.containsValue(Boolean.TRUE); } private class OakBulkListener implements BulkListener { diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java index ef93868b407..33743639a45 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/util/TermQueryBuilderFactory.java @@ -70,27 +70,27 @@ public static Query newDepthQuery(String path, FulltextIndexPlanner.PlanResult p int depth = PathUtils.getDepth(path) + planResult.getParentDepth() + 1; return Query.of(q -> q.term(t -> t.field(PATH_DEPTH).value(v->v.longValue(depth)))); } - + private static Query newRangeQuery(String field, R first, R last, boolean firstIncluding, - boolean lastIncluding) { + boolean lastIncluding) { - return Query.of(fn -> fn.range(fnr -> { + return Query.of(fn -> fn.range(fnr -> fnr.date(date -> { if (first != null) { if (firstIncluding) { - fnr.gte(JsonData.of(first)); + date.gte(first.toString()); } else { - fnr.gt(JsonData.of(first)); + date.gt(first.toString()); } } if (last != null) { if (lastIncluding) { - fnr.lte(JsonData.of(last)); + date.lte(last.toString()); } else { - fnr.lt(JsonData.of(last)); + date.lt(last.toString()); } } - return fnr.field(field); - })); + return date.field(field); + }))); } private static FieldValue toFieldValue(R value) { diff --git a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java index 43d2157d833..1d42f8cfa9b 100644 --- a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java +++ b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticTestServer.java @@ -49,9 +49,9 @@ public class ElasticTestServer implements AutoCloseable { "8.7.1.0", "80c8d34334b0cf4def79835ea6dab78b59ba9ee54c8f5f3cba0bde53123d7820", "8.10.4.0", "b2ae8faf1e272319594b4d47a72580fa4f61a5c11cbc8d3f13453fd34b153441", "8.11.0.0", "8d4d80b850c4da4da6dfe2d675b2e2355d2014307f8bdc54cc1b34323c81c7ae", - "8.11.1.0", "a00a920d4bc29f0deacde7c2ef3d3f70692b00b62bf7fb82b0fe18eeb1dafee9", "8.11.3.0", "1f14b496baf973fb5c64e77fc458d9814dd6905170d7b15350f9f1a009824f41", - "8.13.2.0", "586f553b109266d7996265f3f34a20914b569d494b49da2c0534428770e551f0"); + "8.13.2.0", "586f553b109266d7996265f3f34a20914b569d494b49da2c0534428770e551f0", + "8.15.0.0", "6cbb54d68d654a3476df0b730856cfa3194bce5c6e1050a35e7a86ffec8a3e20"); private static final ElasticTestServer SERVER = new ElasticTestServer(); private static volatile ElasticsearchContainer CONTAINER;