From 2fc260c41fef1d89162eda947af9fd1abd2799a3 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Mon, 14 Oct 2019 16:56:40 +0800 Subject: [PATCH 1/7] optimize composite aggregation by index sorting --- .../bucket/composite/BinaryValuesSource.java | 14 ++- .../bucket/composite/CompositeAggregator.java | 77 +++++++++++- .../CompositeValuesCollectorQueue.java | 28 ++++- .../bucket/composite/DoubleValuesSource.java | 7 +- .../composite/GlobalOrdinalValuesSource.java | 16 ++- .../bucket/composite/LongValuesSource.java | 11 +- .../composite/PointsSortedDocsProducer.java | 51 ++++++-- .../SingleDimensionValuesSource.java | 9 +- .../bucket/composite/SortedDocsProducer.java | 5 + .../composite/TermsSortedDocsProducer.java | 21 ++++ .../composite/CompositeAggregatorTests.java | 118 ++++++++++++++++++ .../CompositeValuesCollectorQueueTests.java | 4 +- .../SingleDimensionValuesSourceTests.java | 88 ++++++------- 13 files changed, 373 insertions(+), 76 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java index 21346844aac89..00b05aa32e99e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java @@ -187,10 +187,18 @@ public void collect(int doc, long bucket) throws IOException { } @Override - SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) { + boolean canBeOptimizedBySortedDocs(IndexReader reader, Query query) { if (checkIfSortedDocsIsApplicable(reader, fieldType) == false || - fieldType instanceof StringFieldType == false || - (query != null && query.getClass() != MatchAllDocsQuery.class)) { + fieldType instanceof StringFieldType == false || + (query != null && query.getClass() != MatchAllDocsQuery.class)) { + return false; + } + return true; + } + + @Override + SortedDocsProducer createSortedDocsProducerOrNull(Query query) { + if (fieldType == null) { return null; } return new TermsSortedDocsProducer(fieldType.name()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java index 4effb22f30cb2..980c02acc3f40 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -19,19 +19,27 @@ package org.elasticsearch.search.aggregations.bucket.composite; + +import org.apache.lucene.index.DocValues; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.search.CollectionTerminatedException; -import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Weight; +import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.search.Sort; +import 
org.apache.lucene.search.SortField; +import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.RoaringDocIdSet; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.index.IndexSortConfig; +import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; @@ -72,6 +80,8 @@ final class CompositeAggregator extends BucketsAggregator { private LeafReaderContext currentLeaf; private RoaringDocIdSet.Builder docIdSetBuilder; private BucketCollector deferredCollectors; + private SortField leadingSortField; + private int startDocId = 0; CompositeAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, List pipelineAggregators, Map metaData, @@ -93,7 +103,7 @@ final class CompositeAggregator extends BucketsAggregator { this.sources[i] = createValuesSource(context.bigArrays(), context.searcher().getIndexReader(), sourceConfigs[i], size); } this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); - this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query()); + this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.query()); } @Override @@ -153,6 +163,8 @@ private void finishLeaf() { entries.add(new Entry(currentLeaf, docIdSet)); currentLeaf = null; docIdSetBuilder = null; + queue.setEarlyTerminate(false); + startDocId = 0; } } @@ -160,7 +172,8 @@ private void finishLeaf() { protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { finishLeaf(); boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; - if (sortedDocsProducer != null) { + if (sortedDocsProducer != null && + sources[0].canBeOptimizedBySortedDocs(context.searcher().getIndexReader(), context.query())) { /* The producer will visit documents sorted by the leading source of the composite definition and terminates when the leading source value is guaranteed to be greater than the lowest @@ -182,12 +195,38 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket currentLeaf = ctx; docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc()); } + + QueryShardContext shardContext = context.getQueryShardContext(); + IndexSortConfig indexSortConfig = shardContext.getIndexSettings().getIndexSortConfig(); + if (indexSortConfig.hasIndexSort()) { + Sort sort = indexSortConfig.buildIndexSort( + shardContext::fieldMapper, shardContext::getForField); + this.leadingSortField = sort.getSort()[0]; + } + + if (leadingSortField != null && isSingleValued(ctx.reader(), leadingSortField) + && leadingSortField.getField().equals(sourceNames.get(0))) { + queue.setLeadingSort(true); + SingleDimensionValuesSource leadingSource = sources[0]; + int leadingFieldReverse = leadingSortField.getReverse() == false ? 
1 : -1; + // if source and leading field have the same order, get start doc id for filtering + if (sortedDocsProducer != null && leadingSource.reverseMul * leadingFieldReverse > 0) { + startDocId = sortedDocsProducer.getStartDocId(queue, ctx); + } + } + final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder)); return new LeafBucketCollector() { @Override public void collect(int doc, long zeroBucket) throws IOException { assert zeroBucket == 0L; - inner.collect(doc); + if (queue.isEarlyTerminate()) { + throw new CollectionTerminatedException(); + } + + if (doc >= startDocId) { + inner.collect(doc); + } } }; } @@ -272,6 +311,14 @@ public void collect(int doc, long zeroBucket) throws IOException { }; } + public CompositeValuesCollectorQueue getQueue() { + return queue; + } + + public int getStartDocId() { + return startDocId; + } + private SingleDimensionValuesSource createValuesSource(BigArrays bigArrays, IndexReader reader, CompositeValuesSourceConfig config, int size) { @@ -357,5 +404,23 @@ private static class Entry { this.docIdSet = docIdSet; } } + + private boolean isSingleValued(IndexReader reader, SortField field) throws IOException { + SortField.Type type = IndexSortConfig.getSortFieldType(field); + for (LeafReaderContext context : reader.leaves()) { + if (type == SortField.Type.STRING) { + final SortedSetDocValues values = DocValues.getSortedSet(context.reader(), field.getField()); + if (values.cost() > 0 && DocValues.unwrapSingleton(values) == null) { + return false; + } + } else { + final SortedNumericDocValues values = DocValues.getSortedNumeric(context.reader(), field.getField()); + if (values.cost() > 0 && DocValues.unwrapSingleton(values) == null) { + return false; + } + } + } + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index 58887d9e6a2dc..00a09b467e3dd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -65,6 +65,8 @@ public int hashCode() { private final SingleDimensionValuesSource[] arrays; private IntArray docCounts; private boolean afterKeyIsSet = false; + private boolean isLeadingSort = false; + private boolean isEarlyTerminate = false; /** * Constructs a composite queue with the specified size and sources. @@ -242,6 +244,18 @@ LeafBucketCollector getLeafCollector(Comparable forceLeadSourceValue, return collector; } + public void setLeadingSort(boolean isLeadingSort) { + this.isLeadingSort = isLeadingSort; + } + + public boolean isEarlyTerminate() { + return isEarlyTerminate; + } + + public void setEarlyTerminate(boolean earlyTerminate) { + this.isEarlyTerminate = earlyTerminate; + } + /** * Check if the current candidate should be added in the queue. * @return The target slot of the candidate or -1 is the candidate is not competitive. 
@@ -256,12 +270,22 @@ int addIfCompetitive() { } if (afterKeyIsSet && compareCurrentWithAfter() <= 0) { // this key is greater than the top value collected in the previous round, skip it + if (isLeadingSort && size() >= maxSize) { + if (arrays[0].compareCurrentWithAfter() < 0) { + isEarlyTerminate = true; + } + } return -1; } if (size() >= maxSize - // the tree map is full, check if the candidate key should be kept - && compare(CANDIDATE_SLOT, top()) > 0) { + // the tree map is full, check if the candidate key should be kept + && compare(CANDIDATE_SLOT, top()) > 0) { // the candidate key is not competitive, skip it + if (isLeadingSort) { + if (arrays[0].compareCurrent(top()) > 0) { + isEarlyTerminate = true; + } + } return -1; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java index beb66398a6869..945cc5432f0e0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java @@ -183,10 +183,15 @@ public void collect(int doc, long bucket) throws IOException { } @Override - SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) { + SortedDocsProducer createSortedDocsProducerOrNull(Query query) { return null; } + @Override + boolean canBeOptimizedBySortedDocs(IndexReader reader, Query query) { + return false; + } + @Override public void close() { Releasables.close(values, bits); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java index 3d29aee19b166..6ca783ec506a4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java @@ -181,15 +181,23 @@ public void collect(int doc, long bucket) throws IOException { } @Override - SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) { - if (checkIfSortedDocsIsApplicable(reader, fieldType) == false || - fieldType instanceof StringFieldType == false || - (query != null && query.getClass() != MatchAllDocsQuery.class)) { + SortedDocsProducer createSortedDocsProducerOrNull(Query query) { + if (fieldType == null) { return null; } return new TermsSortedDocsProducer(fieldType.name()); } + @Override + boolean canBeOptimizedBySortedDocs(IndexReader reader, Query query) { + if (checkIfSortedDocsIsApplicable(reader, fieldType) == false || + fieldType instanceof StringFieldType == false || + (query != null && query.getClass() != MatchAllDocsQuery.class)) { + return false; + } + return true; + } + @Override public void close() { Releasables.close(values); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java index d71ed3c3bd97d..16d84652015d4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java @@ -232,12 +232,17 @@ private static boolean 
checkMatchAllOrRangeQuery(Query query, String fieldName) } @Override - SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) { + boolean canBeOptimizedBySortedDocs(IndexReader reader, Query query) { query = extractQuery(query); if (checkIfSortedDocsIsApplicable(reader, fieldType) == false || - checkMatchAllOrRangeQuery(query, fieldType.name()) == false) { - return null; + checkMatchAllOrRangeQuery(query, fieldType.name()) == false) { + return false; } + return true; + } + + @Override + SortedDocsProducer createSortedDocsProducerOrNull(Query query) { final byte[] lowerPoint; final byte[] upperPoint; if (query instanceof PointRangeQuery) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java index d600e0d887c38..66ae0acb62dec 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java @@ -23,6 +23,7 @@ import org.apache.lucene.index.PointValues; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Query; import org.apache.lucene.util.DocIdSetBuilder; import org.apache.lucene.util.FutureArrays; @@ -53,6 +54,36 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, // no value for the field return DocIdSet.EMPTY; } + + DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), values, field) : null; + Visitor visitor = getIntersectVisitor(values, queue, context, builder, false); + try { + values.intersect(visitor); + visitor.flush(); + } catch (CollectionTerminatedException exc) {} + return fillDocIdSet ? builder.build() : DocIdSet.EMPTY; + } + + @Override + int getStartDocId(CompositeValuesCollectorQueue queue, LeafReaderContext context) throws IOException { + final PointValues values = context.reader().getPointValues(field); + if (values == null) { + return 0; + } + DocIdSetBuilder builder = new DocIdSetBuilder(context.reader().maxDoc(), values, field); + Visitor visitor = getIntersectVisitor(values, queue, context, builder, true); + try { + values.intersect(visitor); + } catch (CollectionTerminatedException exc) {} + int docId = builder.build().iterator().nextDoc(); + if (docId != DocIdSetIterator.NO_MORE_DOCS) { + return docId; + } + return 0; + } + + Visitor getIntersectVisitor(PointValues values, CompositeValuesCollectorQueue queue, LeafReaderContext context, + DocIdSetBuilder builder, boolean terminateAfterStartDoc) throws IOException { long lowerBucket = Long.MIN_VALUE; Comparable lowerValue = queue.getLowerValueLeadSource(); if (lowerValue != null) { @@ -70,16 +101,11 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, } upperBucket = (Long) upperValue; } - DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), values, field) : null; - Visitor visitor = new Visitor(context, queue, builder, values.getBytesPerDimension(), lowerBucket, upperBucket); - try { - values.intersect(visitor); - visitor.flush(); - } catch (CollectionTerminatedException exc) {} - return fillDocIdSet ? 
builder.build() : DocIdSet.EMPTY; + + return new Visitor(context, queue, builder, values.getBytesPerDimension(), lowerBucket, upperBucket, terminateAfterStartDoc); } - private class Visitor implements PointValues.IntersectVisitor { + protected class Visitor implements PointValues.IntersectVisitor { final LeafReaderContext context; final CompositeValuesCollectorQueue queue; final DocIdSetBuilder builder; @@ -87,6 +113,7 @@ private class Visitor implements PointValues.IntersectVisitor { final int bytesPerDim; final long lowerBucket; final long upperBucket; + final boolean terminateAfterStartDoc; DocIdSetBuilder bucketDocsBuilder; DocIdSetBuilder.BulkAdder adder; @@ -95,7 +122,7 @@ private class Visitor implements PointValues.IntersectVisitor { boolean first = true; Visitor(LeafReaderContext context, CompositeValuesCollectorQueue queue, DocIdSetBuilder builder, - int bytesPerDim, long lowerBucket, long upperBucket) { + int bytesPerDim, long lowerBucket, long upperBucket, boolean terminateAfterStartDoc) { this.context = context; this.maxDoc = context.reader().maxDoc(); this.queue = queue; @@ -104,6 +131,7 @@ private class Visitor implements PointValues.IntersectVisitor { this.upperBucket = upperBucket; this.bucketDocsBuilder = new DocIdSetBuilder(maxDoc); this.bytesPerDim = bytesPerDim; + this.terminateAfterStartDoc = terminateAfterStartDoc; } @Override @@ -143,6 +171,11 @@ public void visit(int docID, byte[] packedValue) throws IOException { first = false; adder.add(docID); remaining --; + + if (terminateAfterStartDoc) { + builder.grow(1).add(docID); + throw new CollectionTerminatedException(); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java index f49e20e5bd0dc..b3bef0b5ac7aa 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java @@ -143,9 +143,14 @@ abstract LeafBucketCollector getLeafCollector(Comparable value, LeafReaderContext context, LeafBucketCollector next) throws IOException; /** - * Returns a {@link SortedDocsProducer} or null if this source cannot produce sorted docs. + * Returns a {@link SortedDocsProducer}, could be null. */ - abstract SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query); + abstract SortedDocsProducer createSortedDocsProducerOrNull(Query query); + + /** + * Check if this source could produce sorted docs, then could optimize the query. + */ + abstract boolean canBeOptimizedBySortedDocs(IndexReader reader, Query query); /** * Returns true if a {@link SortedDocsProducer} should be used to optimize the execution. 
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java index 63530a4eed6ed..e6970a9229df0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java @@ -105,4 +105,9 @@ public void collect(int doc, long bucket) throws IOException { */ abstract DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReaderContext context, boolean fillDocIdSet) throws IOException; + + /** + * Returns the first doc id based on the lower source field. + */ + abstract int getStartDocId(CompositeValuesCollectorQueue queue, LeafReaderContext context) throws IOException; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java index 2c0c6188f5c07..18b59f1df84b3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Query; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.DocIdSetBuilder; @@ -77,4 +78,24 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, } while (te.next() != null); return fillDocIdSet ? 
builder.build() : DocIdSet.EMPTY; } + + @Override + int getStartDocId(CompositeValuesCollectorQueue queue, LeafReaderContext context) throws IOException { + final Terms terms = context.reader().terms(field); + if (terms == null) { + return 0; + } + Comparable lowerSource = queue.getLowerValueLeadSource(); + if (lowerSource == null) { + return 0; + } + final TermsEnum te = terms.iterator(); + if (te.seekCeil((BytesRef) lowerSource) != TermsEnum.SeekStatus.END) { + PostingsEnum reuse = te.postings(null, PostingsEnum.NONE); + if (reuse.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + return reuse.docID(); + } + } + return 0; + } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index 1520dfde8a116..6b8558ddf0cec 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.bucket.composite; +import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.DoublePoint; import org.apache.lucene.document.Field; @@ -31,17 +32,29 @@ import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.Term; import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.search.SortedSetSortField; +import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.NumericUtils; +import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatters; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.GeoPointFieldMapper; @@ -52,6 +65,7 @@ import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileGridAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; @@ -63,6 +77,7 @@ import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.sort.SortOrder; +import 
org.elasticsearch.test.IndexSettingsModule; import org.junit.After; import org.junit.Before; @@ -83,6 +98,7 @@ import java.util.function.Function; import java.util.function.Supplier; +import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -627,6 +643,92 @@ public void testWithKeywordAndLong() throws Exception { ); } + public void testEarlyTerminateAndStartDocIdFilter() throws Exception { + final List>> dataset = new ArrayList<>(); + dataset.addAll( + Arrays.asList( + createDocument("keyword", "a", "long", 100L, "foo", "bar"), + createDocument("keyword", "c", "long", 100L, "foo", "bar"), + createDocument("keyword", "a", "long", 0L, "foo", "bar"), + createDocument("keyword", "d", "long", 10L, "foo", "bar"), + createDocument("keyword", "b", "long", 10L, "foo", "bar"), + createDocument("keyword", "c", "long", 10L, "foo", "bar") + ) + ); + + final Sort sort = new Sort( + new SortedSetSortField("keyword", false), + new SortedNumericSortField("long", SortField.Type.LONG) + ); + + testSearchCase(Arrays.asList(new TermQuery(new Term("foo", "bar")), + new DocValuesFieldExistsQuery("keyword")), dataset, + () -> new CompositeAggregationBuilder("name", + Arrays.asList( + new TermsValuesSourceBuilder("keyword").field("keyword"), + new TermsValuesSourceBuilder("long").field("long") + ) + ).size(3), + (result) -> { + assertEquals(3, result.getBuckets().size()); + assertEquals("{keyword=b, long=10}", result.afterKey().toString()); + assertEquals("{keyword=a, long=0}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=a, long=100}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(1).getDocCount()); + assertEquals("{keyword=b, long=10}", result.getBuckets().get(2).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(2).getDocCount()); + } + ); + + Query query = new DocValuesFieldExistsQuery("keyword"); + CompositeAggregationBuilder aggregationBuilder = new CompositeAggregationBuilder("name", + Arrays.asList( + new TermsValuesSourceBuilder("keyword").field("keyword"), + new TermsValuesSourceBuilder("long").field("long") + ) + ).aggregateAfter(createAfterKey("keyword", "b", "long", 10L)).size(2); + + IndexSettings indexSettings = createIndexSettings(sort); + try (Directory directory = newDirectory()) { + IndexWriterConfig config = LuceneTestCase.newIndexWriterConfig(random(), new MockAnalyzer(random())); + if (sort != null) { + config.setIndexSort(sort); + config.setCodec(TestUtil.getDefaultCodec()); + } + + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config)) { + Document document = new Document(); + for (Map> fields : dataset) { + addToDocument(document, fields); + indexWriter.addDocument(document); + document.clear(); + } + } + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS); + CompositeAggregator a = createAggregator(query, aggregationBuilder, indexSearcher, + indexSettings, bucketConsumer, FIELD_TYPES); + a.preCollection(); + indexSearcher.search(query, a); + a.postCollection(); + + assertEquals(a.getStartDocId(), 2); + 
assertTrue(a.getQueue().isEarlyTerminate()); + + final InternalComposite result = (InternalComposite)a.buildAggregation(0L); + assertEquals(2, result.getBuckets().size()); + assertEquals("{keyword=c, long=100}", result.afterKey().toString()); + assertEquals("{keyword=c, long=10}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=c, long=100}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(1).getDocCount()); + } + } + } + public void testWithKeywordAndLongDesc() throws Exception { final List>> dataset = new ArrayList<>(); dataset.addAll( @@ -1853,6 +1955,7 @@ private void executeTestCase(boolean reduced, List>> dataset, Supplier create, Consumer verify) throws IOException { + try (Directory directory = newDirectory()) { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { Document document = new Document(); @@ -1876,6 +1979,21 @@ private void executeTestCase(boolean reduced, } } + private static IndexSettings createIndexSettings(Sort sort) { + Settings.Builder builder = Settings.builder(); + if (sort != null) { + String[] fields = Arrays.stream(sort.getSort()) + .map(SortField::getField) + .toArray(String[]::new); + String[] orders = Arrays.stream(sort.getSort()) + .map((o) -> o.getReverse() ? "desc" : "asc") + .toArray(String[]::new); + builder.putList("index.sort.field", fields); + builder.putList("index.sort.order", orders); + } + return IndexSettingsModule.newIndexSettings(new Index("_index", "0"), builder.build()); + } + private void addToDocument(Document doc, Map> keys) { for (Map.Entry> entry : keys.entrySet()) { final String name = entry.getKey(); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java index 6516309de965f..53b4f4d84c6c7 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java @@ -274,7 +274,7 @@ private void testRandomCase(boolean forceMerge, boolean missingBucket, ClassAndN while (pos < size) { final CompositeValuesCollectorQueue queue = new CompositeValuesCollectorQueue(BigArrays.NON_RECYCLING_INSTANCE, sources, size, last); - final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()); + final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(new MatchAllDocsQuery()); for (LeafReaderContext leafReaderContext : reader.leaves()) { final LeafBucketCollector leafCollector = new LeafBucketCollector() { @Override @@ -282,7 +282,7 @@ public void collect(int doc, long bucket) throws IOException { queue.addIfCompetitive(); } }; - if (docsProducer != null && withProducer) { + if (docsProducer != null && sources[0].canBeOptimizedBySortedDocs(reader, new MatchAllDocsQuery()) && withProducer) { assertEquals(DocIdSet.EMPTY, docsProducer.processLeaf(new MatchAllDocsQuery(), queue, leafReaderContext, false)); } else { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java index 
a34ec7ada20a6..cc9bbdf52fd82 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java @@ -53,12 +53,12 @@ public void testBinarySorted() { 1, 1 ); - assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null)); + assertFalse(source.canBeOptimizedBySortedDocs(mockIndexReader(100, 49), null)); IndexReader reader = mockIndexReader(1, 1); - assertNotNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery())); - assertNotNull(source.createSortedDocsProducerOrNull(reader, null)); - assertNull(source.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("foo", "bar")))); - assertNull(source.createSortedDocsProducerOrNull(reader, + assertTrue(source.canBeOptimizedBySortedDocs(reader, new MatchAllDocsQuery())); + assertTrue(source.canBeOptimizedBySortedDocs(reader, null)); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new TermQuery(new Term("foo", "bar")))); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new TermQuery(new Term("keyword", "toto)")))); source = new BinaryValuesSource( @@ -71,8 +71,8 @@ public void testBinarySorted() { 1, 1 ); - assertNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery())); - assertNull(source.createSortedDocsProducerOrNull(reader, null)); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new MatchAllDocsQuery())); + assertFalse(source.canBeOptimizedBySortedDocs(reader, null)); source = new BinaryValuesSource( BigArrays.NON_RECYCLING_INSTANCE, @@ -84,7 +84,7 @@ public void testBinarySorted() { 0, -1 ); - assertNull(source.createSortedDocsProducerOrNull(reader, null)); + assertFalse(source.canBeOptimizedBySortedDocs(reader, null)); MappedFieldType ip = new IpFieldMapper.IpFieldType(); ip.setName("ip"); @@ -97,7 +97,7 @@ public void testBinarySorted() { false, 1, 1); - assertNull(source.createSortedDocsProducerOrNull(reader, null)); + assertFalse(source.canBeOptimizedBySortedDocs(reader, null)); } public void testGlobalOrdinalsSorted() { @@ -111,12 +111,12 @@ public void testGlobalOrdinalsSorted() { 1, 1 ); - assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null)); + assertFalse(source.canBeOptimizedBySortedDocs(mockIndexReader(100, 49), null)); IndexReader reader = mockIndexReader(1, 1); - assertNotNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery())); - assertNotNull(source.createSortedDocsProducerOrNull(reader, null)); - assertNull(source.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("foo", "bar")))); - assertNull(source.createSortedDocsProducerOrNull(reader, + assertTrue(source.canBeOptimizedBySortedDocs(reader, new MatchAllDocsQuery())); + assertTrue(source.canBeOptimizedBySortedDocs(reader, null)); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new TermQuery(new Term("foo", "bar")))); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new TermQuery(new Term("keyword", "toto)")))); source = new GlobalOrdinalValuesSource( @@ -128,9 +128,9 @@ public void testGlobalOrdinalsSorted() { 1, 1 ); - assertNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery())); - assertNull(source.createSortedDocsProducerOrNull(reader, null)); - assertNull(source.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("foo", "bar")))); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new 
MatchAllDocsQuery())); + assertFalse(source.canBeOptimizedBySortedDocs(reader, null)); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new TermQuery(new Term("foo", "bar")))); source = new GlobalOrdinalValuesSource( BigArrays.NON_RECYCLING_INSTANCE, @@ -141,8 +141,8 @@ public void testGlobalOrdinalsSorted() { 1, -1 ); - assertNull(source.createSortedDocsProducerOrNull(reader, null)); - assertNull(source.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("foo", "bar")))); + assertFalse(source.canBeOptimizedBySortedDocs(reader, null)); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new TermQuery(new Term("foo", "bar")))); final MappedFieldType ip = new IpFieldMapper.IpFieldType(); ip.setName("ip"); @@ -155,8 +155,8 @@ public void testGlobalOrdinalsSorted() { 1, 1 ); - assertNull(source.createSortedDocsProducerOrNull(reader, null)); - assertNull(source.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("foo", "bar")))); + assertFalse(source.canBeOptimizedBySortedDocs(reader, null)); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new TermQuery(new Term("foo", "bar")))); } public void testNumericSorted() { @@ -179,19 +179,19 @@ public void testNumericSorted() { 1, 1 ); - assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null)); + assertFalse(source.canBeOptimizedBySortedDocs(mockIndexReader(100, 49), null)); IndexReader reader = mockIndexReader(1, 1); - assertNotNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery())); - assertNotNull(source.createSortedDocsProducerOrNull(reader, null)); - assertNotNull(source.createSortedDocsProducerOrNull(reader, LongPoint.newRangeQuery("number", 0, 1))); - assertNotNull(source.createSortedDocsProducerOrNull(reader, new IndexOrDocValuesQuery( + assertTrue(source.canBeOptimizedBySortedDocs(reader, new MatchAllDocsQuery())); + assertTrue(source.canBeOptimizedBySortedDocs(reader, null)); + assertTrue(source.canBeOptimizedBySortedDocs(reader, LongPoint.newRangeQuery("number", 0, 1))); + assertTrue(source.canBeOptimizedBySortedDocs(reader, new IndexOrDocValuesQuery( LongPoint.newRangeQuery("number", 0, 1), new MatchAllDocsQuery()))); - assertNotNull(source.createSortedDocsProducerOrNull(reader, new DocValuesFieldExistsQuery("number"))); - assertNotNull(source.createSortedDocsProducerOrNull(reader, + assertTrue(source.canBeOptimizedBySortedDocs(reader, new DocValuesFieldExistsQuery("number"))); + assertTrue(source.canBeOptimizedBySortedDocs(reader, new ConstantScoreQuery(new DocValuesFieldExistsQuery("number")))); - assertNotNull(source.createSortedDocsProducerOrNull(reader, new BoostQuery(new IndexOrDocValuesQuery( + assertTrue(source.canBeOptimizedBySortedDocs(reader, new BoostQuery(new IndexOrDocValuesQuery( LongPoint.newRangeQuery("number", 0, 1), new MatchAllDocsQuery()), 2.0f))); - assertNull(source.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("keyword", "toto)")))); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new TermQuery(new Term("keyword", "toto)")))); LongValuesSource sourceWithMissing = new LongValuesSource( BigArrays.NON_RECYCLING_INSTANCE, @@ -202,11 +202,11 @@ public void testNumericSorted() { true, 1, 1); - assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery())); - assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, null)); - assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("keyword", "toto)")))); - 
assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, new DocValuesFieldExistsQuery("number"))); - assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, + assertFalse(sourceWithMissing.canBeOptimizedBySortedDocs(reader, new MatchAllDocsQuery())); + assertFalse(sourceWithMissing.canBeOptimizedBySortedDocs(reader, null)); + assertFalse(sourceWithMissing.canBeOptimizedBySortedDocs(reader, new TermQuery(new Term("keyword", "toto)")))); + assertFalse(sourceWithMissing.canBeOptimizedBySortedDocs(reader, new DocValuesFieldExistsQuery("number"))); + assertFalse(sourceWithMissing.canBeOptimizedBySortedDocs(reader, + new ConstantScoreQuery(new DocValuesFieldExistsQuery("number")))); LongValuesSource sourceRev = new LongValuesSource( @@ -219,11 +219,11 @@ public void testNumericSorted() { 1, -1 ); - assertNull(sourceRev.createSortedDocsProducerOrNull(reader, null)); - assertNull(sourceRev.createSortedDocsProducerOrNull(reader, new DocValuesFieldExistsQuery("number"))); - assertNull(sourceRev.createSortedDocsProducerOrNull(reader, + assertFalse(sourceRev.canBeOptimizedBySortedDocs(reader, null)); + assertFalse(sourceRev.canBeOptimizedBySortedDocs(reader, new DocValuesFieldExistsQuery("number"))); + assertFalse(sourceRev.canBeOptimizedBySortedDocs(reader, new ConstantScoreQuery(new DocValuesFieldExistsQuery("number")))); - assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("keyword", "toto)")))); + assertFalse(sourceWithMissing.canBeOptimizedBySortedDocs(reader, new TermQuery(new Term("keyword", "toto)")))); } else if (numberType == NumberFieldMapper.NumberType.HALF_FLOAT || numberType == NumberFieldMapper.NumberType.FLOAT || numberType == NumberFieldMapper.NumberType.DOUBLE) { @@ -237,15 +237,15 @@ public void testNumericSorted() { 1 ); IndexReader reader = mockIndexReader(1, 1); - assertNull(source.createSortedDocsProducerOrNull(reader, null)); - assertNull(source.createSortedDocsProducerOrNull(reader, new DocValuesFieldExistsQuery("number"))); - assertNull(source.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("keyword", "toto)")))); - assertNull(source.createSortedDocsProducerOrNull(reader, + assertFalse(source.canBeOptimizedBySortedDocs(reader, null)); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new DocValuesFieldExistsQuery("number"))); + assertFalse(source.canBeOptimizedBySortedDocs(reader, new TermQuery(new Term("keyword", "toto)")))); + assertFalse(source.canBeOptimizedBySortedDocs(reader, + new ConstantScoreQuery(new DocValuesFieldExistsQuery("number")))); } else{ throw new AssertionError ("missing type:" + numberType.typeName()); } - assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null)); + assertFalse(source.canBeOptimizedBySortedDocs(mockIndexReader(100, 49), null)); } } From 85e1c6372a7df53b11e73320cda87b3c06a7f8cd Mon Sep 17 00:00:00 2001 From: danielhuang Date: Mon, 14 Oct 2019 17:32:33 +0800 Subject: [PATCH 2/7] fix long line issue --- .../bucket/composite/CompositeValuesCollectorQueueTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java index 53b4f4d84c6c7..6d7fc103cfc87 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java +++ 
b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java @@ -282,7 +282,8 @@ public void collect(int doc, long bucket) throws IOException { queue.addIfCompetitive(); } }; - if (docsProducer != null && sources[0].canBeOptimizedBySortedDocs(reader, new MatchAllDocsQuery()) && withProducer) { + if (docsProducer != null && + sources[0].canBeOptimizedBySortedDocs(reader, new MatchAllDocsQuery()) && withProducer) { assertEquals(DocIdSet.EMPTY, docsProducer.processLeaf(new MatchAllDocsQuery(), queue, leafReaderContext, false)); } else { From 692f03109b26e65c66b706c4c01c47a567e467f0 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Mon, 14 Oct 2019 19:41:49 +0800 Subject: [PATCH 3/7] enhance comment --- .../bucket/composite/SingleDimensionValuesSource.java | 2 +- .../bucket/composite/CompositeAggregatorTests.java | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java index b3bef0b5ac7aa..0288ffe6288d1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java @@ -148,7 +148,7 @@ abstract LeafBucketCollector getLeafCollector(Comparable value, abstract SortedDocsProducer createSortedDocsProducerOrNull(Query query); /** - * Check if this source could produce sorted docs, then could optimize the query. + * Check if this source could produce sorted docs to optimize the execution. */ abstract boolean canBeOptimizedBySortedDocs(IndexReader reader, Query query); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index 6b8558ddf0cec..d7100d557a608 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -661,8 +661,7 @@ public void testEarlyTerminateAndStartDocIdFilter() throws Exception { new SortedNumericSortField("long", SortField.Type.LONG) ); - testSearchCase(Arrays.asList(new TermQuery(new Term("foo", "bar")), - new DocValuesFieldExistsQuery("keyword")), dataset, + testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, () -> new CompositeAggregationBuilder("name", Arrays.asList( new TermsValuesSourceBuilder("keyword").field("keyword"), @@ -681,7 +680,8 @@ public void testEarlyTerminateAndStartDocIdFilter() throws Exception { } ); - Query query = new DocValuesFieldExistsQuery("keyword"); + // none match all query also could be optimized + Query query = new TermQuery(new Term("foo", "bar")); CompositeAggregationBuilder aggregationBuilder = new CompositeAggregationBuilder("name", Arrays.asList( new TermsValuesSourceBuilder("keyword").field("keyword"), @@ -715,6 +715,7 @@ public void testEarlyTerminateAndStartDocIdFilter() throws Exception { indexSearcher.search(query, a); a.postCollection(); + // check start doc id filtered and early terminated assertEquals(a.getStartDocId(), 2); assertTrue(a.getQueue().isEarlyTerminate()); From 
f5327c27742f7901b507866a28bf01db94f9397e Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 15 Oct 2019 21:13:30 +0800 Subject: [PATCH 4/7] fix a reset issue --- .../aggregations/bucket/composite/CompositeAggregator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java index 980c02acc3f40..ac6c186e6eb18 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -163,9 +163,9 @@ private void finishLeaf() { entries.add(new Entry(currentLeaf, docIdSet)); currentLeaf = null; docIdSetBuilder = null; - queue.setEarlyTerminate(false); - startDocId = 0; } + queue.setEarlyTerminate(false); + startDocId = 0; } @Override From 2441f6cfb4685daffd08ff3c11edd9ba6af5b9c2 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Wed, 16 Oct 2019 16:03:41 +0800 Subject: [PATCH 5/7] enhance test case --- .../bucket/composite/CompositeAggregatorTests.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index d7100d557a608..3eea9c1484560 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -698,12 +698,14 @@ public void testEarlyTerminateAndStartDocIdFilter() throws Exception { } try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config)) { - Document document = new Document(); + List documents = new ArrayList<>(); for (Map> fields : dataset) { + Document document = new Document(); addToDocument(document, fields); - indexWriter.addDocument(document); - document.clear(); + documents.add(document); } + indexWriter.addDocuments(documents); + indexWriter.commit(); } try (IndexReader indexReader = DirectoryReader.open(directory)) { IndexSearcher indexSearcher = new IndexSearcher(indexReader); @@ -713,11 +715,11 @@ public void testEarlyTerminateAndStartDocIdFilter() throws Exception { indexSettings, bucketConsumer, FIELD_TYPES); a.preCollection(); indexSearcher.search(query, a); - a.postCollection(); // check start doc id filtered and early terminated assertEquals(a.getStartDocId(), 2); assertTrue(a.getQueue().isEarlyTerminate()); + a.postCollection(); final InternalComposite result = (InternalComposite)a.buildAggregation(0L); assertEquals(2, result.getBuckets().size()); From a73ed1c557653b549263c29cd8f6c044dff59fc0 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 22 Oct 2019 15:18:10 +0800 Subject: [PATCH 6/7] add reverse order test case --- .../CompositeValuesCollectorQueue.java | 2 + .../composite/CompositeAggregatorTests.java | 118 ++++++++++++++++-- 2 files changed, 113 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index 00a09b467e3dd..3eb91f1256b47 100644 --- 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -272,6 +272,7 @@ int addIfCompetitive() { // this key is greater than the top value collected in the previous round, skip it if (isLeadingSort && size() >= maxSize) { if (arrays[0].compareCurrentWithAfter() < 0) { + // the leading source field value is greater than after value, early terminate collection isEarlyTerminate = true; } } @@ -283,6 +284,7 @@ && compare(CANDIDATE_SLOT, top()) > 0) { // the candidate key is not competitive, skip it if (isLeadingSort) { if (arrays[0].compareCurrent(top()) > 0) { + // the leading source field value is greater than top value of queue, early terminate collection isEarlyTerminate = true; } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index 3eea9c1484560..269ee2d7c683f 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -643,7 +643,7 @@ public void testWithKeywordAndLong() throws Exception { ); } - public void testEarlyTerminateAndStartDocIdFilter() throws Exception { + public void testEarlyTerminateAndStartDocIdFilterWithMatchOrder() throws Exception { final List>> dataset = new ArrayList<>(); dataset.addAll( Arrays.asList( @@ -656,11 +656,6 @@ public void testEarlyTerminateAndStartDocIdFilter() throws Exception { ) ); - final Sort sort = new Sort( - new SortedSetSortField("keyword", false), - new SortedNumericSortField("long", SortField.Type.LONG) - ); - testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, () -> new CompositeAggregationBuilder("name", Arrays.asList( @@ -689,7 +684,11 @@ public void testEarlyTerminateAndStartDocIdFilter() throws Exception { ) ).aggregateAfter(createAfterKey("keyword", "b", "long", 10L)).size(2); - IndexSettings indexSettings = createIndexSettings(sort); + final Sort sort = new Sort( + new SortedSetSortField("keyword", false), + new SortedNumericSortField("long", SortField.Type.LONG) + ); + try (Directory directory = newDirectory()) { IndexWriterConfig config = LuceneTestCase.newIndexWriterConfig(random(), new MockAnalyzer(random())); if (sort != null) { @@ -711,6 +710,7 @@ public void testEarlyTerminateAndStartDocIdFilter() throws Exception { IndexSearcher indexSearcher = new IndexSearcher(indexReader); MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS); + IndexSettings indexSettings = createIndexSettings(sort); CompositeAggregator a = createAggregator(query, aggregationBuilder, indexSearcher, indexSettings, bucketConsumer, FIELD_TYPES); a.preCollection(); @@ -732,6 +732,110 @@ public void testEarlyTerminateAndStartDocIdFilter() throws Exception { } } + public void testEarlyTerminateAndStartDocIdFilter() throws Exception { + final List>> dataset = new ArrayList<>(); + dataset.addAll( + Arrays.asList( + createDocument("keyword", "a", "long", 100L, "foo", "bar"), + createDocument("keyword", "c", "long", 100L, "foo", "bar"), + createDocument("keyword", "a", "long", 0L, "foo", "bar"), + 
createDocument("keyword", "d", "long", 10L, "foo", "bar"), + createDocument("keyword", "b", "long", 10L, "foo", "bar"), + createDocument("keyword", "c", "long", 10L, "foo", "bar"), + createDocument("keyword", "e", "long", 100L, "foo", "bar"), + createDocument("keyword", "e", "long", 10L, "foo", "bar") + ) + ); + + // none match all query also could be optimized + Query query = new TermQuery(new Term("foo", "bar")); + + // index sort config + final Sort sort = new Sort( + new SortedSetSortField("keyword", false), + new SortedNumericSortField("long", SortField.Type.LONG) + ); + IndexWriterConfig config = LuceneTestCase.newIndexWriterConfig(random(), new MockAnalyzer(random())); + config.setIndexSort(sort); + config.setCodec(TestUtil.getDefaultCodec()); + + try (Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config)) { + List documents = new ArrayList<>(); + for (Map> fields : dataset) { + Document document = new Document(); + addToDocument(document, fields); + documents.add(document); + } + indexWriter.addDocuments(documents); + indexWriter.commit(); + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS); + IndexSettings indexSettings = createIndexSettings(sort); + + // source field and index sorting config have the same order + CompositeAggregationBuilder aggregationBuilder = new CompositeAggregationBuilder("name", + Arrays.asList( + new TermsValuesSourceBuilder("keyword").field("keyword"), + new TermsValuesSourceBuilder("long").field("long") + ) + ).aggregateAfter(createAfterKey("keyword", "b", "long", 10L)).size(2); + + CompositeAggregator a = createAggregator(query, aggregationBuilder, indexSearcher, + indexSettings, bucketConsumer, FIELD_TYPES); + a.preCollection(); + indexSearcher.search(query, a); + + // check start doc id filtered and early terminated + assertEquals(a.getStartDocId(), 2); + assertTrue(a.getQueue().isEarlyTerminate()); + a.postCollection(); + + // check final result + InternalComposite result = (InternalComposite)a.buildAggregation(0L); + assertEquals(2, result.getBuckets().size()); + assertEquals("{keyword=c, long=100}", result.afterKey().toString()); + assertEquals("{keyword=c, long=10}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=c, long=100}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(1).getDocCount()); + + + // source field and index sorting config have the different order + aggregationBuilder = new CompositeAggregationBuilder("name", + Arrays.asList( + // reverse source order + new TermsValuesSourceBuilder("keyword").field("keyword").order(SortOrder.DESC), + new TermsValuesSourceBuilder("long").field("long").order(SortOrder.DESC) + ) + ).aggregateAfter(createAfterKey("keyword", "c", "long", 10L)).size(2); + + a = createAggregator(query, aggregationBuilder, indexSearcher, + indexSettings, bucketConsumer, FIELD_TYPES); + a.preCollection(); + indexSearcher.search(query, a); + + // different order could not be filtered by start doc id, but still could be early terminated + assertEquals(a.getStartDocId(), 0); + assertTrue(a.getQueue().isEarlyTerminate()); + a.postCollection(); + + // check final result + result = 
(InternalComposite)a.buildAggregation(0L); + assertEquals(2, result.getBuckets().size()); + assertEquals("{keyword=a, long=100}", result.afterKey().toString()); + assertEquals("{keyword=b, long=10}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=a, long=100}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(1).getDocCount()); + } + } + } + public void testWithKeywordAndLongDesc() throws Exception { final List>> dataset = new ArrayList<>(); dataset.addAll( From 979bd58ac3a7ba971a3c1b70fcc8cea4e373b490 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 22 Oct 2019 15:24:31 +0800 Subject: [PATCH 7/7] remove extra test case --- .../composite/CompositeAggregatorTests.java | 89 ------------------- 1 file changed, 89 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index 269ee2d7c683f..9f8425753a5e0 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -643,95 +643,6 @@ public void testWithKeywordAndLong() throws Exception { ); } - public void testEarlyTerminateAndStartDocIdFilterWithMatchOrder() throws Exception { - final List>> dataset = new ArrayList<>(); - dataset.addAll( - Arrays.asList( - createDocument("keyword", "a", "long", 100L, "foo", "bar"), - createDocument("keyword", "c", "long", 100L, "foo", "bar"), - createDocument("keyword", "a", "long", 0L, "foo", "bar"), - createDocument("keyword", "d", "long", 10L, "foo", "bar"), - createDocument("keyword", "b", "long", 10L, "foo", "bar"), - createDocument("keyword", "c", "long", 10L, "foo", "bar") - ) - ); - - testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, - () -> new CompositeAggregationBuilder("name", - Arrays.asList( - new TermsValuesSourceBuilder("keyword").field("keyword"), - new TermsValuesSourceBuilder("long").field("long") - ) - ).size(3), - (result) -> { - assertEquals(3, result.getBuckets().size()); - assertEquals("{keyword=b, long=10}", result.afterKey().toString()); - assertEquals("{keyword=a, long=0}", result.getBuckets().get(0).getKeyAsString()); - assertEquals(1L, result.getBuckets().get(0).getDocCount()); - assertEquals("{keyword=a, long=100}", result.getBuckets().get(1).getKeyAsString()); - assertEquals(1L, result.getBuckets().get(1).getDocCount()); - assertEquals("{keyword=b, long=10}", result.getBuckets().get(2).getKeyAsString()); - assertEquals(1L, result.getBuckets().get(2).getDocCount()); - } - ); - - // none match all query also could be optimized - Query query = new TermQuery(new Term("foo", "bar")); - CompositeAggregationBuilder aggregationBuilder = new CompositeAggregationBuilder("name", - Arrays.asList( - new TermsValuesSourceBuilder("keyword").field("keyword"), - new TermsValuesSourceBuilder("long").field("long") - ) - ).aggregateAfter(createAfterKey("keyword", "b", "long", 10L)).size(2); - - final Sort sort = new Sort( - new SortedSetSortField("keyword", false), - new SortedNumericSortField("long", SortField.Type.LONG) - ); - - try (Directory directory = newDirectory()) { - IndexWriterConfig config = LuceneTestCase.newIndexWriterConfig(random(), new 
MockAnalyzer(random())); - if (sort != null) { - config.setIndexSort(sort); - config.setCodec(TestUtil.getDefaultCodec()); - } - - try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config)) { - List documents = new ArrayList<>(); - for (Map> fields : dataset) { - Document document = new Document(); - addToDocument(document, fields); - documents.add(document); - } - indexWriter.addDocuments(documents); - indexWriter.commit(); - } - try (IndexReader indexReader = DirectoryReader.open(directory)) { - IndexSearcher indexSearcher = new IndexSearcher(indexReader); - MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = - new MultiBucketConsumerService.MultiBucketConsumer(DEFAULT_MAX_BUCKETS); - IndexSettings indexSettings = createIndexSettings(sort); - CompositeAggregator a = createAggregator(query, aggregationBuilder, indexSearcher, - indexSettings, bucketConsumer, FIELD_TYPES); - a.preCollection(); - indexSearcher.search(query, a); - - // check start doc id filtered and early terminated - assertEquals(a.getStartDocId(), 2); - assertTrue(a.getQueue().isEarlyTerminate()); - a.postCollection(); - - final InternalComposite result = (InternalComposite)a.buildAggregation(0L); - assertEquals(2, result.getBuckets().size()); - assertEquals("{keyword=c, long=100}", result.afterKey().toString()); - assertEquals("{keyword=c, long=10}", result.getBuckets().get(0).getKeyAsString()); - assertEquals(1L, result.getBuckets().get(0).getDocCount()); - assertEquals("{keyword=c, long=100}", result.getBuckets().get(1).getKeyAsString()); - assertEquals(1L, result.getBuckets().get(1).getDocCount()); - } - } - } - public void testEarlyTerminateAndStartDocIdFilter() throws Exception { final List>> dataset = new ArrayList<>(); dataset.addAll(