Skip to content

Commit 6e09aea

Browse files
committed
Nested aggregator: Fix handling of multiple buckets being emitted for the same parent doc id.
This bug was introduced by #8454, which allowed the childFilter to be consumed only once. By buffering the child doc ids, multiple buckets can now be emitted for the same doc id. This child doc id buffering only happens within the scope of the current root document, so the number of child doc ids buffered is small. Closes #9317 Closes #9346
1 parent 2893afc commit 6e09aea

File tree

2 files changed

+141
-11
lines changed

2 files changed

+141
-11
lines changed

src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.elasticsearch.search.aggregations.bucket.nested;
2020

21+
import com.carrotsearch.hppc.IntArrayList;
22+
import com.carrotsearch.hppc.IntObjectOpenHashMap;
2123
import org.apache.lucene.index.AtomicReaderContext;
2224
import org.apache.lucene.search.DocIdSet;
2325
import org.apache.lucene.search.DocIdSetIterator;
@@ -46,9 +48,12 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
4648

4749
private DocIdSetIterator childDocs;
4850
private FixedBitSet parentDocs;
49-
5051
private AtomicReaderContext reader;
5152

53+
private FixedBitSet rootDocs;
54+
private int currentRootDoc = -1;
55+
private final IntObjectOpenHashMap<IntArrayList> childDocIdBuffers = new IntObjectOpenHashMap<>();
56+
5257
public NestedAggregator(String name, AggregatorFactories factories, ObjectMapper objectMapper, AggregationContext aggregationContext, Aggregator parentAggregator) {
5358
super(name, factories, aggregationContext, parentAggregator);
5459
this.parentAggregator = parentAggregator;
@@ -79,6 +84,7 @@ public void setNextReader(AtomicReaderContext reader) {
7984
} else {
8085
childDocs = childDocIdSet.iterator();
8186
}
87+
rootDocs = context.searchContext().fixedBitSetFilterCache().getFixedBitSetFilter(NonNestedDocsFilter.INSTANCE).getDocIdSet(reader, null);
8288
} catch (IOException ioe) {
8389
throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe);
8490
}
@@ -109,22 +115,22 @@ public void collect(int parentDoc, long bucketOrd) throws IOException {
109115
parentDocs = parentFilter.getDocIdSet(reader, null);
110116
}
111117

112-
int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
113-
int childDocId;
114-
if (childDocs.docID() > prevParentDoc) {
115-
childDocId = childDocs.docID();
116-
} else {
117-
childDocId = childDocs.advance(prevParentDoc + 1);
118-
}
119-
120118
int numChildren = 0;
121-
for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
119+
IntArrayList iterator = getChildren(parentDoc);
120+
final int[] buffer = iterator.buffer;
121+
final int size = iterator.size();
122+
for (int i = 0; i < size; i++) {
122123
numChildren++;
123-
collectBucketNoCounts(childDocId, bucketOrd);
124+
collectBucketNoCounts(buffer[i], bucketOrd);
124125
}
125126
incrementBucketDocCount(bucketOrd, numChildren);
126127
}
127128

129+
/** Empties {@code childDocIdBuffers}, dropping every buffered list of child doc ids, when this aggregator is closed. */
@Override
130+
protected void doClose() {
131+
childDocIdBuffers.clear();
132+
}
133+
128134
@Override
129135
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
130136
return new InternalNested(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal));
@@ -183,4 +189,42 @@ public InternalAggregation buildEmptyAggregation() {
183189
}
184190
}
185191
}
192+
193+
// The aggs framework can collect buckets for the same parent doc id more than once and because the children docs
194+
// can only be consumed once we need to buffer the child docs. We only need to buffer child docs in the scope
195+
// of the current root doc.
196+
197+
// Examples:
198+
// 1) nested agg is wrapped by terms agg and multiple buckets per document are emitted
199+
// 2) Multiple nested fields are defined. A nested agg joins back to another nested agg via the reverse_nested agg.
200+
// For each child in the first nested agg the second nested agg gets invoked with the same buckets / docids
201+
private IntArrayList getChildren(final int parentDocId) throws IOException {
202+
int rootDocId = rootDocs.nextSetBit(parentDocId);
203+
if (currentRootDoc == rootDocId) {
204+
final IntArrayList childDocIdBuffer = childDocIdBuffers.get(parentDocId);
205+
if (childDocIdBuffer != null) {
206+
return childDocIdBuffer;
207+
} else {
208+
// here we translate the parent doc to a list of its nested docs,
209+
// and then collect buckets for every one of them so they'll be collected
210+
final IntArrayList newChildDocIdBuffer = new IntArrayList();
211+
childDocIdBuffers.put(parentDocId, newChildDocIdBuffer);
212+
int prevParentDoc = parentDocs.prevSetBit(parentDocId - 1);
213+
int childDocId;
214+
if (childDocs.docID() > prevParentDoc) {
215+
childDocId = childDocs.docID();
216+
} else {
217+
childDocId = childDocs.advance(prevParentDoc + 1);
218+
}
219+
for (; childDocId < parentDocId; childDocId = childDocs.nextDoc()) {
220+
newChildDocIdBuffer.add(childDocId);
221+
}
222+
return newChildDocIdBuffer;
223+
}
224+
} else {
225+
this.currentRootDoc = rootDocId;
226+
childDocIdBuffers.clear();
227+
return getChildren(parentDocId);
228+
}
229+
}
186230
}

