diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index 6dc05fa8fe843..786fabca78d63 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -72,7 +72,6 @@ private static void executeInSortOrder(SearchContext context, BucketCollector co searcher.setProfiler(context); try { searcher.search(context.rewrittenQuery(), collector); - collector.postCollection(); } catch (IOException e) { throw new AggregationExecutionException("Could not perform time series aggregation", e); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java index 93e44c0343d88..e7db34fb61969 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java @@ -66,8 +66,8 @@ public class GlobalOrdCardinalityAggregator extends NumericMetricsAggregator.Sin // Build at post-collection phase @Nullable private HyperLogLogPlusPlusSparse counts; - private SortedSetDocValues values; private ObjectArray visitedOrds; + private SortedSetDocValues values; public GlobalOrdCardinalityAggregator( String name, @@ -105,16 +105,23 @@ public ScoreMode scoreMode() { private class CompetitiveIterator extends DocIdSetIterator { private final BitArray visitedOrds; + private final SortedSetDocValues values; private long numNonVisitedOrds; private final TermsEnum indexTerms; private final DocIdSetIterator docsWithField; - CompetitiveIterator(int numNonVisitedOrds, BitArray visitedOrds, Terms indexTerms, DocIdSetIterator docsWithField) - throws IOException { + CompetitiveIterator( + SortedSetDocValues values, + int numNonVisitedOrds, + BitArray visitedOrds, + Terms indexTerms, + DocIdSetIterator docsWithField + ) throws IOException { this.visitedOrds = visitedOrds; this.numNonVisitedOrds = numNonVisitedOrds; this.indexTerms = Objects.requireNonNull(indexTerms).iterator(); this.docsWithField = docsWithField; + this.values = values; } private Map nonVisitedOrds; @@ -211,6 +218,7 @@ public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, if (maxOrd <= MAX_FIELD_CARDINALITY_FOR_DYNAMIC_PRUNING || numNonVisitedOrds <= MAX_TERMS_FOR_DYNAMIC_PRUNING) { dynamicPruningAttempts++; return new LeafBucketCollector() { + final SortedSetDocValues docValues = valuesSource.globalOrdinalsValues(aggCtx.getLeafReaderContext()); final BitArray bits; final CompetitiveIterator competitiveIterator; @@ -226,7 +234,7 @@ public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, } this.bits = bits; final DocIdSetIterator docsWithField = valuesSource.ordinalsValues(aggCtx.getLeafReaderContext()); - competitiveIterator = new CompetitiveIterator(numNonVisitedOrds, bits, indexTerms, docsWithField); + competitiveIterator = new CompetitiveIterator(docValues, numNonVisitedOrds, bits, indexTerms, docsWithField); if (numNonVisitedOrds <= MAX_TERMS_FOR_DYNAMIC_PRUNING) { competitiveIterator.startPruning(); } @@ -234,8 +242,8 @@ public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, @Override public void collect(int doc, long bucketOrd) throws IOException { - if (values.advanceExact(doc)) { - for (long ord = values.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = values.nextOrd()) { + if (docValues.advanceExact(doc)) { + for (long ord = docValues.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = docValues.nextOrd()) { if (bits.getAndSet(ord) == false) { competitiveIterator.onVisitedOrdinal(ord); } @@ -267,6 +275,8 @@ public CompetitiveIterator competitiveIterator() { bruteForce++; return new LeafBucketCollector() { + final SortedSetDocValues docValues = valuesSource.globalOrdinalsValues(aggCtx.getLeafReaderContext()); + @Override public void collect(int doc, long bucketOrd) throws IOException { visitedOrds = bigArrays.grow(visitedOrds, bucketOrd + 1); @@ -275,8 +285,8 @@ public void collect(int doc, long bucketOrd) throws IOException { bits = new BitArray(maxOrd, bigArrays); visitedOrds.set(bucketOrd, bits); } - if (values.advanceExact(doc)) { - for (long ord = values.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = values.nextOrd()) { + if (docValues.advanceExact(doc)) { + for (long ord = docValues.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = docValues.nextOrd()) { bits.set((int) ord); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java index 16efc62f2704f..375ccd127dc9e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java @@ -95,11 +95,13 @@ public void search(Query query, BucketCollector bucketCollector) throws IOExcept Weight weight = searcher.createWeight(query, bucketCollector.scoreMode(), 1); if (searcher.getExecutor() == null) { search(bucketCollector, weight); + bucketCollector.postCollection(); return; } // offload to the search worker thread pool whenever possible. It will be null only when search.worker_threads_enabled is false RunnableFuture task = new FutureTask<>(() -> { search(bucketCollector, weight); + bucketCollector.postCollection(); return null; }); searcher.getExecutor().execute(task);