Added aggregation precomputation for rare terms #18106
Changes from all commits: 11ecaf3, 65e20b8, d51c2a0, ab13378, b5e08d8, ebca7e1, 9d73b57, 66171ca, b4a4128, b60c221, 0375104, 86a23cb
File: server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
@@ -31,15 +31,21 @@
```java
package org.opensearch.search.aggregations.bucket.terms;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.PriorityQueue;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.LongArray;
import org.opensearch.index.fielddata.SortedBinaryDocValues;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
```
@@ -54,6 +60,7 @@
```java
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
```
@@ -65,6 +72,8 @@
```java
import java.util.function.Supplier;

import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder;
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_DOCS;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

/**
 * An aggregator of string values that hashes the strings on the fly rather
```
@@ -75,8 +84,11 @@
```java
public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
    private final CollectorSource collectorSource;
    private final ResultStrategy<?, ?> resultStrategy;
    private Weight weight;
    private final BytesKeyedBucketOrds bucketOrds;
    private final IncludeExclude.StringFilter includeExclude;
    protected final String fieldName;
    private final ValuesSourceConfig config;

    public MapStringTermsAggregator(
        String name,
```
@@ -92,13 +104,25 @@
```java
        SubAggCollectionMode collectionMode,
        boolean showTermDocCountError,
        CardinalityUpperBound cardinality,
        Map<String, Object> metadata,
        ValuesSourceConfig config
    ) throws IOException {
        super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
        this.collectorSource = collectorSource;
        this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
        this.includeExclude = includeExclude;
        bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), cardinality);
        if (collectorSource instanceof ValuesSourceCollectorSource) {
```
Member:

I don't like the idea of being uncertain about where the field name comes from. Also, you can probably use pattern matching for instanceof:
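The reviewer's code suggestion did not survive the page rendering. As a sketch of what the pattern-matching form would likely look like, reusing the names from the constructor above (this is not the reviewer's original snippet):

```java
// Sketch only: Java 16+ pattern matching for instanceof binds the cast
// result to a variable in a single step, avoiding the explicit cast below.
if (collectorSource instanceof ValuesSourceCollectorSource valuesCollectorSource) {
    this.fieldName = valuesCollectorSource.getValuesSource().getIndexFieldName();
} else {
    this.fieldName = null;
}
```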
Contributor (author):

Good point. I will just stick with fetching from the value source. Since I made the modification to add the field name to the constructor, previous implementations should not be affected.
```java
            ValuesSource valuesCollectorSource = ((ValuesSourceCollectorSource) collectorSource).getValuesSource();
            this.fieldName = valuesCollectorSource.getIndexFieldName();
        } else {
            this.fieldName = null;
        }
        this.config = config;
    }

    public void setWeight(Weight weight) {
        this.weight = weight;
    }

    @Override
```
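The `setWeight` hook matters because the precompute path below relies on Lucene's `Weight#count(LeafReaderContext)`, which can report how many documents in a segment match the query without visiting them, or -1 when the count cannot be computed cheaply. A standalone sketch of that API, independent of OpenSearch (the class name, field, and index contents here are invented for illustration):

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteBuffersDirectory;

public class WeightCountSketch {
    public static void main(String[] args) throws Exception {
        try (ByteBuffersDirectory dir = new ByteBuffersDirectory()) {
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
                for (String v : new String[] { "a", "b", "a" }) {
                    Document doc = new Document();
                    doc.add(new StringField("k", v, Field.Store.NO));
                    writer.addDocument(doc);
                }
            }
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                IndexSearcher searcher = new IndexSearcher(reader);
                Weight weight = searcher.createWeight(
                    searcher.rewrite(new MatchAllDocsQuery()), ScoreMode.COMPLETE_NO_SCORES, 1f);
                for (LeafReaderContext ctx : reader.leaves()) {
                    // For a match-all query on a segment with no deletions, count(ctx)
                    // equals maxDoc() - exactly the precondition the aggregator checks.
                    System.out.println(weight.count(ctx) + " of " + ctx.reader().maxDoc());
                }
            }
        }
    }
}
```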
@@ -130,6 +154,69 @@
```java
        );
    }

    @Override
    protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
        // TODO: in scripted aggregations, the way of collecting buckets is determined by the
        // script aggregator, so script aggregations cannot be supported for now.

        // The optimization does not work when there are sub-aggregations, an include/exclude
        // filter, no resolvable field name, or no top-level weight. The query also has to match
        // all documents; otherwise the index-wide term frequencies would count documents the
        // query does not match.
        if (subAggregators.length > 0 || includeExclude != null || fieldName == null || weight == null) {
            return false;
        }

        // The optimization can only be used if there are no deleted documents and the top-level
        // query matches all documents in the segment.
        if (weight.count(ctx) == 0) {
            return true;
        } else if (weight.count(ctx) != ctx.reader().maxDoc()) {
            return false;
        }

        // If the missing property is specified in the builder, and there are documents with the
        // field missing, we might not be able to use the index unless there is some way to
        // calculate which ordinal value the missing field maps to (something I am not sure how
        // to do yet).
        // Custom scripts cannot be supported because when the aggregation is returned, parts of
        // the custom script are not included. See the test 'org.opensearch.painless.
        // LangPainlessClientYamlTestSuiteIT.test {yaml=painless/100_terms_agg/String Value Script with doc notation}'
        // for more details on why it cannot be supported.
        if ((config != null)
            && ((config.missing() != null && (weight.count(ctx) != ctx.reader().getDocCount(fieldName))) || (config.script() != null))) {
            return false;
        }

        Terms stringTerms = ctx.reader().terms(fieldName);
        if (stringTerms == null) {
            // Field is not indexed.
            return false;
        }

        NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME);
        if (docCountValues.nextDoc() != NO_MORE_DOCS) {
            // This segment has at least one document with the _doc_count field.
            return false;
        }

        TermsEnum stringTermsEnum = stringTerms.iterator();
        BytesRef stringTerm = stringTermsEnum.next();

        // Iterate over all the terms in the segment and add the counts into their buckets.
        while (stringTerm != null) {
            long bucketOrdinal = bucketOrds.add(0L, stringTerm);
            if (bucketOrdinal < 0) { // already seen
                bucketOrdinal = -1 - bucketOrdinal;
            }
            int docCount = stringTermsEnum.docFreq();
            if (resultStrategy instanceof SignificantTermsResults sigTermsResultStrategy) {
                sigTermsResultStrategy.updateSubsetSizes(0L, docCount);
            }
            incrementBucketDocCount(bucketOrdinal, docCount);
            stringTerm = stringTermsEnum.next();
        }
        return true;
    }
```
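The loop above is the core of the precomputation: instead of visiting documents, it walks the segment's terms dictionary and uses each term's `docFreq()` as the bucket count, which is valid only because the earlier checks guarantee every live document matches the query and no document overrides `_doc_count`. A standalone, hedged illustration of the same idea against a plain Lucene index (class name, field, and values invented for the example):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.util.BytesRef;

public class TermsDictionaryCounts {
    public static void main(String[] args) throws Exception {
        try (ByteBuffersDirectory dir = new ByteBuffersDirectory()) {
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
                for (String city : new String[] { "oslo", "lima", "oslo", "kiev", "oslo" }) {
                    Document doc = new Document();
                    doc.add(new StringField("city", city, Field.Store.NO));
                    writer.addDocument(doc);
                }
            }
            Map<String, Long> counts = new HashMap<>();
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                for (LeafReaderContext ctx : reader.leaves()) {
                    Terms terms = ctx.reader().terms("city");
                    if (terms == null) {
                        continue; // field not indexed in this segment
                    }
                    TermsEnum termsEnum = terms.iterator();
                    for (BytesRef term = termsEnum.next(); term != null; term = termsEnum.next()) {
                        // docFreq() is read straight from the terms dictionary; no
                        // documents are visited, which is what makes this fast.
                        counts.merge(term.utf8ToString(), (long) termsEnum.docFreq(), Long::sum);
                    }
                }
            }
            System.out.println(counts); // {oslo=3, lima=1, kiev=1}
        }
    }
}
```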
```java
    @Override
    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
        return resultStrategy.buildAggregations(owningBucketOrds);
```
@@ -196,6 +283,10 @@
```java
            return valuesSource.needsScores();
        }

        public ValuesSource getValuesSource() {
            return valuesSource;
        }

        @Override
        public LeafBucketCollector getLeafCollector(
            IncludeExclude.StringFilter includeExclude,
```
@@ -502,6 +593,11 @@
```java
            return "significant_terms";
        }

        public void updateSubsetSizes(long owningBucketOrd, int amount) {
            subsetSizes = context.bigArrays().grow(subsetSizes, owningBucketOrd + 1);
            subsetSizes.increment(owningBucketOrd, amount);
        }

        @Override
        LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
            return new LeafBucketCollectorBase(primary, null) {
```
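The new `updateSubsetSizes` hook lets the precompute path keep the significant-terms subset size in step with the counts it adds. The grow-then-increment pattern it uses is easy to state on its own; here is a minimal toy sketch with a plain array (OpenSearch's `BigArrays`/`LongArray` version is paged and accounted for by circuit breakers, which this sketch ignores):

```java
import java.util.Arrays;

public class GrowThenIncrement {
    private long[] subsetSizes = new long[1];

    // Grow the backing array so 'owningBucketOrd' is addressable, then add 'amount';
    // mirrors BigArrays.grow(...) followed by LongArray.increment(...) in the diff.
    void updateSubsetSizes(long owningBucketOrd, int amount) {
        if (owningBucketOrd + 1 > subsetSizes.length) {
            subsetSizes = Arrays.copyOf(subsetSizes, Math.max(subsetSizes.length * 2, (int) owningBucketOrd + 1));
        }
        subsetSizes[(int) owningBucketOrd] += amount;
    }

    public static void main(String[] args) {
        GrowThenIncrement g = new GrowThenIncrement();
        g.updateSubsetSizes(0L, 3);
        g.updateSubsetSizes(5L, 2);
        System.out.println(Arrays.toString(g.subsetSizes)); // [3, 0, 0, 0, 0, 2]
    }
}
```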
Review comment:

I think if you separate out the test cases, as I commented in the test files, that can give you good code coverage as well.