diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java index 2f067a965bb1a..bed63d7d2ddcc 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.search.aggregations.bucket.nested; +import com.carrotsearch.hppc.IntArrayList; +import com.carrotsearch.hppc.IntObjectOpenHashMap; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.search.DocIdSet; import org.apache.lucene.search.DocIdSetIterator; @@ -46,9 +48,12 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo private DocIdSetIterator childDocs; private FixedBitSet parentDocs; - private AtomicReaderContext reader; + private FixedBitSet rootDocs; + private int currentRootDoc = -1; + private final IntObjectOpenHashMap childDocIdBuffers = new IntObjectOpenHashMap<>(); + public NestedAggregator(String name, AggregatorFactories factories, ObjectMapper objectMapper, AggregationContext aggregationContext, Aggregator parentAggregator) { super(name, factories, aggregationContext, parentAggregator); this.parentAggregator = parentAggregator; @@ -79,6 +84,7 @@ public void setNextReader(AtomicReaderContext reader) { } else { childDocs = childDocIdSet.iterator(); } + rootDocs = context.searchContext().fixedBitSetFilterCache().getFixedBitSetFilter(NonNestedDocsFilter.INSTANCE).getDocIdSet(reader, null); } catch (IOException ioe) { throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe); } @@ -109,22 +115,22 @@ public void collect(int parentDoc, long bucketOrd) throws IOException { parentDocs = parentFilter.getDocIdSet(reader, null); } - int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1); - int childDocId; - if (childDocs.docID() > prevParentDoc) { - childDocId = childDocs.docID(); - } else { - childDocId = childDocs.advance(prevParentDoc + 1); - } - int numChildren = 0; - for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) { + IntArrayList iterator = getChildren(parentDoc); + final int[] buffer = iterator.buffer; + final int size = iterator.size(); + for (int i = 0; i < size; i++) { numChildren++; - collectBucketNoCounts(childDocId, bucketOrd); + collectBucketNoCounts(buffer[i], bucketOrd); } incrementBucketDocCount(bucketOrd, numChildren); } + @Override + protected void doClose() { + childDocIdBuffers.clear(); + } + @Override public InternalAggregation buildAggregation(long owningBucketOrdinal) { return new InternalNested(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal)); @@ -183,4 +189,42 @@ public InternalAggregation buildEmptyAggregation() { } } } + + // The aggs framework can collect buckets for the same parent doc id more than once and because the children docs + // can only be consumed once we need to buffer the child docs. We only need to buffer child docs in the scope + // of the current root doc. + + // Examples: + // 1) nested agg wrapped is by terms agg and multiple buckets per document are emitted + // 2) Multiple nested fields are defined. A nested agg joins back to another nested agg via the reverse_nested agg. + // For each child in the first nested agg the second nested agg gets invoked with the same buckets / docids + private IntArrayList getChildren(final int parentDocId) throws IOException { + int rootDocId = rootDocs.nextSetBit(parentDocId); + if (currentRootDoc == rootDocId) { + final IntArrayList childDocIdBuffer = childDocIdBuffers.get(parentDocId); + if (childDocIdBuffer != null) { + return childDocIdBuffer; + } else { + // here we translate the parent doc to a list of its nested docs, + // and then collect buckets for every one of them so they'll be collected + final IntArrayList newChildDocIdBuffer = new IntArrayList(); + childDocIdBuffers.put(parentDocId, newChildDocIdBuffer); + int prevParentDoc = parentDocs.prevSetBit(parentDocId - 1); + int childDocId; + if (childDocs.docID() > prevParentDoc) { + childDocId = childDocs.docID(); + } else { + childDocId = childDocs.advance(prevParentDoc + 1); + } + for (; childDocId < parentDocId; childDocId = childDocs.nextDoc()) { + newChildDocIdBuffer.add(childDocId); + } + return newChildDocIdBuffer; + } + } else { + this.currentRootDoc = rootDocId; + childDocIdBuffers.clear(); + return getChildren(parentDocId); + } + } } diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java index 570d24da08e5b..b13738671bd63 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/NestedTests.java @@ -449,4 +449,90 @@ public void testParentFilterResolvedCorrectly() throws Exception { tags = nestedTags.getAggregations().get("tag"); assertThat(tags.getBuckets().size(), equalTo(0)); // and this must be empty } + + @Test + public void nestedSameDocIdProcessedMultipleTime() throws Exception { + assertAcked( + prepareCreate("idx4") + .setSettings(ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)) + .addMapping("product", "categories", "type=string", "name", "type=string", "property", "type=nested") + ); + + client().prepareIndex("idx4", "product", "1").setSource(jsonBuilder().startObject() + .field("name", "product1") + .field("categories", "1", "2", "3", "4") + .startArray("property") + .startObject().field("id", 1).endObject() + .startObject().field("id", 2).endObject() + .startObject().field("id", 3).endObject() + .endArray() + .endObject()).get(); + client().prepareIndex("idx4", "product", "2").setSource(jsonBuilder().startObject() + .field("name", "product2") + .field("categories", "1", "2") + .startArray("property") + .startObject().field("id", 1).endObject() + .startObject().field("id", 5).endObject() + .startObject().field("id", 4).endObject() + .endArray() + .endObject()).get(); + refresh(); + + SearchResponse response = client().prepareSearch("idx4").setTypes("product") + .addAggregation(terms("category").field("categories").subAggregation( + nested("property").path("property").subAggregation( + terms("property_id").field("property.id") + ) + )) + .get(); + assertNoFailures(response); + assertHitCount(response, 2); + + Terms category = response.getAggregations().get("category"); + assertThat(category.getBuckets().size(), equalTo(4)); + + Terms.Bucket bucket = category.getBucketByKey("1"); + assertThat(bucket.getDocCount(), equalTo(2l)); + Nested property = bucket.getAggregations().get("property"); + assertThat(property.getDocCount(), equalTo(6l)); + Terms propertyId = property.getAggregations().get("property_id"); + assertThat(propertyId.getBuckets().size(), equalTo(5)); + assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l)); + assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l)); + + bucket = category.getBucketByKey("2"); + assertThat(bucket.getDocCount(), equalTo(2l)); + property = bucket.getAggregations().get("property"); + assertThat(property.getDocCount(), equalTo(6l)); + propertyId = property.getAggregations().get("property_id"); + assertThat(propertyId.getBuckets().size(), equalTo(5)); + assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l)); + assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l)); + + bucket = category.getBucketByKey("3"); + assertThat(bucket.getDocCount(), equalTo(1l)); + property = bucket.getAggregations().get("property"); + assertThat(property.getDocCount(), equalTo(3l)); + propertyId = property.getAggregations().get("property_id"); + assertThat(propertyId.getBuckets().size(), equalTo(3)); + assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l)); + + bucket = category.getBucketByKey("4"); + assertThat(bucket.getDocCount(), equalTo(1l)); + property = bucket.getAggregations().get("property"); + assertThat(property.getDocCount(), equalTo(3l)); + propertyId = property.getAggregations().get("property_id"); + assertThat(propertyId.getBuckets().size(), equalTo(3)); + assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l)); + assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l)); + } }