Skip to content

Commit 9ff84a8

Browse files
authored
Handle deleted documents for filter rewrite subaggregation optimization (#19643)
Signed-off-by: Ankit Jain <jainankitk@apache.org>
1 parent b1d1e33 commit 9ff84a8

File tree

5 files changed

+77
-8
lines changed

5 files changed

+77
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1414
- Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005))
1515
- Add BindableServices extension point to transport-grpc-spi ([#19304](https://github.com/opensearch-project/OpenSearch/pull/19304))
1616
- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
17+
- Handle deleted documents for filter rewrite subaggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643))
1718
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
1819
- Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
1920
- Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629))

server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,9 @@ public boolean tryOptimize(
115115
return false;
116116
}
117117

118-
if (leafCtx.reader().hasDeletions()) return false;
118+
// Since we explicitly create bitset of matching docIds for each bucket
119+
// in case of sub-aggregations, deleted documents can be filtered out
120+
if (leafCtx.reader().hasDeletions() && hasSubAgg == false) return false;
119121

120122
PointValues values = leafCtx.reader().getPointValues(aggregatorBridge.fieldType.name());
121123
if (values == null) return false;

server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/PointTreeTraversal.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,13 @@ private static void intersectWithRanges(PointValues.IntersectVisitor visitor, Po
9191

9292
switch (r) {
9393
case CELL_INSIDE_QUERY:
94-
collector.countNode((int) pointTree.size());
9594
if (collector.hasSubAgg()) {
95+
// counter for top level agg is handled by sub agg collect
9696
pointTree.visitDocIDs(visitor);
9797
} else {
98+
// count node should be invoked only in absence of
99+
// sub agg to not include the deleted documents
100+
collector.countNode((int) pointTree.size());
98101
collector.visitInner();
99102
}
100103
break;
@@ -128,9 +131,10 @@ public void visit(DocIdSetIterator iterator) throws IOException {
128131
@Override
129132
public void visit(int docID, byte[] packedValue) throws IOException {
130133
visitPoints(packedValue, () -> {
131-
collector.count();
132134
if (collector.hasSubAgg()) {
133135
collector.collectDocId(docID);
136+
} else {
137+
collector.count();
134138
}
135139
});
136140
}
@@ -140,9 +144,10 @@ public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOExcept
140144
visitPoints(packedValue, () -> {
141145
// note: iterator can only iterate once
142146
for (int doc = iterator.nextDoc(); doc != NO_MORE_DOCS; doc = iterator.nextDoc()) {
143-
collector.count();
144147
if (collector.hasSubAgg()) {
145148
collector.collectDocId(doc);
149+
} else {
150+
collector.count();
146151
}
147152
}
148153
});

server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.index.LeafReaderContext;
1414
import org.apache.lucene.search.DocIdSetIterator;
1515
import org.apache.lucene.util.BitDocIdSet;
16+
import org.apache.lucene.util.Bits;
1617
import org.apache.lucene.util.FixedBitSet;
1718
import org.opensearch.search.aggregations.BucketCollector;
1819
import org.opensearch.search.aggregations.LeafBucketCollector;
@@ -37,6 +38,7 @@ public class SubAggRangeCollector extends SimpleRangeCollector {
3738
private final BucketCollector collectableSubAggregators;
3839
private final LeafReaderContext leafCtx;
3940

41+
private final Bits liveDocs;
4042
private final FixedBitSet bitSet;
4143
private final BitDocIdSet bitDocIdSet;
4244

@@ -53,6 +55,7 @@ public SubAggRangeCollector(
5355
this.getBucketOrd = getBucketOrd;
5456
this.collectableSubAggregators = subAggCollectorParam.collectableSubAggregators();
5557
this.leafCtx = subAggCollectorParam.leafCtx();
58+
this.liveDocs = leafCtx.reader().getLiveDocs();
5659
int numDocs = leafCtx.reader().maxDoc();
5760
bitSet = new FixedBitSet(numDocs);
5861
bitDocIdSet = new BitDocIdSet(bitSet);
@@ -63,14 +66,38 @@ public boolean hasSubAgg() {
6366
return true;
6467
}
6568

69+
private boolean isDocLive(int docId) {
70+
return liveDocs == null || liveDocs.get(docId);
71+
}
72+
73+
@Override
74+
public void countNode(int count) {
75+
throw new UnsupportedOperationException("countNode should be unreachable");
76+
}
77+
78+
@Override
79+
public void count() {
80+
throw new UnsupportedOperationException("count should be unreachable");
81+
}
82+
6683
@Override
6784
public void collectDocId(int docId) {
68-
bitSet.set(docId);
85+
if (isDocLive(docId)) {
86+
counter++;
87+
bitSet.set(docId);
88+
}
6989
}
7090

7191
@Override
7292
public void collectDocIdSet(DocIdSetIterator iter) throws IOException {
73-
bitSet.or(iter);
93+
// Explicitly iterate over iter into bitSet to filter out deleted docs
94+
iter.nextDoc();
95+
for (int doc = iter.docID(); doc < DocIdSetIterator.NO_MORE_DOCS; doc = iter.nextDoc()) {
96+
if (isDocLive(doc)) {
97+
counter++;
98+
bitSet.set(doc);
99+
}
100+
}
74101
}
75102

76103
@Override

server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010

1111
import org.apache.lucene.document.Field;
1212
import org.apache.lucene.document.LongField;
13+
import org.apache.lucene.document.LongPoint;
1314
import org.apache.lucene.index.DirectoryReader;
1415
import org.apache.lucene.index.IndexWriter;
1516
import org.apache.lucene.index.IndexWriterConfig;
17+
import org.apache.lucene.search.BooleanClause;
18+
import org.apache.lucene.search.BooleanQuery;
1619
import org.apache.lucene.search.IndexSearcher;
1720
import org.apache.lucene.search.MatchAllDocsQuery;
1821
import org.apache.lucene.search.Query;
@@ -74,6 +77,7 @@ public class FilterRewriteSubAggTests extends AggregatorTestCase {
7477
new TestDoc(2, Instant.parse("2020-03-01T01:00:00Z")),
7578
new TestDoc(3, Instant.parse("2020-03-01T02:00:00Z")),
7679
new TestDoc(4, Instant.parse("2020-03-01T03:00:00Z")),
80+
new TestDoc(4, Instant.parse("2020-03-01T04:00:00Z"), true),
7781
new TestDoc(5, Instant.parse("2020-03-01T04:00:00Z")),
7882
new TestDoc(6, Instant.parse("2020-03-01T04:00:00Z"))
7983
);
@@ -112,7 +116,7 @@ public void testDateHisto() throws IOException {
112116
dateFieldName
113117
).calendarInterval(DateHistogramInterval.HOUR).subAggregation(AggregationBuilders.stats(statsAggName).field(longFieldName));
114118

115-
InternalDateHistogram result = executeAggregation(DEFAULT_DATA, dateHistogramAggregationBuilder, true);
119+
InternalDateHistogram result = executeAggregation(DEFAULT_DATA, dateHistogramAggregationBuilder, false);
116120

117121
// Verify results
118122
List<? extends InternalDateHistogram.Bucket> buckets = result.getBuckets();
@@ -337,11 +341,35 @@ private Directory setupIndex(List<TestDoc> docs, boolean random) throws IOExcept
337341
for (TestDoc doc : docs) {
338342
indexWriter.addDocument(doc.toDocument());
339343
}
344+
345+
indexWriter.commit();
346+
}
347+
348+
try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig().setCodec(TestUtil.getDefaultCodec()))) {
349+
for (TestDoc doc : docs) {
350+
if (doc.deleted) {
351+
BooleanQuery.Builder booleanQueryBuilder = new BooleanQuery.Builder();
352+
booleanQueryBuilder.add(LongPoint.newRangeQuery(longFieldName, doc.metric, doc.metric), BooleanClause.Occur.MUST);
353+
booleanQueryBuilder.add(
354+
LongField.newRangeQuery(
355+
dateFieldName,
356+
dateFieldType.parse(doc.timestamp.toString()),
357+
dateFieldType.parse(doc.timestamp.toString())
358+
),
359+
BooleanClause.Occur.MUST
360+
);
361+
indexWriter.deleteDocuments(booleanQueryBuilder.build());
362+
}
363+
}
364+
365+
indexWriter.commit();
340366
}
341367
} else {
342368
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
343369
for (TestDoc doc : docs) {
344-
indexWriter.addDocument(doc.toDocument());
370+
if (!doc.deleted) {
371+
indexWriter.addDocument(doc.toDocument());
372+
}
345373
}
346374
}
347375
}
@@ -413,10 +441,16 @@ private InternalAggregation.ReduceContext createReduceContext(
413441
private class TestDoc {
414442
private final long metric;
415443
private final Instant timestamp;
444+
private final boolean deleted;
416445

417446
public TestDoc(long metric, Instant timestamp) {
447+
this(metric, timestamp, false);
448+
}
449+
450+
public TestDoc(long metric, Instant timestamp, boolean deleted) {
418451
this.metric = metric;
419452
this.timestamp = timestamp;
453+
this.deleted = deleted;
420454
}
421455

422456
public ParseContext.Document toDocument() {

0 commit comments

Comments
 (0)