src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,4 +449,90 @@ public void testParentFilterResolvedCorrectly() throws Exception {
449449
tags = nestedTags.getAggregations().get("tag");
450450
assertThat(tags.getBuckets().size(), equalTo(0)); // and this must be empty
451451
}
452+
453+
@Test
454+
public void nestedSameDocIdProcessedMultipleTime() throws Exception {
455+
assertAcked(
456+
prepareCreate("idx4")
457+
.setSettings(ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
458+
.addMapping("product", "categories", "type=string", "name", "type=string", "property", "type=nested")
459+
);
460+
461+
client().prepareIndex("idx4", "product", "1").setSource(jsonBuilder().startObject()
462+
.field("name", "product1")
463+
.field("categories", "1", "2", "3", "4")
464+
.startArray("property")
465+
.startObject().field("id", 1).endObject()
466+
.startObject().field("id", 2).endObject()
467+
.startObject().field("id", 3).endObject()
468+
.endArray()
469+
.endObject()).get();
470+
client().prepareIndex("idx4", "product", "2").setSource(jsonBuilder().startObject()
471+
.field("name", "product2")
472+
.field("categories", "1", "2")
473+
.startArray("property")
474+
.startObject().field("id", 1).endObject()
475+
.startObject().field("id", 5).endObject()
476+
.startObject().field("id", 4).endObject()
477+
.endArray()
478+
.endObject()).get();
479+
refresh();
480+
481+
SearchResponse response = client().prepareSearch("idx4").setTypes("product")
482+
.addAggregation(terms("category").field("categories").subAggregation(
483+
nested("property").path("property").subAggregation(
484+
terms("property_id").field("property.id")
485+
)
486+
))
487+
.get();
488+
assertNoFailures(response);
489+
assertHitCount(response, 2);
490+
491+
Terms category = response.getAggregations().get("category");
492+
assertThat(category.getBuckets().size(), equalTo(4));
493+
494+
Terms.Bucket bucket = category.getBucketByKey("1");
495+
assertThat(bucket.getDocCount(), equalTo(2l));
496+
Nested property = bucket.getAggregations().get("property");
497+
assertThat(property.getDocCount(), equalTo(6l));
498+
Terms propertyId = property.getAggregations().get("property_id");
499+
assertThat(propertyId.getBuckets().size(), equalTo(5));
500+
assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l));
501+
assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
502+
assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
503+
assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l));
504+
assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l));
505+
506+
bucket = category.getBucketByKey("2");
507+
assertThat(bucket.getDocCount(), equalTo(2l));
508+
property = bucket.getAggregations().get("property");
509+
assertThat(property.getDocCount(), equalTo(6l));
510+
propertyId = property.getAggregations().get("property_id");
511+
assertThat(propertyId.getBuckets().size(), equalTo(5));
512+
assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l));
513+
assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
514+
assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
515+
assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l));
516+
assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l));
517+
518+
bucket = category.getBucketByKey("3");
519+
assertThat(bucket.getDocCount(), equalTo(1l));
520+
property = bucket.getAggregations().get("property");
521+
assertThat(property.getDocCount(), equalTo(3l));
522+
propertyId = property.getAggregations().get("property_id");
523+
assertThat(propertyId.getBuckets().size(), equalTo(3));
524+
assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l));
525+
assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
526+
assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
527+
528+
bucket = category.getBucketByKey("4");
529+
assertThat(bucket.getDocCount(), equalTo(1l));
530+
property = bucket.getAggregations().get("property");
531+
assertThat(property.getDocCount(), equalTo(3l));
532+
propertyId = property.getAggregations().get("property_id");
533+
assertThat(propertyId.getBuckets().size(), equalTo(3));
534+
assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l));
535+
assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
536+
assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
537+
}
452538
}

0 commit comments

Comments
 (0)