From fc64e4ff6fc172e2445290b411bbec7e41806ea8 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 19 Jan 2015 01:33:49 +0100 Subject: [PATCH] Nested aggregator: Fix handling of multiple buckets being emitted for the same parent doc id. This bug was introduced by #8454 which allowed the childFilter to only be consumed once. By adding the child docid buffering multiple buckets can now be emitted by the same doc id. This child docid buffering only happens in the scope of the current root document, so the amount of child doc ids buffered is small. Closes #9317 Closes #9346 --- .../bucket/nested/NestedAggregator.java | 66 +++++++++++--- .../aggregations/bucket/NestedTests.java | 86 +++++++++++++++++++ 2 files changed, 141 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java index 2f067a965bb1a..bed63d7d2ddcc 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.search.aggregations.bucket.nested; +import com.carrotsearch.hppc.IntArrayList; +import com.carrotsearch.hppc.IntObjectOpenHashMap; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.search.DocIdSet; import org.apache.lucene.search.DocIdSetIterator; @@ -46,9 +48,12 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo private DocIdSetIterator childDocs; private FixedBitSet parentDocs; - private AtomicReaderContext reader; + private FixedBitSet rootDocs; + private int currentRootDoc = -1; + private final IntObjectOpenHashMap childDocIdBuffers = new IntObjectOpenHashMap<>(); + public NestedAggregator(String name, AggregatorFactories factories, ObjectMapper objectMapper, AggregationContext aggregationContext, Aggregator parentAggregator) { super(name, factories, aggregationContext, parentAggregator); this.parentAggregator = parentAggregator; @@ -79,6 +84,7 @@ public void setNextReader(AtomicReaderContext reader) { } else { childDocs = childDocIdSet.iterator(); } + rootDocs = context.searchContext().fixedBitSetFilterCache().getFixedBitSetFilter(NonNestedDocsFilter.INSTANCE).getDocIdSet(reader, null); } catch (IOException ioe) { throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe); } @@ -109,22 +115,22 @@ public void collect(int parentDoc, long bucketOrd) throws IOException { parentDocs = parentFilter.getDocIdSet(reader, null); } - int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1); - int childDocId; - if (childDocs.docID() > prevParentDoc) { - childDocId = childDocs.docID(); - } else { - childDocId = childDocs.advance(prevParentDoc + 1); - } - int numChildren = 0; - for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) { + IntArrayList iterator = getChildren(parentDoc); + final int[] buffer = iterator.buffer; + final int size = iterator.size(); + for (int i = 0; i < size; i++) { numChildren++; - collectBucketNoCounts(childDocId, bucketOrd); + collectBucketNoCounts(buffer[i], bucketOrd); } incrementBucketDocCount(bucketOrd, numChildren); } + @Override + protected void doClose() { + childDocIdBuffers.clear(); + } + @Override public InternalAggregation buildAggregation(long owningBucketOrdinal) { return new InternalNested(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal)); @@ -183,4 +189,42 @@ public InternalAggregation buildEmptyAggregation() { } } } + + // The aggs framework can collect buckets for the same parent doc id more than once and because the children docs + // can only be consumed once we need to buffer the child docs. We only need to buffer child docs in the scope + // of the current root doc. + + // Examples: + // 1) nested agg wrapped is by terms agg and multiple buckets per document are emitted + // 2) Multiple nested fields are defined. A nested agg joins back to another nested agg via the reverse_nested agg. + // For each child in the first nested agg the second nested agg gets invoked with the same buckets / docids + private IntArrayList getChildren(final int parentDocId) throws IOException { + int rootDocId = rootDocs.nextSetBit(parentDocId); + if (currentRootDoc == rootDocId) { + final IntArrayList childDocIdBuffer = childDocIdBuffers.get(parentDocId); + if (childDocIdBuffer != null) { + return childDocIdBuffer; + } else { + // here we translate the parent doc to a list of its nested docs, + // and then collect buckets for every one of them so they'll be collected + final IntArrayList newChildDocIdBuffer = new IntArrayList(); + childDocIdBuffers.put(parentDocId, newChildDocIdBuffer); + int prevParentDoc = parentDocs.prevSetBit(parentDocId - 1); + int childDocId; + if (childDocs.docID() > prevParentDoc) { + childDocId = childDocs.docID(); + } else { + childDocId = childDocs.advance(prevParentDoc + 1); + } + for (; childDocId < parentDocId; childDocId = childDocs.nextDoc()) { + newChildDocIdBuffer.add(childDocId); + } + return newChildDocIdBuffer; + } + } else { + this.currentRootDoc = rootDocId; + childDocIdBuffers.clear(); + return getChildren(parentDocId); + } + } } diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java index 570d24da08e5b..b13738671bd63 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java @@ -449,4 +449,90 @@ public void testParentFilterResolvedCorrectly() throws Exception { tags = nestedTags.getAggregations().get("tag"); assertThat(tags.getBuckets().size(), equalTo(0)); // and this must be empty } + + @Test + public void nestedSameDocIdProcessedMultipleTime() throws Exception { + assertAcked( + prepareCreate("idx4") + .setSettings(ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)) + .addMapping("product", "categories", "type=string", "name", "type=string", "property", "type=nested") + ); + + client().prepareIndex("idx4", "product", "1").setSource(jsonBuilder().startObject() + .field("name", "product1") + .field("categories", "1", "2", "3", "4") + .startArray("property") + .startObject().field("id", 1).endObject() + .startObject().field("id", 2).endObject() + .startObject().field("id", 3).endObject() + .endArray() + .endObject()).get(); + client().prepareIndex("idx4", "product", "2").setSource(jsonBuilder().startObject() + .field("name", "product2") + .field("categories", "1", "2") + .startArray("property") + .startObject().field("id", 1).endObject() + .startObject().field("id", 5).endObject() + .startObject().field("id", 4).endObject() + .endArray() + .endObject()).get(); + refresh(); + + SearchResponse response = client().prepareSearch("idx4").setTypes("product") + .addAggregation(terms("category").field("categories").subAggregation( + nested("property").path("property").subAggregation( + terms("property_id").field("property.id") + ) + )) + .get(); + assertNoFailures(response); + assertHitCount(response, 2); + + Terms category = response.getAggregations().get("category"); + assertThat(category.getBuckets().size(), equalTo(4)); + + Terms.Bucket bucket = category.getBucketByKey("1"); + assertThat(bucket.getDocCount(), equalTo(2l)); + Nested property = bucket.getAggregations().get("property"); + assertThat(property.getDocCount(), equalTo(6l)); + Terms propertyId = property.getAggregations().get("property_id"); + assertThat(propertyId.getBuckets().size(), equalTo(5)); + assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l)); + assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l)); + + bucket = category.getBucketByKey("2"); + assertThat(bucket.getDocCount(), equalTo(2l)); + property = bucket.getAggregations().get("property"); + assertThat(property.getDocCount(), equalTo(6l)); + propertyId = property.getAggregations().get("property_id"); + assertThat(propertyId.getBuckets().size(), equalTo(5)); + assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l)); + assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l)); + + bucket = category.getBucketByKey("3"); + assertThat(bucket.getDocCount(), equalTo(1l)); + property = bucket.getAggregations().get("property"); + assertThat(property.getDocCount(), equalTo(3l)); + propertyId = property.getAggregations().get("property_id"); + assertThat(propertyId.getBuckets().size(), equalTo(3)); + assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l)); + + bucket = category.getBucketByKey("4"); + assertThat(bucket.getDocCount(), equalTo(1l)); + property = bucket.getAggregations().get("property"); + assertThat(property.getDocCount(), equalTo(3l)); + propertyId = property.getAggregations().get("property_id"); + assertThat(propertyId.getBuckets().size(), equalTo(3)); + assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l)); + } }