Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Improve string terms aggregation performance using Collector#setWeight #12628

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
- Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499))
- Quickly compute terms aggregations when the top-level query is functionally match-all for a segment ([#11643](https://github.com/opensearch-project/OpenSearch/pull/11643))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
Expand All @@ -46,6 +51,7 @@
import org.opensearch.common.util.LongHash;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.AggregationExecutionException;
import org.opensearch.search.aggregations.Aggregator;
Expand Down Expand Up @@ -73,6 +79,7 @@

import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder;
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

/**
* An aggregator of string values that relies on global ordinals in order to build buckets.
Expand All @@ -85,6 +92,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr

private final LongPredicate acceptedGlobalOrdinals;
private final long valueCount;
private final String fieldName;
private Weight weight;
private final GlobalOrdLookupFunction lookupGlobalOrd;
protected final CollectionStrategy collectionStrategy;
protected int segmentsWithSingleValuedOrds = 0;
Expand Down Expand Up @@ -136,16 +145,105 @@ public GlobalOrdinalsStringTermsAggregator(
return new DenseGlobalOrds();
});
}
this.fieldName = (valuesSource instanceof ValuesSource.Bytes.WithOrdinals.FieldData)
? ((ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource).getIndexFieldName()
: null;
}

/**
 * Returns the description reported by the {@code collectionStrategy} this aggregator was built with.
 */
String descriptCollectionStrategy() {
return collectionStrategy.describe();
}

/**
 * Stores the {@link Weight} that {@code termDocFreqCollector} later consults (via {@code weight.count(ctx)})
 * to decide whether a segment can be counted from term frequencies.
 *
 * @param weight the weight to remember; while it is {@code null} the term-frequency shortcut is not attempted
 */
public void setWeight(Weight weight) {
this.weight = weight;
}

/**
 * Reads doc frequencies directly from the indexed terms of the segment so that bucket counts can be
 * produced without iterating through individual documents.
 * <p>
 * The shortcut is only applied when all of the following hold for the segment:
 * <ul>
 *   <li>a {@link Weight} has been assigned and the field name is known,</li>
 *   <li>{@code weight.count(ctx)} equals {@code maxDoc()}, i.e. the top-level query matches every
 *       document and the segment has no deleted documents,</li>
 *   <li>the field is indexed (it has {@link Terms} in this segment),</li>
 *   <li>no document in the segment carries the {@code _doc_count} field.</li>
 * </ul>
 * In every other case {@code null} is returned and the caller is expected to fall back to regular
 * per-document collection.
 *
 * @param ctx The LeafReaderContext to collect terms from
 * @param globalOrds The SortedSetDocValues for the field's ordinals
 * @param ordCountConsumer A consumer to accept collected term frequencies as (ordinal, docFreq) pairs
 * @return {@code null} if the optimization cannot be used for this segment;
 *         {@link LeafBucketCollector#NO_OP_COLLECTOR} if no document of the segment matches the query;
 *         otherwise a collector that terminates collection on the first call, since all counts have
 *         already been handed to {@code ordCountConsumer}
 * @throws IOException If an I/O error occurs during reading
 */
LeafBucketCollector termDocFreqCollector(
    LeafReaderContext ctx,
    SortedSetDocValues globalOrds,
    BiConsumer<Long, Integer> ordCountConsumer
) throws IOException {
    if (weight == null || fieldName == null) {
        // Weight not assigned, or the values source did not expose a field name (fieldName is null
        // for non-FieldData sources) - cannot use this optimization
        return null;
    }

    // Ask the weight only once; the answer is reused for both checks below.
    final int matchingDocs = weight.count(ctx);
    if (matchingDocs == 0) {
        // No document matches the top-level query on this segment, we can skip the segment entirely
        return LeafBucketCollector.NO_OP_COLLECTOR;
    }
    if (matchingDocs != ctx.reader().maxDoc()) {
        // Only matchingDocs == maxDoc() implies that there are no deleted documents and that the
        // top-level query matches all docs in the segment. Anything else (including a count the
        // weight could not provide) means term frequencies may not reflect the matching documents.
        return null;
    }

    final Terms segmentTerms = ctx.reader().terms(this.fieldName);
    if (segmentTerms == null) {
        // Field is not indexed.
        return null;
    }

    final NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME);
    if (docCountValues.nextDoc() != NO_MORE_DOCS) {
        // This segment has at least one document with the _doc_count field.
        return null;
    }

    final TermsEnum indexTermsEnum = segmentTerms.iterator();
    BytesRef indexTerm = indexTermsEnum.next();
    final TermsEnum globalOrdinalTermsEnum = globalOrds.termsEnum();
    BytesRef ordinalTerm = globalOrdinalTermsEnum.next();

    // Walk both sorted term streams in lock-step: whenever a segment term equals a global ordinal
    // term, report the term's doc frequency for that ordinal; otherwise advance the smaller side.
    while (indexTerm != null && ordinalTerm != null) {
        final int compare = indexTerm.compareTo(ordinalTerm);
        if (compare == 0) {
            final long ord = globalOrdinalTermsEnum.ord();
            if (acceptedGlobalOrdinals.test(ord)) {
                ordCountConsumer.accept(ord, indexTermsEnum.docFreq());
            }
            indexTerm = indexTermsEnum.next();
            ordinalTerm = globalOrdinalTermsEnum.next();
        } else if (compare < 0) {
            indexTerm = indexTermsEnum.next();
        } else {
            ordinalTerm = globalOrdinalTermsEnum.next();
        }
    }

    // All counts are already recorded, so stop the per-document collection as soon as it starts.
    return new LeafBucketCollector() {
        @Override
        public void collect(int doc, long owningBucketOrd) throws IOException {
            throw new CollectionTerminatedException();
        }
    };
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx);
collectionStrategy.globalOrdsReady(globalOrds);

if (collectionStrategy instanceof DenseGlobalOrds
&& this.resultStrategy instanceof StandardTermsResults
&& sub == LeafBucketCollector.NO_OP_COLLECTOR) {
LeafBucketCollector termDocFreqCollector = termDocFreqCollector(
ctx,
globalOrds,
(ord, docCount) -> incrementBucketDocCount(collectionStrategy.globalOrdToBucketOrd(0, ord), docCount)
);
if (termDocFreqCollector != null) {
return termDocFreqCollector;
}
}

SortedDocValues singleValues = DocValues.unwrapSingleton(globalOrds);
if (singleValues != null) {
segmentsWithSingleValuedOrds++;
Expand Down Expand Up @@ -343,9 +441,20 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
final SortedSetDocValues segmentOrds = valuesSource.ordinalsValues(ctx);
segmentDocCounts = context.bigArrays().grow(segmentDocCounts, 1 + segmentOrds.getValueCount());
assert sub == LeafBucketCollector.NO_OP_COLLECTOR;
final SortedDocValues singleValues = DocValues.unwrapSingleton(segmentOrds);
mapping = valuesSource.globalOrdinalsMapping(ctx);
// Dense mode doesn't support include/exclude so we don't have to check it here.

if (this.resultStrategy instanceof StandardTermsResults) {
LeafBucketCollector termDocFreqCollector = this.termDocFreqCollector(
ctx,
segmentOrds,
(ord, docCount) -> incrementBucketDocCount(mapping.applyAsLong(ord), docCount)
);
if (termDocFreqCollector != null) {
return termDocFreqCollector;
}
}

final SortedDocValues singleValues = DocValues.unwrapSingleton(segmentOrds);
if (singleValues != null) {
segmentsWithSingleValuedOrds++;
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, segmentOrds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ public FieldData(IndexOrdinalsFieldData indexFieldData) {
this.indexFieldData = indexFieldData;
}

/**
 * Returns the field name reported by the wrapped {@code indexFieldData}.
 *
 * @return the result of {@code indexFieldData.getFieldName()}
 */
public String getIndexFieldName() {
    return indexFieldData.getFieldName();
}

@Override
public SortedBinaryDocValues bytesValues(LeafReaderContext context) {
final LeafOrdinalsFieldData atomicFieldData = indexFieldData.load(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,11 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
return null;
}
}

/**
 * Forwards the count request for the given segment to the wrapped {@code weight} unchanged.
 */
@Override
public int count(LeafReaderContext context) throws IOException {
return weight.count(context);
}
};
} else {
return weight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
package org.opensearch.search.aggregations.bucket.terms;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
Expand All @@ -41,7 +40,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.TriConsumer;
import org.opensearch.index.mapper.KeywordFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.aggregations.AggregatorTestCase;
Expand All @@ -57,6 +56,8 @@
public class KeywordTermsAggregatorTests extends AggregatorTestCase {
private static final String KEYWORD_FIELD = "keyword";

// Shared builder configuration: points the terms aggregation at KEYWORD_FIELD.
private static final Consumer<TermsAggregationBuilder> CONFIGURE_KEYWORD_FIELD = agg -> agg.field(KEYWORD_FIELD);

private static final List<String> dataset;
static {
List<String> d = new ArrayList<>(45);
Expand All @@ -68,51 +69,63 @@ public class KeywordTermsAggregatorTests extends AggregatorTestCase {
dataset = d;
}

// Expects exactly nine buckets, where the bucket at position i has both its key and its doc count equal to 9 - i.
private static final Consumer<InternalMappedTerms> VERIFY_MATCH_ALL_DOCS = agg -> {
    assertEquals(9, agg.getBuckets().size());
    int position = 0;
    // Count down on the expected value (9, 8, ..., 1) while walking the buckets front to back.
    for (long expected = 9L; expected >= 1L; expected--) {
        final StringTerms.Bucket current = (StringTerms.Bucket) agg.getBuckets().get(position);
        assertThat(current.getKey(), equalTo(String.valueOf(expected)));
        assertThat(current.getDocCount(), equalTo(expected));
        position++;
    }
};

// Expects the aggregation to contain no buckets at all.
private static final Consumer<InternalMappedTerms> VERIFY_MATCH_NO_DOCS = agg -> assertEquals(0, agg.getBuckets().size());

// Shared query instance used by the match-all test cases.
private static final Query MATCH_ALL_DOCS_QUERY = new MatchAllDocsQuery();

// Shared query instance used by the match-none test cases.
private static final Query MATCH_NO_DOCS_QUERY = new MatchNoDocsQuery();

public void testMatchNoDocs() throws IOException {
testSearchCase(
new MatchNoDocsQuery(),
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_NO_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
agg -> assertEquals(0, agg.getBuckets().size()),
null // without type hint
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_NO_DOCS,
null // without type hint
);

testSearchCase(
new MatchNoDocsQuery(),
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_NO_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
agg -> assertEquals(0, agg.getBuckets().size()),
ValueType.STRING // with type hint
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_NO_DOCS,
ValueType.STRING // with type hint
);
}

public void testMatchAllDocs() throws IOException {
Query query = new MatchAllDocsQuery();

testSearchCase(query, dataset, aggregation -> aggregation.field(KEYWORD_FIELD), agg -> {
assertEquals(9, agg.getBuckets().size());
for (int i = 0; i < 9; i++) {
StringTerms.Bucket bucket = (StringTerms.Bucket) agg.getBuckets().get(i);
assertThat(bucket.getKey(), equalTo(String.valueOf(9L - i)));
assertThat(bucket.getDocCount(), equalTo(9L - i));
}
},
null // without type hint
testSearchCase(
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_ALL_DOCS_QUERY,
dataset,
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_ALL_DOCS,
null // without type hint
);

testSearchCase(query, dataset, aggregation -> aggregation.field(KEYWORD_FIELD), agg -> {
assertEquals(9, agg.getBuckets().size());
for (int i = 0; i < 9; i++) {
StringTerms.Bucket bucket = (StringTerms.Bucket) agg.getBuckets().get(i);
assertThat(bucket.getKey(), equalTo(String.valueOf(9L - i)));
assertThat(bucket.getDocCount(), equalTo(9L - i));
}
},
ValueType.STRING // with type hint
testSearchCase(
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_ALL_DOCS_QUERY,
dataset,
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_ALL_DOCS,
ValueType.STRING // with type hint
);
}

private void testSearchCase(
TriConsumer<Document, String, String> addField,
Query query,
List<String> dataset,
Consumer<TermsAggregationBuilder> configure,
Expand All @@ -123,7 +136,7 @@ private void testSearchCase(
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
for (String value : dataset) {
document.add(new SortedSetDocValuesField(KEYWORD_FIELD, new BytesRef(value)));
addField.apply(document, KEYWORD_FIELD, value);
indexWriter.addDocument(document);
document.clear();
}
Expand All @@ -147,5 +160,4 @@ private void testSearchCase(
}
}
}

}
Loading
Loading