diff --git a/modules/lang-painless/src/yamlRestTest/resources/rest-api-spec/test/painless/100_terms_agg.yml b/modules/lang-painless/src/yamlRestTest/resources/rest-api-spec/test/painless/100_terms_agg.yml index aa01647811c83..d97a5b0adc77b 100644 --- a/modules/lang-painless/src/yamlRestTest/resources/rest-api-spec/test/painless/100_terms_agg.yml +++ b/modules/lang-painless/src/yamlRestTest/resources/rest-api-spec/test/painless/100_terms_agg.yml @@ -66,6 +66,24 @@ setup: - is_false: aggregations.str_terms.buckets.1.key_as_string - match: { aggregations.str_terms.buckets.1.doc_count: 1 } +--- +"Global String Value Script with doc notation": + + - do: + search: + rest_total_hits_as_int: true + body: { "size" : 0, "aggs" : { "str_terms" : { "terms" : { "field" : "str","execution_hint": "global_ordinals", "script": { "source": "return doc.str[0] + \"1\""} } } } } + + - match: { hits.total: 3 } + + - length: { aggregations.str_terms.buckets: 2 } + - match: { aggregations.str_terms.buckets.0.key: "abc1" } + - is_false: aggregations.str_terms.buckets.0.key_as_string + - match: { aggregations.str_terms.buckets.0.doc_count: 2 } + - match: { aggregations.str_terms.buckets.1.key: "bcd1" } + - is_false: aggregations.str_terms.buckets.1.key_as_string + - match: { aggregations.str_terms.buckets.1.doc_count: 1 } + --- "Long Value Script with doc notation": diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregator.java index 5f7b4b5a91e44..f949ba6ac6d43 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregator.java @@ -31,8 +31,13 @@ package org.opensearch.search.aggregations.bucket.missing; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.FieldInfos; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.Weight; import org.opensearch.index.fielddata.DocValueBits; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.CardinalityUpperBound; @@ -46,7 +51,11 @@ import org.opensearch.search.internal.SearchContext; import java.io.IOException; +import java.util.HashSet; import java.util.Map; +import java.util.Set; + +import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_DOCS; /** * Aggregate all docs that are missing a value. @@ -55,7 +64,10 @@ */ public class MissingAggregator extends BucketsAggregator implements SingleBucketAggregator { + private Weight weight; private final ValuesSource valuesSource; + protected final String fieldName; + private final ValuesSourceConfig valuesSourceConfig; public MissingAggregator( String name, @@ -69,6 +81,16 @@ public MissingAggregator( super(name, factories, aggregationContext, parent, cardinality, metadata); // TODO: Stop using nulls here this.valuesSource = valuesSourceConfig.hasValues() ? 
valuesSourceConfig.getValuesSource() : null;
+        if (this.valuesSource != null) {
+            this.fieldName = valuesSource.getIndexFieldName();
+        } else {
+            this.fieldName = null;
+        }
+        this.valuesSourceConfig = valuesSourceConfig;
+    }
+
+    public void setWeight(Weight weight) {
+        this.weight = weight;
     }
 
     @Override
@@ -94,6 +116,58 @@ public void collect(int doc, long bucket) throws IOException {
         };
     }
 
+    @Override
+    protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
+        // The optimization does not work when there are sub-aggregations.
+        if (subAggregators.length > 0) {
+            return false;
+        }
+
+        // When the field name is unknown, we cannot collect through the precomputation.
+        if (fieldName == null || weight == null) {
+            return false;
+        }
+
+        // When the missing parameter is set, every document gets a value, so the missing
+        // aggregation collects no documents and we can short-circuit.
+        if (valuesSourceConfig != null && valuesSourceConfig.missing() != null) {
+            return true;
+        }
+
+        // The optimization can only be used if there are no deleted documents and the top-level
+        // query matches all documents in the segment.
+        if (weight.count(ctx) == 0) {
+            return true;
+        } else if (weight.count(ctx) != ctx.reader().maxDoc()) {
+            return false;
+        }
+
+        Set<String> indexedFields = new HashSet<>(FieldInfos.getIndexedFields(ctx.reader()));
+
+        // This only works if the field is indexed, because otherwise the reader has not kept
+        // track of the doc count for the field. A field that does not exist in this segment at
+        // all can still be handled, since its doc count is simply zero.
+        if (indexedFields.contains(fieldName) == false && ctx.reader().getFieldInfos().fieldInfo(fieldName) != null) {
+            return false;
+        }
+
+        NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME);
+        if (docCountValues.nextDoc() != NO_MORE_DOCS) {
+            // This segment has at least one document with the _doc_count field.
+            return false;
+        }
+
+        long docCountWithFieldName = ctx.reader().getDocCount(fieldName);
+        int totalDocCount = ctx.reader().maxDoc();
+
+        // The missing bucket counts the documents where the field is either null or absent,
+        // so we subtract the documents that do have a value for the field.
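+        // For example, with hypothetical numbers: in a segment where maxDoc == 10 and
+        // getDocCount(fieldName) == 7, the missing bucket is credited 10 - 7 = 3 documents
+        // without visiting a single one of them.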
+ incrementBucketDocCount(0, totalDocCount - docCountWithFieldName); + + return true; + } + @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { return buildAggregationsForSingleBucket( diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 686e04590f7de..b45493643601e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -73,6 +73,7 @@ import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import org.opensearch.search.aggregations.support.ValuesSource; +import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.startree.StarTreeQueryHelper; import org.opensearch.search.startree.filter.DimensionFilter; @@ -107,6 +108,18 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr private final SetOnce dvs = new SetOnce<>(); protected int segmentsWithSingleValuedOrds = 0; protected int segmentsWithMultiValuedOrds = 0; + LongUnaryOperator globalOperator; + private final ValuesSourceConfig config; + + /** + * Lookup global ordinals + * + * @opensearch.internal + */ + public interface GlobalOrdLookupFunction { + BytesRef apply(long ord) throws IOException; + } + protected CardinalityUpperBound cardinalityUpperBound; public GlobalOrdinalsStringTermsAggregator( @@ -124,7 +137,8 @@ public GlobalOrdinalsStringTermsAggregator( SubAggCollectionMode collectionMode, boolean showTermDocCountError, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata); this.cardinalityUpperBound = cardinality; @@ -146,9 +160,8 @@ public GlobalOrdinalsStringTermsAggregator( return new DenseGlobalOrds(); }); } - this.fieldName = (valuesSource instanceof ValuesSource.Bytes.WithOrdinals.FieldData) - ? 
((ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource).getIndexFieldName() - : null; + this.fieldName = valuesSource.getIndexFieldName(); + this.config = config; } String descriptCollectionStrategy() { @@ -185,6 +198,14 @@ boolean tryCollectFromTermFrequencies(LeafReaderContext ctx, BiConsumer metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { super( name, @@ -499,7 +521,8 @@ static class LowCardinality extends GlobalOrdinalsStringTermsAggregator { collectionMode, showTermDocCountError, CardinalityUpperBound.ONE, - metadata + metadata, + config ); assert factories == null || factories.countAggregators() == 0; this.segmentDocCounts = context.bigArrays().newLongArray(1, true); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index 7fd4e12ad39c4..8c79a297bf05c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -31,8 +31,13 @@ package org.opensearch.search.aggregations.bucket.terms; +import org.apache.lucene.index.DocValues; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.PriorityQueue; @@ -40,6 +45,7 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.LongArray; import org.opensearch.index.fielddata.SortedBinaryDocValues; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -54,6 +60,7 @@ import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import org.opensearch.search.aggregations.support.ValuesSource; +import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; import java.io.IOException; @@ -65,6 +72,8 @@ import java.util.function.Supplier; import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder; +import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_DOCS; +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; /** * An aggregator of string values that hashes the strings on the fly rather @@ -75,8 +84,11 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { private final CollectorSource collectorSource; private final ResultStrategy resultStrategy; + private Weight weight; private final BytesKeyedBucketOrds bucketOrds; private final IncludeExclude.StringFilter includeExclude; + protected final String fieldName; + private final ValuesSourceConfig config; public MapStringTermsAggregator( String name, @@ -92,13 +104,25 @@ public MapStringTermsAggregator( SubAggCollectionMode collectionMode, boolean showTermDocCountError, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { 
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
         this.collectorSource = collectorSource;
         this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
         this.includeExclude = includeExclude;
         bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), cardinality);
+        if (collectorSource instanceof ValuesSourceCollectorSource) {
+            ValuesSource valuesCollectorSource = ((ValuesSourceCollectorSource) collectorSource).getValuesSource();
+            this.fieldName = valuesCollectorSource.getIndexFieldName();
+        } else {
+            this.fieldName = null;
+        }
+        this.config = config;
+    }
+
+    public void setWeight(Weight weight) {
+        this.weight = weight;
     }
 
     @Override
@@ -130,6 +154,69 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
         );
     }
 
+    @Override
+    protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
+        // TODO: In scripted aggregations, how documents map to buckets is determined by the
+        // script itself, so script aggregations cannot be supported here for now.
+
+        // The optimization does not work when there are sub-aggregations, when there is an
+        // include/exclude filter, or when the field name or top-level weight is unavailable.
+        if (subAggregators.length > 0 || includeExclude != null || fieldName == null || weight == null) {
+            return false;
+        }
+
+        // The optimization can only be used if there are no deleted documents and the top-level
+        // query matches all documents in the segment.
+        if (weight.count(ctx) == 0) {
+            return true;
+        } else if (weight.count(ctx) != ctx.reader().maxDoc()) {
+            return false;
+        }
+
+        // If the missing property is specified in the builder and some documents lack the field,
+        // we cannot use the index unless we could determine which term the missing value maps
+        // onto, which is not currently possible.
+        // Custom scripts cannot be supported because when the aggregation is returned, parts of the custom
+        // script are not included. See test 'org.opensearch.painless.\
+        // LangPainlessClientYamlTestSuiteIT.test {yaml=painless/100_terms_agg/String Value Script with doc notation}'
+        // for more details on why it cannot be supported.
+        if ((config != null)
+            && ((config.missing() != null && ((weight.count(ctx) != ctx.reader().getDocCount(fieldName)))) || (config.script() != null))) {
+            return false;
+        }
+
+        Terms stringTerms = ctx.reader().terms(fieldName);
+        if (stringTerms == null) {
+            // Field is not indexed.
+            return false;
+        }
+
+        NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME);
+        if (docCountValues.nextDoc() != NO_MORE_DOCS) {
+            // This segment has at least one document with the _doc_count field.
+            return false;
+        }
+
+        TermsEnum stringTermsEnum = stringTerms.iterator();
+        BytesRef stringTerm = stringTermsEnum.next();
+
+        // Iterate over every term in the segment's term dictionary and add its count to the bucket.
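+        // Conceptually this is a frequency scan of the segment's term dictionary: for a
+        // hypothetical keyword field with postings { "blue": 2 docs, "red": 1 doc } it yields
+        // two buckets with doc counts 2 and 1. TermsEnum.docFreq() is the number of documents
+        // in this segment containing the current term, which equals the bucket count exactly
+        // because we verified above that the query matches every live document.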
+ while (stringTerm != null) { + long bucketOrdinal = bucketOrds.add(0L, stringTerm); + if (bucketOrdinal < 0) { // already seen + bucketOrdinal = -1 - bucketOrdinal; + } + int docCount = stringTermsEnum.docFreq(); + if (resultStrategy instanceof SignificantTermsResults sigTermsResultStrategy) { + sigTermsResultStrategy.updateSubsetSizes(0L, docCount); + } + incrementBucketDocCount(bucketOrdinal, docCount); + stringTerm = stringTermsEnum.next(); + } + return true; + } + @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { return resultStrategy.buildAggregations(owningBucketOrds); @@ -196,6 +283,10 @@ public boolean needsScores() { return valuesSource.needsScores(); } + public ValuesSource getValuesSource() { + return valuesSource; + } + @Override public LeafBucketCollector getLeafCollector( IncludeExclude.StringFilter includeExclude, @@ -502,6 +593,11 @@ String describe() { return "significant_terms"; } + public void updateSubsetSizes(long owningBucketOrd, int amount) { + subsetSizes = context.bigArrays().grow(subsetSizes, owningBucketOrd + 1); + subsetSizes.increment(owningBucketOrd, amount); + } + @Override LeafBucketCollector wrapCollector(LeafBucketCollector primary) { return new LeafBucketCollectorBase(primary, null) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorFactory.java index b5f3abe89ac59..b546c2469d586 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorFactory.java @@ -97,7 +97,8 @@ public Aggregator build( SearchContext context, Aggregator parent, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { ExecutionMode execution = ExecutionMode.MAP; // TODO global ords not implemented yet, only supports "map" @@ -123,7 +124,8 @@ public Aggregator build( metadata, maxDocCount, precision, - cardinality + cardinality, + config ); } @@ -148,7 +150,8 @@ public Aggregator build( SearchContext context, Aggregator parent, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { if ((includeExclude != null) && (includeExclude.isRegexBased())) { @@ -233,7 +236,8 @@ protected Aggregator doCreateInternal( searchContext, parent, cardinality, - metadata + metadata, + config ); } @@ -263,7 +267,8 @@ Aggregator create( Map metadata, long maxDocCount, double precision, - CardinalityUpperBound cardinality + CardinalityUpperBound cardinality, + ValuesSourceConfig config ) throws IOException { int maxRegexLength = context.getQueryShardContext().getIndexSettings().getMaxRegexLength(); final IncludeExclude.StringFilter filter = includeExclude == null @@ -280,7 +285,8 @@ Aggregator create( metadata, maxDocCount, precision, - cardinality + cardinality, + config ); } @@ -317,7 +323,8 @@ abstract Aggregator create( Map metadata, long maxDocCount, double precision, - CardinalityUpperBound cardinality + CardinalityUpperBound cardinality, + ValuesSourceConfig config ) throws IOException; abstract boolean needsGlobalOrdinals(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorSupplier.java 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorSupplier.java index 5f8888e2819c2..147d7571c4f16 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorSupplier.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorSupplier.java @@ -36,6 +36,7 @@ import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.support.ValuesSource; +import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; import java.io.IOException; @@ -58,6 +59,7 @@ Aggregator build( SearchContext context, Aggregator parent, CardinalityUpperBound carinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java index f6802a58dfed2..cb1eb4b74acb5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java @@ -102,7 +102,8 @@ public Aggregator build( SignificanceHeuristic significanceHeuristic, SignificanceLookup lookup, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { ExecutionMode execution = null; @@ -138,7 +139,8 @@ public Aggregator build( significanceHeuristic, lookup, cardinality, - metadata + metadata, + config ); } }; @@ -164,7 +166,8 @@ public Aggregator build( SignificanceHeuristic significanceHeuristic, SignificanceLookup lookup, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { if ((includeExclude != null) && (includeExclude.isRegexBased())) { @@ -302,7 +305,8 @@ protected Aggregator doCreateInternal( significanceHeuristic, lookup, cardinality, - metadata + metadata, + config ); } @@ -333,7 +337,8 @@ Aggregator create( SignificanceHeuristic significanceHeuristic, SignificanceLookup lookup, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { int maxRegexLength = aggregationContext.getQueryShardContext().getIndexSettings().getMaxRegexLength(); final IncludeExclude.StringFilter filter = includeExclude == null @@ -353,7 +358,8 @@ Aggregator create( SubAggCollectionMode.BREADTH_FIRST, false, cardinality, - metadata + metadata, + config ); } @@ -374,7 +380,8 @@ Aggregator create( SignificanceHeuristic significanceHeuristic, SignificanceLookup lookup, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { int maxRegexLength = aggregationContext.getQueryShardContext().getIndexSettings().getMaxRegexLength(); final IncludeExclude.OrdinalsFilter filter = includeExclude == null @@ -406,7 +413,8 @@ Aggregator create( SubAggCollectionMode.BREADTH_FIRST, false, cardinality, - metadata + metadata, + config ); } }; @@ -444,7 +452,8 @@ abstract Aggregator create( SignificanceHeuristic significanceHeuristic, SignificanceLookup lookup, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException; 
@Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java index a1fa7ab9d061e..e62efe60a47f3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java @@ -37,6 +37,7 @@ import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import org.opensearch.search.aggregations.support.ValuesSource; +import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; import java.io.IOException; @@ -61,6 +62,7 @@ Aggregator build( SignificanceHeuristic significanceHeuristic, SignificanceLookup lookup, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java index 61a2fd9aae3c6..c2255eb8bdda5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java @@ -168,7 +168,8 @@ protected Aggregator createInternal( SubAggCollectionMode.BREADTH_FIRST, false, cardinality, - metadata + metadata, + null ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java index 6a4443adbb42d..fc2ba70f0ce30 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java @@ -31,13 +31,19 @@ package org.opensearch.search.aggregations.bucket.terms; +import org.apache.lucene.index.DocValues; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.search.Weight; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BytesRefHash; import org.opensearch.common.util.SetBackedScalingCuckooFilter; import org.opensearch.index.fielddata.SortedBinaryDocValues; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -46,6 +52,7 @@ import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.LeafBucketCollectorBase; import org.opensearch.search.aggregations.support.ValuesSource; +import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; import java.io.IOException; @@ -55,6 +62,7 @@ import java.util.Map; import static 
java.util.Collections.emptyList;
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
 
 /**
  * An aggregator that finds "rare" string values (e.g. terms agg that orders ascending)
@@ -64,7 +72,10 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator {
     private final ValuesSource.Bytes valuesSource;
     private final IncludeExclude.StringFilter filter;
+    private Weight weight;
     private final BytesKeyedBucketOrds bucketOrds;
+    protected final String fieldName;
+    private final ValuesSourceConfig config;
 
     StringRareTermsAggregator(
         String name,
@@ -77,12 +88,19 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator {
         Map<String, Object> metadata,
         long maxDocCount,
         double precision,
-        CardinalityUpperBound cardinality
+        CardinalityUpperBound cardinality,
+        ValuesSourceConfig config
     ) throws IOException {
         super(name, factories, context, parent, metadata, maxDocCount, precision, format);
         this.valuesSource = valuesSource;
         this.filter = filter;
        this.bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), cardinality);
+        this.fieldName = valuesSource.getIndexFieldName();
+        this.config = config;
+    }
+
+    public void setWeight(Weight weight) {
+        this.weight = weight;
     }
 
     @Override
@@ -122,6 +140,57 @@ public void collect(int docId, long owningBucketOrd) throws IOException {
         };
     }
 
+    @Override
+    protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
+        if (subAggregators.length > 0 || filter != null || weight == null) {
+            // The optimization does not work when there are sub-aggregations, when there is an
+            // include/exclude filter, or when there is no top-level weight to inspect.
+            return false;
+        }
+
+        // If the missing property is specified in the builder and some documents lack the field,
+        // we cannot use the index unless we could determine which term the missing value maps
+        // onto, which is not currently possible.
+        if (config != null && config.missing() != null && ((weight.count(ctx) == ctx.reader().getDocCount(fieldName)) == false)) {
+            return false;
+        }
+
+        // The optimization can only be used if there are no deleted documents and the top-level
+        // query matches all documents in the segment.
+        if (weight.count(ctx) == 0) {
+            return true;
+        } else if (weight.count(ctx) != ctx.reader().maxDoc()) {
+            return false;
+        }
+
+        Terms stringTerms = ctx.reader().terms(fieldName);
+        if (stringTerms == null) {
+            // Field is not indexed.
+            return false;
+        }
+
+        NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME);
+        if (docCountValues.nextDoc() != NO_MORE_DOCS) {
+            // This segment has at least one document with the _doc_count field.
+            return false;
+        }
+
+        TermsEnum stringTermsEnum = stringTerms.iterator();
+        BytesRef stringTerm = stringTermsEnum.next();
+
+        // Iterate over every term in the segment's term dictionary and add its count to the bucket.
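+        // Every term receives a bucket here regardless of frequency; the rarity cutoff
+        // (maxDocCount) is applied later, when buildAggregations prunes the collected buckets.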
+ while (stringTerm != null) { + long bucketOrdinal = bucketOrds.add(0L, stringTerm); + if (bucketOrdinal < 0) { // already seen + bucketOrdinal = -1 - bucketOrdinal; + } + incrementBucketDocCount(bucketOrdinal, stringTermsEnum.docFreq()); + stringTerm = stringTermsEnum.next(); + } + return true; + } + @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { /* diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index a4d73bfd3e634..7267ad2665679 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -107,7 +107,8 @@ public Aggregator build( SubAggCollectionMode subAggCollectMode, boolean showTermDocCountError, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { ExecutionMode execution = null; if (executionHint != null) { @@ -150,7 +151,8 @@ public Aggregator build( subAggCollectMode, showTermDocCountError, cardinality, - metadata + metadata, + config ); } @@ -178,7 +180,8 @@ public Aggregator build( SubAggCollectionMode subAggCollectMode, boolean showTermDocCountError, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { if ((includeExclude != null) && (includeExclude.isRegexBased())) { @@ -321,7 +324,8 @@ protected Aggregator doCreateInternal( collectMode, showTermDocCountError, cardinality, - metadata + metadata, + config ); } @@ -387,7 +391,8 @@ Aggregator create( SubAggCollectionMode subAggCollectMode, boolean showTermDocCountError, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { int maxRegexLength = context.getQueryShardContext().getIndexSettings().getMaxRegexLength(); final IncludeExclude.StringFilter filter = includeExclude == null @@ -407,7 +412,8 @@ Aggregator create( subAggCollectMode, showTermDocCountError, cardinality, - metadata + metadata, + config ); } }, @@ -427,7 +433,8 @@ Aggregator create( SubAggCollectionMode subAggCollectMode, boolean showTermDocCountError, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException { final long maxOrd = getMaxOrd(valuesSource, context.searcher()); @@ -466,7 +473,8 @@ Aggregator create( false, subAggCollectMode, showTermDocCountError, - metadata + metadata, + config ); } @@ -514,7 +522,8 @@ Aggregator create( subAggCollectMode, showTermDocCountError, cardinality, - metadata + metadata, + config ); } }; @@ -549,7 +558,8 @@ abstract Aggregator create( SubAggCollectionMode subAggCollectMode, boolean showTermDocCountError, CardinalityUpperBound cardinality, - Map metadata + Map metadata, + ValuesSourceConfig config ) throws IOException; @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorSupplier.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorSupplier.java index ea1484c6b9eef..cb53f222ec48c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorSupplier.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorSupplier.java @@ -37,6 +37,7 @@ import 
org.opensearch.search.aggregations.BucketOrder;
 import org.opensearch.search.aggregations.CardinalityUpperBound;
 import org.opensearch.search.aggregations.support.ValuesSource;
+import org.opensearch.search.aggregations.support.ValuesSourceConfig;
 import org.opensearch.search.internal.SearchContext;
 
 import java.io.IOException;
@@ -62,6 +63,7 @@ Aggregator build(
         Aggregator.SubAggCollectionMode subAggCollectMode,
         boolean showTermDocCountError,
         CardinalityUpperBound cardinality,
-        Map<String, Object> metadata
+        Map<String, Object> metadata,
+        ValuesSourceConfig config
     ) throws IOException;
 }
diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/MissingValues.java b/server/src/main/java/org/opensearch/search/aggregations/support/MissingValues.java
index 166334292d438..9595a2b3db723 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/support/MissingValues.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/support/MissingValues.java
@@ -71,6 +71,11 @@ public SortedBinaryDocValues bytesValues(LeafReaderContext context) throws IOExc
             public String toString() {
                 return "anon ValuesSource.Bytes of [" + super.toString() + "]";
             }
+
+            @Override
+            public String getIndexFieldName() {
+                return valuesSource.getIndexFieldName();
+            }
         };
     }
 
@@ -149,6 +154,11 @@ public SortedNumericDoubleValues doubleValues(LeafReaderContext context) throws
             public String toString() {
                 return "anon ValuesSource.Numeric of [" + super.toString() + "]";
             }
+
+            @Override
+            public String getIndexFieldName() {
+                return valuesSource.getIndexFieldName();
+            }
         };
     }
 
@@ -269,6 +279,11 @@ public String toString() {
                 return "anon ValuesSource.Bytes.WithOrdinals of [" + super.toString() + "]";
             }
 
+            @Override
+            public String getIndexFieldName() {
+                return valuesSource.getIndexFieldName();
+            }
+
         };
     }
 
@@ -460,6 +475,11 @@ public MultiGeoPointValues geoPointValues(LeafReaderContext context) {
             public String toString() {
                 return "anon ValuesSource.GeoPoint of [" + super.toString() + "]";
             }
+
+            @Override
+            public String getIndexFieldName() {
+                return valuesSource.getIndexFieldName();
+            }
         };
     }
 
@@ -520,6 +540,11 @@ public GeoShapeValue getGeoShapeValues(LeafReaderContext context) {
             public SortedBinaryDocValues bytesValues(LeafReaderContext context) throws IOException {
                 return replaceMissing(valuesSource.bytesValues(context), new BytesRef(missing.toString()));
             }
+
+            @Override
+            public String getIndexFieldName() {
+                return valuesSource.getIndexFieldName();
+            }
         };
     }
 }
diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java
index c0254e5de5048..d9aa1974e2f95 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java
@@ -113,6 +115,10 @@ public boolean hasGlobalOrdinals() {
         return false;
     }
 
+    public String getIndexFieldName() {
+        return null;
+    }
+
     /**
      * Range type
      *
@@ -127,6 +133,11 @@ public Range(IndexFieldData indexFieldData, RangeType rangeType) {
             this.rangeType = rangeType;
         }
 
+        @Override
+        public String getIndexFieldName() {
+            return this.indexFieldData.getFieldName();
+        }
+
         @Override
         public SortedBinaryDocValues bytesValues(LeafReaderContext context) {
             return
indexFieldData.load(context).getBytesValues(); @@ -249,6 +260,7 @@ public FieldData(IndexOrdinalsFieldData indexFieldData) { this.indexFieldData = indexFieldData; } + @Override public String getIndexFieldName() { return this.indexFieldData.getFieldName(); } @@ -309,6 +321,11 @@ public SortedBinaryDocValues bytesValues(LeafReaderContext context) { return indexFieldData.load(context).getBytesValues(); } + @Override + public String getIndexFieldName() { + return this.indexFieldData.getFieldName(); + } + } /** @@ -356,6 +373,11 @@ public boolean needsScores() { return script.needs_score(); } + @Override + public String getIndexFieldName() { + return delegate.getIndexFieldName(); + } + @Override public SortedBinaryDocValues bytesValues(LeafReaderContext context) throws IOException { return new BytesValues(delegate.bytesValues(context), script.newInstance(context)); @@ -500,6 +522,11 @@ public boolean needsScores() { return script.needs_score(); } + @Override + public String getIndexFieldName() { + return delegate.getIndexFieldName(); + } + @Override public SortedBinaryDocValues bytesValues(LeafReaderContext context) throws IOException { return new Bytes.WithScript.BytesValues(delegate.bytesValues(context), script.newInstance(context)); @@ -631,6 +658,7 @@ public SortedNumericDoubleValues doubleValues(LeafReaderContext context) { return indexFieldData.load(context).getDoubleValues(); } + @Override public String getIndexFieldName() { return indexFieldData.getFieldName(); } @@ -730,6 +758,11 @@ public Fielddata(IndexGeoPointFieldData indexFieldData) { this.indexFieldData = indexFieldData; } + @Override + public String getIndexFieldName() { + return indexFieldData.getFieldName(); + } + @Override public SortedBinaryDocValues bytesValues(LeafReaderContext context) { return indexFieldData.load(context).getBytesValues(); @@ -802,6 +835,11 @@ public FieldData(AbstractGeoShapeIndexFieldData indexFieldData) { this.indexFieldData = indexFieldData; } + @Override + public String getIndexFieldName() { + return indexFieldData.getFieldName(); + } + /** * Get the current {@link BytesValues}. 
* diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregatorTests.java index b530dc31d30ed..333e4c19589d4 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregatorTests.java @@ -33,7 +33,12 @@ package org.opensearch.search.aggregations.bucket.missing; import org.apache.lucene.document.BinaryDocValuesField; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexableField; @@ -44,6 +49,7 @@ import org.apache.lucene.util.BytesRef; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.settings.Settings; +import org.opensearch.index.mapper.KeywordFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.NumberFieldMapper; import org.opensearch.index.mapper.NumberFieldMapper.NumberType; @@ -56,9 +62,14 @@ import org.opensearch.script.ScriptService; import org.opensearch.script.ScriptType; import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorTestCase; +import org.opensearch.search.aggregations.BucketOrder; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregatorFactory; import org.opensearch.search.aggregations.support.AggregationInspectionHelper; import org.opensearch.search.aggregations.support.CoreValuesSourceType; +import org.opensearch.search.aggregations.support.ValueType; import org.opensearch.search.aggregations.support.ValuesSourceType; import org.opensearch.search.lookup.LeafDocLookup; @@ -95,14 +106,36 @@ public void testMatchNoDocs() throws IOException { final MissingAggregationBuilder builder = new MissingAggregationBuilder("_name").field(fieldType.name()); - testCase(newMatchAllQuery(), builder, writer -> { + // This will be the case where the field data written will be indexed. + CheckedConsumer writeIndexIndexed = (writer -> { + for (int i = 0; i < numDocs; i++) { + final long randomLong = randomLong(); + writer.addDocument( + Set.of( + new SortedNumericDocValuesField(fieldType.name(), randomLong), + new StringField(fieldType.name(), String.valueOf(randomLong), Store.NO) + ) + ); + } + }); + + // The field data was not indexed internally. + CheckedConsumer writeIndexNotIndexed = (writer -> { for (int i = 0; i < numDocs; i++) { writer.addDocument(singleton(new SortedNumericDocValuesField(fieldType.name(), randomLong()))); } - }, internalMissing -> { + }); + + // The precompute optimization kicked in, so no docs were traversed. 
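+        // (Inferred from the updated helper: the trailing int argument of this testCase
+        // overload is the expected number of per-document collect calls, so 0 asserts that
+        // the precomputation handled the entire segment.)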
+ testCase(newMatchAllQuery(), builder, writeIndexIndexed, internalMissing -> { assertEquals(0, internalMissing.getDocCount()); assertFalse(AggregationInspectionHelper.hasValue(internalMissing)); - }, singleton(fieldType)); + }, singleton(fieldType), 0); + + testCase(newMatchAllQuery(), builder, writeIndexNotIndexed, internalMissing -> { + assertEquals(0, internalMissing.getDocCount()); + assertFalse(AggregationInspectionHelper.hasValue(internalMissing)); + }, singleton(fieldType), numDocs); } public void testMatchAllDocs() throws IOException { @@ -113,14 +146,97 @@ public void testMatchAllDocs() throws IOException { final MissingAggregationBuilder builder = new MissingAggregationBuilder("_name").field(aggFieldType.name()); - testCase(newMatchAllQuery(), builder, writer -> { + // This will be the case where the field data written will be indexed. + CheckedConsumer writeIndexIndexed = (writer -> { + for (int i = 0; i < numDocs; i++) { + final long randomLong = randomLong(); + writer.addDocument( + Set.of( + new SortedNumericDocValuesField(anotherFieldType.name(), randomLong), + new StringField(anotherFieldType.name(), String.valueOf(randomLong), Store.NO) + ) + ); + } + }); + + // The field data was not indexed internally. + CheckedConsumer writeIndexNotIndexed = (writer -> { for (int i = 0; i < numDocs; i++) { writer.addDocument(singleton(new SortedNumericDocValuesField(anotherFieldType.name(), randomLong()))); } - }, internalMissing -> { + }); + + // The precompute optimization kicked in, so no docs were traversed. + testCase(newMatchAllQuery(), builder, writeIndexIndexed, internalMissing -> { + assertEquals(numDocs, internalMissing.getDocCount()); + assertTrue(AggregationInspectionHelper.hasValue(internalMissing)); + }, List.of(aggFieldType, anotherFieldType), 0); + + // We can use precomputation because we are counting a field that has never been added. + testCase(newMatchAllQuery(), builder, writeIndexNotIndexed, internalMissing -> { assertEquals(numDocs, internalMissing.getDocCount()); assertTrue(AggregationInspectionHelper.hasValue(internalMissing)); - }, List.of(aggFieldType, anotherFieldType)); + }, List.of(aggFieldType, anotherFieldType), 0); + } + + public void testDocValues() throws IOException { + int numDocs = randomIntBetween(10, 200); + final long _doc_count = 5; + + final MappedFieldType aggFieldType = new NumberFieldMapper.NumberFieldType("agg_field", NumberType.LONG); + final MappedFieldType anotherFieldType = new NumberFieldMapper.NumberFieldType("another_field", NumberType.LONG); + + final MissingAggregationBuilder builder = new MissingAggregationBuilder("_name").field(aggFieldType.name()); + + // This will be the case where the field data written will be indexed. 
+        CheckedConsumer<RandomIndexWriter, IOException> writeIndexIndexed = (writer -> {
+            for (int i = 0; i < numDocs; i++) {
+                final long randomLong = randomLong();
+                writer.addDocument(
+                    Set.of(
+                        new SortedNumericDocValuesField(anotherFieldType.name(), randomLong),
+                        new StringField(anotherFieldType.name(), String.valueOf(randomLong), Store.NO),
+                        new NumericDocValuesField("_doc_count", _doc_count)
+                    )
+                );
+            }
+        });
+
+        try (Directory directory = newDirectory()) {
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
+                writeIndexIndexed.accept(indexWriter);
+            }
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                final IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+                final MappedFieldType[] fieldTypesArray = List.of(aggFieldType, anotherFieldType).toArray(new MappedFieldType[0]);
+                // The counting test helpers record how many collect calls actually happen,
+                // which lets us verify whether the precomputation was used or not.
+                final InternalMissing missing = searchAndReduce(indexSearcher, newMatchAllQuery(), builder, fieldTypesArray);
+                assertEquals(_doc_count * numDocs, missing.getDocCount());
+            }
+        }
+    }
+
+    public void testEmpty() throws IOException {
+        final MappedFieldType aggFieldType = new NumberFieldMapper.NumberFieldType("agg_field", NumberType.LONG);
+        final MappedFieldType anotherFieldType = new NumberFieldMapper.NumberFieldType("another_field", NumberType.LONG);
+
+        final MissingAggregationBuilder builder = new MissingAggregationBuilder("_name").field(aggFieldType.name());
+
+        CheckedConsumer<RandomIndexWriter, IOException> writeIndex = (writer -> {});
+
+        // The precompute optimization kicked in, so no docs were traversed.
+        testCase(
+            newMatchAllQuery(),
+            builder,
+            writeIndex,
+            internalMissing -> { assertEquals(0, internalMissing.getDocCount()); },
+            List.of(aggFieldType, anotherFieldType),
+            0
+        );
+    }
 
     public void testMatchSparse() throws IOException {
@@ -131,8 +247,32 @@ public void testMatchSparse() throws IOException {
         final int numDocs = randomIntBetween(100, 200);
         int docsMissingAggField = 0;
+        int docsIndexedMissingAggField = 0;
         final List<Set<IndexableField>> docs = new ArrayList<>();
+
+        // The list of documents that were added with the field values indexed
+        final List<Set<IndexableField>> docsIndexed = new ArrayList<>();
+
         for (int i = 0; i < numDocs; i++) {
+            if (randomBoolean()) {
+                final long randomLong = randomLong();
+                docsIndexed.add(
+                    Set.of(
+                        new SortedNumericDocValuesField(aggFieldType.name(), randomLong),
+                        new StringField(aggFieldType.name(), String.valueOf(randomLong), Store.NO)
+                    )
+                );
+            } else {
+                final long randomLong = randomLong();
+                docsIndexed.add(
+                    Set.of(
+                        new SortedNumericDocValuesField(anotherFieldType.name(), randomLong),
+                        new StringField(anotherFieldType.name(), String.valueOf(randomLong), Store.NO)
+                    )
+                );
+                docsIndexedMissingAggField++;
+            }
+
             if (randomBoolean()) {
                 docs.add(singleton(new SortedNumericDocValuesField(aggFieldType.name(), randomLong())));
             } else {
@@ -141,11 +281,18 @@
             }
         }
         final int finalDocsMissingAggField = docsMissingAggField;
+        final int finalDocsIndexedMissingAggField = docsIndexedMissingAggField;
+
+        // The precompute optimization kicked in, so no docs were traversed.
+        testCase(newMatchAllQuery(), builder, writer -> writer.addDocuments(docsIndexed), internalMissing -> {
+            assertEquals(finalDocsIndexedMissingAggField, internalMissing.getDocCount());
+            assertTrue(AggregationInspectionHelper.hasValue(internalMissing));
+        }, List.of(aggFieldType, anotherFieldType), 0);
 
         testCase(newMatchAllQuery(), builder, writer -> writer.addDocuments(docs), internalMissing -> {
             assertEquals(finalDocsMissingAggField, internalMissing.getDocCount());
             assertTrue(AggregationInspectionHelper.hasValue(internalMissing));
-        }, List.of(aggFieldType, anotherFieldType));
+        }, List.of(aggFieldType, anotherFieldType), numDocs);
     }
 
     public void testMatchSparseRangeField() throws IOException {
@@ -161,8 +308,27 @@ public void testMatchSparseRangeField() throws IOException {
         final int numDocs = randomIntBetween(100, 200);
         int docsMissingAggField = 0;
+        int docsIndexedMissingAggField = 0;
+        final List<Set<IndexableField>> docs = new ArrayList<>();
+
+        // The list of documents that were added with the field values indexed
+        final List<Set<IndexableField>> docsIndexed = new ArrayList<>();
+
         for (int i = 0; i < numDocs; i++) {
+            if (randomBoolean()) {
+                docsIndexed.add(singleton(encodedRangeField));
+            } else {
+                final long randomLong = randomLong();
+                docsIndexed.add(
+                    Set.of(
+                        new SortedNumericDocValuesField(anotherFieldType.name(), randomLong),
+                        new StringField(anotherFieldType.name(), String.valueOf(randomLong), Store.NO)
+                    )
+                );
+                docsIndexedMissingAggField++;
+            }
+
             if (randomBoolean()) {
                 docs.add(singleton(encodedRangeField));
             } else {
@@ -171,11 +337,75 @@
             }
         }
         final int finalDocsMissingAggField = docsMissingAggField;
+        final int finalDocsIndexedMissingAggField = docsIndexedMissingAggField;
+
+        // Precompute cannot be used here: the range field is stored as doc values only, so the
+        // reader has field info for it even though it is not among the indexed fields. The
+        // precomputation therefore cannot tell a field that merely is not indexed apart from a
+        // field that truly has no values, and falls back to per-document collection.
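+        // In tryPrecomputeAggregationForLeaf terms: indexedFields.contains(fieldName) is false
+        // while getFieldInfos().fieldInfo(fieldName) is non-null, forcing the per-document
+        // fallback (hence the numDocs expected collect calls below).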
+ testCase(newMatchAllQuery(), builder, writer -> writer.addDocuments(docsIndexed), internalMissing -> { + assertEquals(finalDocsIndexedMissingAggField, internalMissing.getDocCount()); + assertTrue(AggregationInspectionHelper.hasValue(internalMissing)); + }, Arrays.asList(aggFieldType, anotherFieldType), numDocs); testCase(newMatchAllQuery(), builder, writer -> writer.addDocuments(docs), internalMissing -> { assertEquals(finalDocsMissingAggField, internalMissing.getDocCount()); assertTrue(AggregationInspectionHelper.hasValue(internalMissing)); - }, Arrays.asList(aggFieldType, anotherFieldType)); + }, Arrays.asList(aggFieldType, anotherFieldType), numDocs); + } + + public void testNestedTermsAgg() throws Exception { + try (Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + Document document = new Document(); + document.add(new SortedDocValuesField("field1", new BytesRef("a"))); + document.add(new SortedDocValuesField("field2", new BytesRef("b"))); + document.add(new StringField("field1", "a", Store.NO)); + document.add(new StringField("field2", "b", Store.NO)); + indexWriter.addDocument(document); + document = new Document(); + document.add(new SortedDocValuesField("field1", new BytesRef("c"))); + document.add(new SortedDocValuesField("field2", new BytesRef("d"))); + document.add(new StringField("field1", "c", Store.NO)); + document.add(new StringField("field2", "d", Store.NO)); + indexWriter.addDocument(document); + document = new Document(); + document.add(new SortedDocValuesField("field1", new BytesRef("e"))); + document.add(new SortedDocValuesField("field2", new BytesRef("f"))); + document.add(new StringField("field1", "e", Store.NO)); + document.add(new StringField("field2", "f", Store.NO)); + indexWriter.addDocument(document); + document = new Document(); + document.add(new SortedDocValuesField("field2", new BytesRef("g"))); + document.add(new StringField("field2", "g", Store.NO)); + indexWriter.addDocument(document); + try (IndexReader indexReader = maybeWrapReaderEs(indexWriter.getReader())) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + String executionHint = randomFrom(TermsAggregatorFactory.ExecutionMode.values()).toString(); + Aggregator.SubAggCollectionMode collectionMode = randomFrom(Aggregator.SubAggCollectionMode.values()); + MissingAggregationBuilder aggregationBuilder = new MissingAggregationBuilder("_name1").field("field1") + .subAggregation( + new TermsAggregationBuilder("_name2").userValueTypeHint(ValueType.STRING) + .executionHint(executionHint) + .collectMode(collectionMode) + .field("field2") + .order(BucketOrder.key(true)) + ); + MappedFieldType fieldType1 = new KeywordFieldMapper.KeywordFieldType("field1"); + MappedFieldType fieldType2 = new KeywordFieldMapper.KeywordFieldType("field2"); + + final InternalMissing missing = searchAndReduceCounting( + 4, + indexSearcher, + newMatchAllQuery(), + aggregationBuilder, + new MappedFieldType[] { fieldType1, fieldType2 } + ); + + assertEquals(1, missing.getDocCount()); + assertTrue(AggregationInspectionHelper.hasValue(missing)); + } + } + } } public void testUnmappedWithoutMissingParam() throws IOException { @@ -184,14 +414,36 @@ public void testUnmappedWithoutMissingParam() throws IOException { final MissingAggregationBuilder builder = new MissingAggregationBuilder("_name").field("unknown_field"); - testCase(newMatchAllQuery(), builder, writer -> { + // This will be the case where the field data written will be indexed. 
+        CheckedConsumer<RandomIndexWriter, IOException> writeIndexIndexed = (writer -> {
+            for (int i = 0; i < numDocs; i++) {
+                final long randomLong = randomLong();
+                writer.addDocument(
+                    Set.of(
+                        new SortedNumericDocValuesField(aggFieldType.name(), randomLong),
+                        new StringField(aggFieldType.name(), String.valueOf(randomLong), Store.NO)
+                    )
+                );
+            }
+        });
+
+        // Here the field values are written as doc values only, without being indexed.
+        CheckedConsumer<RandomIndexWriter, IOException> writeIndexNotIndexed = (writer -> {
             for (int i = 0; i < numDocs; i++) {
                 writer.addDocument(singleton(new SortedNumericDocValuesField(aggFieldType.name(), randomLong())));
             }
-        }, internalMissing -> {
+        });
+
+        // The field is unmapped, so no values source is available and the precomputation
+        // cannot be used; every document has to be collected.
+        testCase(newMatchAllQuery(), builder, writeIndexIndexed, internalMissing -> {
+            assertEquals(numDocs, internalMissing.getDocCount());
+            assertTrue(AggregationInspectionHelper.hasValue(internalMissing));
+        }, singleton(aggFieldType), numDocs);
+
+        testCase(newMatchAllQuery(), builder, writeIndexNotIndexed, internalMissing -> {
             assertEquals(numDocs, internalMissing.getDocCount());
             assertTrue(AggregationInspectionHelper.hasValue(internalMissing));
-        }, singleton(aggFieldType));
+        }, singleton(aggFieldType), numDocs);
     }
 
     public void testUnmappedWithMissingParam() throws IOException {
@@ -200,6 +452,22 @@ public void testUnmappedWithMissingParam() throws IOException {
 
         final MissingAggregationBuilder builder = new MissingAggregationBuilder("_name").field("unknown_field").missing(randomLong());
 
+        // Because the field is unmapped, the field name does not exist, so we cannot collect
+        // through the precomputation optimization.
+        testCase(newMatchAllQuery(), builder, writer -> {
+            for (int i = 0; i < numDocs; i++) {
+                final long randomLong = randomLong();
+                writer.addDocument(
+                    Set.of(
+                        new SortedNumericDocValuesField(aggFieldType.name(), randomLong),
+                        new StringField(aggFieldType.name(), String.valueOf(randomLong), Store.NO)
+                    )
+                );
+            }
+        }, internalMissing -> {
+            assertEquals(0, internalMissing.getDocCount());
+            assertFalse(AggregationInspectionHelper.hasValue(internalMissing));
+        }, singleton(aggFieldType), numDocs);
+
         testCase(newMatchAllQuery(), builder, writer -> {
             for (int i = 0; i < numDocs; i++) {
                 writer.addDocument(singleton(new SortedNumericDocValuesField(aggFieldType.name(), randomLong())));
@@ -207,7 +475,7 @@
         }, internalMissing -> {
             assertEquals(0, internalMissing.getDocCount());
             assertFalse(AggregationInspectionHelper.hasValue(internalMissing));
-        }, singleton(aggFieldType));
+        }, singleton(aggFieldType), numDocs);
     }
 
     public void testMissingParam() throws IOException {
@@ -218,6 +486,22 @@
 
         final MissingAggregationBuilder builder = new MissingAggregationBuilder("_name").field(aggFieldType.name()).missing(randomLong());
 
+        // With the missing parameter set, the missing aggregator is responsible for no
+        // documents at all, so it short-circuits.
+        testCase(newMatchAllQuery(), builder, writer -> {
+            for (int i = 0; i < numDocs; i++) {
+                final long randomLong = randomLong();
+                writer.addDocument(
+                    Set.of(
+                        new SortedNumericDocValuesField(anotherFieldType.name(), randomLong),
+                        new StringField(anotherFieldType.name(), String.valueOf(randomLong), Store.NO)
+                    )
+                );
+            }
+        }, internalMissing -> {
+            assertEquals(0, internalMissing.getDocCount());
+            assertFalse(AggregationInspectionHelper.hasValue(internalMissing));
+        }, List.of(aggFieldType, anotherFieldType), 0);
+
         testCase(newMatchAllQuery(),
builder, writer -> { for (int i = 0; i < numDocs; i++) { writer.addDocument(singleton(new SortedNumericDocValuesField(anotherFieldType.name(), randomLong()))); @@ -225,7 +509,7 @@ public void testMissingParam() throws IOException { }, internalMissing -> { assertEquals(0, internalMissing.getDocCount()); assertFalse(AggregationInspectionHelper.hasValue(internalMissing)); - }, List.of(aggFieldType, anotherFieldType)); + }, List.of(aggFieldType, anotherFieldType), 0); } public void testMultiValuedField() throws IOException { @@ -236,8 +520,33 @@ public void testMultiValuedField() throws IOException { final int numDocs = randomIntBetween(100, 200); int docsMissingAggField = 0; + int docsIndexedMissingAggField = 0; final List> docs = new ArrayList<>(); + final List> docsIndexed = new ArrayList<>(); + for (int i = 0; i < numDocs; i++) { + if (randomBoolean()) { + final long randomLong = randomLong(); + docsIndexed.add( + Set.of( + new SortedNumericDocValuesField(aggFieldType.name(), randomLong), + new SortedNumericDocValuesField(aggFieldType.name(), randomLong + 1), + new StringField(aggFieldType.name(), String.valueOf(randomLong), Store.NO), + new StringField(aggFieldType.name(), String.valueOf(randomLong + 1), Store.NO) + + ) + ); + } else { + final long randomLong = randomLong(); + docsIndexed.add( + Set.of( + new SortedNumericDocValuesField(anotherFieldType.name(), randomLong), + new StringField(anotherFieldType.name(), String.valueOf(randomLong), Store.NO) + ) + ); + docsIndexedMissingAggField++; + } + if (randomBoolean()) { final long randomLong = randomLong(); docs.add( @@ -252,11 +561,18 @@ public void testMultiValuedField() throws IOException { } } final int finalDocsMissingAggField = docsMissingAggField; + final int finalDocsIndexedMissingAggField = docsIndexedMissingAggField; + + // The precompute optimization kicked in, so no docs were traversed. 
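+        // getDocCount(field) counts a document once no matter how many values it holds, so the
+        // multi-valued field does not skew the precomputed missing count.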
+ testCase(newMatchAllQuery(), builder, writer -> writer.addDocuments(docsIndexed), internalMissing -> { + assertEquals(finalDocsIndexedMissingAggField, internalMissing.getDocCount()); + assertTrue(AggregationInspectionHelper.hasValue(internalMissing)); + }, List.of(aggFieldType, anotherFieldType), 0); testCase(newMatchAllQuery(), builder, writer -> writer.addDocuments(docs), internalMissing -> { assertEquals(finalDocsMissingAggField, internalMissing.getDocCount()); assertTrue(AggregationInspectionHelper.hasValue(internalMissing)); - }, List.of(aggFieldType, anotherFieldType)); + }, List.of(aggFieldType, anotherFieldType), numDocs); } public void testSingleValuedFieldWithValueScript() throws IOException { @@ -275,8 +591,29 @@ private void valueScriptTestCase(Script script) throws IOException { final int numDocs = randomIntBetween(100, 200); int docsMissingAggField = 0; + int docsIndexedMissingAggField = 0; final List<Set<IndexableField>> docs = new ArrayList<>(); + final List<Set<IndexableField>> docsIndexed = new ArrayList<>(); + for (int i = 0; i < numDocs; i++) { + final long randomLong = randomLong(); + if (randomBoolean()) { + docsIndexed.add( + Set.of( + new SortedNumericDocValuesField(aggFieldType.name(), randomLong), + new StringField(aggFieldType.name(), String.valueOf(randomLong), Store.NO) + ) + ); + } else { + docsIndexed.add( + Set.of( + new SortedNumericDocValuesField(anotherFieldType.name(), randomLong), + new StringField(anotherFieldType.name(), String.valueOf(randomLong), Store.NO) + ) + ); + docsIndexedMissingAggField++; + } + if (randomBoolean()) { docs.add(singleton(new SortedNumericDocValuesField(aggFieldType.name(), randomLong()))); } else { @@ -284,12 +621,20 @@ private void valueScriptTestCase(Script script) throws IOException { docsMissingAggField++; } } + + final int finalDocsIndexedMissingAggField = docsIndexedMissingAggField; final int finalDocsMissingField = docsMissingAggField; + // The precompute optimization kicks in, so no docs are traversed.
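+ // Note: a value script only transforms the values read from the field, so the values source presumably still exposes the underlying indexed field name and the precomputation can still kick in, unlike the field-script case below.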
+ testCase(newMatchAllQuery(), builder, writer -> writer.addDocuments(docsIndexed), internalMissing -> { + assertEquals(finalDocsIndexedMissingAggField, internalMissing.getDocCount()); + assertTrue(AggregationInspectionHelper.hasValue(internalMissing)); + }, List.of(aggFieldType, anotherFieldType), 0); + testCase(newMatchAllQuery(), builder, writer -> writer.addDocuments(docs), internalMissing -> { assertEquals(finalDocsMissingField, internalMissing.getDocCount()); assertTrue(AggregationInspectionHelper.hasValue(internalMissing)); - }, List.of(aggFieldType, anotherFieldType)); + }, List.of(aggFieldType, anotherFieldType), numDocs); } public void testMultiValuedFieldWithFieldScriptWithParams() throws IOException { @@ -313,6 +658,26 @@ private void fieldScriptTestCase(Script script, long threshold) throws IOExcepti final int numDocs = randomIntBetween(100, 200); int docsBelowThreshold = 0; final List<Set<IndexableField>> docs = new ArrayList<>(); + + int docsIndexedBelowThreshold = 0; + final List<Set<IndexableField>> docsIndexed = new ArrayList<>(); + + for (int i = 0; i < numDocs; i++) { + final long firstIndexedValue = randomLongBetween(0, 100); + final long secondIndexedValue = firstIndexedValue + 1; + if (firstIndexedValue < threshold && secondIndexedValue < threshold) { + docsIndexedBelowThreshold++; + } + docsIndexed.add( + Set.of( + new SortedNumericDocValuesField(aggFieldType.name(), firstIndexedValue), + new StringField(aggFieldType.name(), String.valueOf(firstIndexedValue), Store.NO), + new SortedNumericDocValuesField(aggFieldType.name(), secondIndexedValue), + new StringField(aggFieldType.name(), String.valueOf(secondIndexedValue), Store.NO) + ) + ); + } + for (int i = 0; i < numDocs; i++) { final long firstValue = randomLongBetween(0, 100); final long secondValue = firstValue + 1; @@ -326,12 +691,20 @@ private void fieldScriptTestCase(Script script, long threshold) throws IOExcepti ) ); } + final int finalDocsBelowThreshold = docsBelowThreshold; + final int finalDocsIndexedBelowThreshold = docsIndexedBelowThreshold; + + // The precompute optimization does not kick in because the values source does not expose an indexed field name.
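+ // Without an indexed field name, the aggregator falls back to collecting every document, which is why the expected collect count below is numDocs rather than 0.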
+ testCase(newMatchAllQuery(), builder, writer -> writer.addDocuments(docsIndexed), internalMissing -> { + assertEquals(finalDocsIndexedBelowThreshold, internalMissing.getDocCount()); + assertTrue(AggregationInspectionHelper.hasValue(internalMissing)); + }, singleton(aggFieldType), numDocs); testCase(newMatchAllQuery(), builder, writer -> writer.addDocuments(docs), internalMissing -> { assertEquals(finalDocsBelowThreshold, internalMissing.getDocCount()); assertTrue(AggregationInspectionHelper.hasValue(internalMissing)); - }, singleton(aggFieldType)); + }, singleton(aggFieldType), numDocs); } private void testCase( @@ -339,7 +712,8 @@ private void testCase( MissingAggregationBuilder builder, CheckedConsumer<RandomIndexWriter, IOException> writeIndex, Consumer<InternalMissing> verify, - Collection<MappedFieldType> fieldTypes + Collection<MappedFieldType> fieldTypes, + int expectedCount ) throws IOException { try (Directory directory = newDirectory()) { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { @@ -349,7 +723,8 @@ private void testCase( try (IndexReader indexReader = DirectoryReader.open(directory)) { final IndexSearcher indexSearcher = newSearcher(indexReader, true, true); final MappedFieldType[] fieldTypesArray = fieldTypes.toArray(new MappedFieldType[0]); - final InternalMissing missing = searchAndReduce(indexSearcher, query, builder, fieldTypesArray); + // Use the counting variant so the test can assert how many collect calls actually happened. + final InternalMissing missing = searchAndReduceCounting(expectedCount, indexSearcher, query, builder, fieldTypesArray); verify.accept(missing); } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorTests.java index 6d66cfc24e558..78808872f614a 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorTests.java @@ -39,6 +39,7 @@ import org.apache.lucene.document.SortedDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.FieldExistsQuery; @@ -133,6 +134,14 @@ public void testMatchNoDocs() throws IOException { aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1), agg -> assertEquals(0, agg.getBuckets().size()) ); + testSearchCaseIndexString( + new MatchNoDocsQuery(), + dataset, + aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1), + agg -> assertEquals(0, agg.getBuckets().size()), + true, + 0 + ); testSearchCase( new MatchNoDocsQuery(), dataset, @@ -156,6 +165,12 @@ public void testMatchAllDocs() throws IOException { assertThat(bucket.getKeyAsString(), equalTo("1")); assertThat(bucket.getDocCount(), equalTo(1L)); }); + testSearchCaseIndexString(query, dataset, aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1), agg -> { + assertEquals(1, agg.getBuckets().size()); + StringRareTerms.Bucket bucket = (StringRareTerms.Bucket) agg.getBuckets().get(0); + assertThat(bucket.getKeyAsString(), equalTo("1")); + assertThat(bucket.getDocCount(), equalTo(1L)); + }, true, 0); } public void testManyDocsOneRare() throws IOException { @@ -182,6 +197,12 @@
assertThat(bucket.getKeyAsString(), equalTo("0")); assertThat(bucket.getDocCount(), equalTo(1L)); }); + testSearchCaseIndexString(query, d, aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1), agg -> { + assertEquals(1, agg.getBuckets().size()); + StringRareTerms.Bucket bucket = (StringRareTerms.Bucket) agg.getBuckets().get(0); + assertThat(bucket.getKeyAsString(), equalTo("0")); + assertThat(bucket.getDocCount(), equalTo(1L)); + }, true, 0); } public void testIncludeExclude() throws IOException { @@ -213,6 +234,21 @@ public void testIncludeExclude() throws IOException { assertThat(bucket.getDocCount(), equalTo(2L)); } ); + testSearchCaseIndexString( + query, + dataset, + aggregation -> aggregation.field(KEYWORD_FIELD) + .maxDocCount(2) // bump to 2 since we're only including "2" + .includeExclude(new IncludeExclude(new String[] { "2" }, new String[] {})), + agg -> { + assertEquals(1, agg.getBuckets().size()); + StringRareTerms.Bucket bucket = (StringRareTerms.Bucket) agg.getBuckets().get(0); + assertThat(bucket.getKeyAsString(), equalTo("2")); + assertThat(bucket.getDocCount(), equalTo(2L)); + }, + true, + dataset.size() + ); } public void testEmbeddedMaxAgg() throws IOException { @@ -246,6 +282,20 @@ public void testEmbeddedMaxAgg() throws IOException { assertThat(children.asList().get(0).getName(), equalTo("the_max")); assertThat(((Max) (children.asList().get(0))).getValue(), equalTo(1.0)); }); + testSearchCaseIndexString(query, dataset, aggregation -> { + MaxAggregationBuilder max = new MaxAggregationBuilder("the_max").field(LONG_FIELD); + aggregation.field(KEYWORD_FIELD).maxDocCount(1).subAggregation(max); + }, agg -> { + assertEquals(1, agg.getBuckets().size()); + StringRareTerms.Bucket bucket = (StringRareTerms.Bucket) agg.getBuckets().get(0); + assertThat(bucket.getKey(), equalTo("1")); + assertThat(bucket.getDocCount(), equalTo(1L)); + + Aggregations children = bucket.getAggregations(); + assertThat(children.asList().size(), equalTo(1)); + assertThat(children.asList().get(0).getName(), equalTo("the_max")); + assertThat(((Max) (children.asList().get(0))).getValue(), equalTo(1.0)); + }, true, dataset.size()); } public void testEmpty() throws IOException { @@ -276,6 +326,14 @@ public void testEmpty() throws IOException { aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1), agg -> assertEquals(0, agg.getBuckets().size()) ); + testSearchCaseIndexString( + query, + Collections.emptyList(), + aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1), + agg -> assertEquals(0, agg.getBuckets().size()), + true, + 0 + ); } public void testUnmapped() throws Exception { @@ -581,6 +639,22 @@ private void testSearchCase( } + private void testSearchCaseIndexString( + Query query, + List<Long> dataset, + Consumer<RareTermsAggregationBuilder> configure, + Consumer<InternalMappedRareTerms<?, ?>> verify, + boolean shouldIndex, + int expectedCount + ) throws IOException { + RareTermsAggregationBuilder aggregationBuilder = new RareTermsAggregationBuilder("_name"); + if (configure != null) { + configure.accept(aggregationBuilder); + } + verify.accept(executeTestCaseIndexString(query, dataset, aggregationBuilder, shouldIndex, expectedCount)); + } + private <A extends InternalAggregation> A executeTestCase(Query query, List<Long> dataset, AggregationBuilder aggregationBuilder) throws IOException { try (Directory directory = newDirectory()) { @@ -610,6 +684,51 @@ private A executeTestCase(Query query, List + private <A extends InternalAggregation> A executeTestCaseIndexString( + Query query, + List<Long> dataset, + AggregationBuilder aggregationBuilder, + boolean shouldIndex, + int expectedCount + ) throws IOException { + try
(Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + Document document = new Document(); + List<Long> shuffledDataset = new ArrayList<>(dataset); + Collections.shuffle(shuffledDataset, random()); + for (Long value : shuffledDataset) { + document.add(new SortedNumericDocValuesField(LONG_FIELD, value)); + document.add(new LongPoint(LONG_FIELD, value)); + document.add(new SortedSetDocValuesField(KEYWORD_FIELD, new BytesRef(Long.toString(value)))); + if (shouldIndex) { + document.add(new StringField(KEYWORD_FIELD, Long.toString(value), Field.Store.NO)); + } + document.add(new SortedSetDocValuesField("even_odd", new BytesRef(value % 2 == 0 ? "even" : "odd"))); + indexWriter.addDocument(document); + document.clear(); + } + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + + MappedFieldType[] types = new MappedFieldType[] { + keywordField(KEYWORD_FIELD), + longField(LONG_FIELD), + keywordField("even_odd") }; + return searchAndReduceCounting(expectedCount, indexSearcher, query, aggregationBuilder, types); + } + } + } + @Override public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { /* diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorTests.java index e59b28d0a51ff..092cbd4c5091f 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorTests.java @@ -324,14 +324,15 @@ public void testSimpleAggregationLowCardinality() throws Exception { * This test case utilizes the MapStringTermsAggregator. */ public void testSimpleMapStringAggregation() throws Exception { - testSimple( - ADD_SORTED_SET_FIELD_INDEXED, - randomBoolean(), - randomBoolean(), - randomBoolean(), - TermsAggregatorFactory.ExecutionMode.MAP, - 4 - ); + /* Currently, we can precompute the aggregation only if the segment contains no deleted documents and does not include the docCountField.
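Each testSimple call below pins those toggles explicitly; an expected collect count of 0 means the precomputation handled the whole segment, while a non-zero count means the aggregator fell back to collecting documents one by one.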
*/ + testSimple(ADD_SORTED_SET_FIELD_INDEXED, false, false, false, TermsAggregatorFactory.ExecutionMode.MAP, 0); + testSimple(ADD_SORTED_SET_FIELD_INDEXED, false, true, false, TermsAggregatorFactory.ExecutionMode.MAP, 4); + testSimple(ADD_SORTED_SET_FIELD_INDEXED, false, false, true, TermsAggregatorFactory.ExecutionMode.MAP, 0); + testSimple(ADD_SORTED_SET_FIELD_INDEXED, false, true, true, TermsAggregatorFactory.ExecutionMode.MAP, 4); + testSimple(ADD_SORTED_SET_FIELD_INDEXED, true, false, false, TermsAggregatorFactory.ExecutionMode.MAP, 4); + testSimple(ADD_SORTED_SET_FIELD_INDEXED, true, true, false, TermsAggregatorFactory.ExecutionMode.MAP, 4); + testSimple(ADD_SORTED_SET_FIELD_INDEXED, true, false, true, TermsAggregatorFactory.ExecutionMode.MAP, 4); + testSimple(ADD_SORTED_SET_FIELD_INDEXED, true, true, true, TermsAggregatorFactory.ExecutionMode.MAP, 4); } /** diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index d9348aacd7a11..845d6f390a1e6 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -761,6 +761,212 @@ protected A searchAndReduc return internalAgg; } + protected <A extends InternalAggregation> A searchAndReduceCounting( + int expectedCount, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + boolean shardFanOut, + MappedFieldType... fieldTypes + ) throws IOException { + return searchAndReduceCounting( + expectedCount, + createIndexSettings(), + searcher, + query, + builder, + DEFAULT_MAX_BUCKETS, + shardFanOut, + fieldTypes + ); + } + + protected <A extends InternalAggregation> A searchAndReduceCounting( + int expectedCount, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + MappedFieldType... fieldTypes + ) throws IOException { + return searchAndReduceCounting( + expectedCount, + createIndexSettings(), + searcher, + query, + builder, + DEFAULT_MAX_BUCKETS, + randomBoolean(), + fieldTypes + ); + } + + protected <A extends InternalAggregation> A searchAndReduceCounting( + int expectedCount, + IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + MappedFieldType... fieldTypes + ) throws IOException { + return searchAndReduceCounting( + expectedCount, + indexSettings, + searcher, + query, + builder, + DEFAULT_MAX_BUCKETS, + randomBoolean(), + fieldTypes + ); + } + + protected <A extends InternalAggregation> A searchAndReduceCounting( + int expectedCount, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + int maxBucket, + MappedFieldType... fieldTypes + ) throws IOException { + return searchAndReduceCounting( + expectedCount, + createIndexSettings(), + searcher, + query, + builder, + maxBucket, + randomBoolean(), + fieldTypes + ); + } + + protected <A extends InternalAggregation> A searchAndReduceCounting( + int expectedCount, + IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + int maxBucket, + boolean shardFanOut, + MappedFieldType... fieldTypes + ) throws IOException { + return searchAndReduceCounting(expectedCount, indexSettings, searcher, query, builder, maxBucket, false, shardFanOut, fieldTypes); + } + + /** + * Collects all documents that match the provided query {@link Query} and + * returns the reduced {@link InternalAggregation}. + * <p>
+ * Half the time it aggregates each leaf individually and reduces all + * results together. The other half the time it aggregates across the entire + * index at once and runs a final reduction on the single resulting agg. + * The difference between this method and {@link #searchAndReduce} is that + * this method also asserts that the number of collected documents matches {@code expectedCount}. + */ + protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduceCounting( + int expectedCount, + IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + int maxBucket, + boolean hasNested, + boolean shardFanOut, + MappedFieldType... fieldTypes + ) throws IOException { + final IndexReaderContext ctx = searcher.getTopReaderContext(); + final PipelineTree pipelines = builder.buildPipelineTree(); + List<InternalAggregation> aggs = new ArrayList<>(); + if (hasNested) { + query = Queries.filtered(query, Queries.newNonNestedFilter()); + } + Query rewritten = searcher.rewrite(query); + MultiBucketConsumer bucketConsumer = new MultiBucketConsumer( + maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes); + CountingAggregator rootCount = new CountingAggregator(new AtomicInteger(), root); + int totalCollectCount = 0; + + if (shardFanOut && searcher.getIndexReader().leaves().size() > 0) { + assertThat(ctx, instanceOf(CompositeReaderContext.class)); + final CompositeReaderContext compCTX = (CompositeReaderContext) ctx; + final int size = compCTX.leaves().size(); + final ShardSearcher[] subSearchers = new ShardSearcher[size]; + for (int searcherIDX = 0; searcherIDX < subSearchers.length; searcherIDX++) { + final LeafReaderContextPartition partition = LeafReaderContextPartition.createForEntireSegment( + compCTX.leaves().get(searcherIDX) + ); + subSearchers[searcherIDX] = new ShardSearcher(partition, compCTX); + } + + for (ShardSearcher subSearcher : subSearchers) { + MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer( + maxBucket, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + C a = createAggregator(query, builder, subSearcher, indexSettings, shardBucketConsumer, fieldTypes); + CountingAggregator aCount = new CountingAggregator(new AtomicInteger(), a); + aCount.preCollection(); + Weight weight = subSearcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f); + subSearcher.search(weight, aCount); + aCount.postCollection(); + aggs.add(aCount.buildTopLevel()); + totalCollectCount += aCount.getCollectCount().get(); + } + } else { + rootCount.preCollection(); + searcher.search(rewritten, rootCount); + rootCount.postCollection(); + aggs.add(rootCount.buildTopLevel()); + totalCollectCount = rootCount.getCollectCount().get(); + } + + assertEquals(expectedCount, totalCollectCount); + + if (randomBoolean() && aggs.size() > 1) { + // sometimes do an incremental reduce + int toReduceSize = aggs.size(); + Collections.shuffle(aggs, random()); + int r = randomIntBetween(1, toReduceSize); + List<InternalAggregation> toReduce = aggs.subList(0, r); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction( + rootCount.context().bigArrays(), + getMockScriptService(), + () -> PipelineAggregator.PipelineTree.EMPTY + ); + A reduced = (A) aggs.get(0).reduce(toReduce, context); + aggs = new ArrayList<>(aggs.subList(r, toReduceSize)); + aggs.add(reduced); + } + + // now do the final reduce + MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer( + maxBucket, + new
NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + rootCount.context().bigArrays(), + getMockScriptService(), + reduceBucketConsumer, + pipelines + ); + + @SuppressWarnings("unchecked") + A internalAgg = (A) aggs.get(0).reduce(aggs, context); + + // materialize any parent pipelines + internalAgg = (A) internalAgg.reducePipelines(internalAgg, context, pipelines); + + // materialize any sibling pipelines at top level + for (PipelineAggregator pipelineAggregator : pipelines.aggregators()) { + internalAgg = (A) pipelineAggregator.reduce(internalAgg, context); + } + doAssertReducedMultiBucketConsumer(internalAgg, reduceBucketConsumer); + return internalAgg; + } + protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduceStarTree( IndexSettings indexSettings, IndexSearcher searcher,