Remove aggregation's postCollect phase
After elastic#63811 it became clear to me that `postCollect` is kind of
dangerous and not all that useful. So this removes it.

The trouble with `postCollect` is that it happened right after we
finished calling `collect` on the `LeafBucketCollectors` but before we
built the aggregation results. But in elastic#63811 we found out that we can't
call `postCollect` on the children of `parent` or `child` aggregators
until we know *which* aggregation results we're building.

So this removes `postCollect` and moves everything we did in the
post-collect phase into `buildAggregations` or into hooks called from
it.
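
To make the new shape concrete, here is a minimal, self-contained sketch of the pattern this moves us to. The classes (`SketchBucketsAggregator`, `SketchDeferringAggregator`) and the string results are illustrative stand-ins rather than the real Elasticsearch types; only the hook ordering mirrors the change: work that used to wait for `postCollection` now happens in `prepareSubAggs`, which `buildSubAggsForBuckets` calls once it knows exactly which bucket ordinals it is building results for.

```java
import java.io.IOException;
import java.util.Arrays;

// Illustrative stand-in for BucketsAggregator; not the real Elasticsearch class.
abstract class SketchBucketsAggregator {
    /**
     * Hook called once with the bucket ordinals whose sub-aggregation results
     * are about to be built. Work that used to be deferred to postCollection()
     * can run here, when the surviving buckets are finally known.
     */
    protected void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {}

    /** Builds per-bucket sub-aggregation results, running the hook first. */
    protected final String[] buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
        prepareSubAggs(bucketOrdsToCollect);
        String[] results = new String[bucketOrdsToCollect.length];
        for (int i = 0; i < bucketOrdsToCollect.length; i++) {
            results[i] = "sub-aggs for bucket ordinal " + bucketOrdsToCollect[i];
        }
        return results;
    }
}

// Illustrative stand-in for an aggregator that defers work, in the spirit of
// the deferring or parent/child aggregators.
class SketchDeferringAggregator extends SketchBucketsAggregator {
    @Override
    protected void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {
        // Replay buffered documents only for the buckets we are actually
        // building -- the work that previously lived in postCollection().
        System.out.println("replaying deferred docs for buckets " + Arrays.toString(bucketOrdsToCollect));
    }

    public static void main(String[] args) throws IOException {
        SketchDeferringAggregator agg = new SketchDeferringAggregator();
        System.out.println(Arrays.toString(agg.buildSubAggsForBuckets(new long[] { 0, 3, 7 })));
    }
}
```

In the real change the results are `InternalAggregations` rather than strings, but the ordering is the point: the hook fires on the way into building results instead of in a separate post-collection pass, so deferring aggregators like `parent` and `child` can wait until they know which buckets survived.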
nik9000 committed Oct 21, 2020
1 parent fc96ee9 commit 31bbf53
Showing 41 changed files with 32 additions and 218 deletions.
@@ -113,12 +113,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException {
}

@Override
public void postCollection() throws IOException {
// Delaying until beforeBuildingBuckets
}

@Override
protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {
protected void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {
IndexReader indexReader = context().searcher().getIndexReader();
for (LeafReaderContext ctx : indexReader.leaves()) {
Scorer childDocsScorer = outFilter.scorer(ctx);
@@ -160,14 +155,13 @@ public int docID() {
* structure that maps a primitive long to a list of primitive
* longs.
*/
for (long owningBucketOrd: ordsToCollect) {
if (collectionStrategy.exists(owningBucketOrd, globalOrdinal)) {
collectBucket(sub, docId, owningBucketOrd);
for (long o: bucketOrdsToCollect) {
if (collectionStrategy.exists(o, globalOrdinal)) {
collectBucket(sub, docId, o);
}
}
}
}
super.postCollection(); // Run post collection after collecting the sub-aggs
}

@Override
@@ -597,8 +597,5 @@ public ScoreMode scoreMode() {

@Override
public void preCollection() throws IOException {}

@Override
public void postCollection() throws IOException {}
}
}
@@ -52,21 +52,18 @@
public class AggregationProfilerIT extends ESIntegTestCase {
private static final String BUILD_LEAF_COLLECTOR = AggregationTimingType.BUILD_LEAF_COLLECTOR.toString();
private static final String COLLECT = AggregationTimingType.COLLECT.toString();
private static final String POST_COLLECTION = AggregationTimingType.POST_COLLECTION.toString();
private static final String INITIALIZE = AggregationTimingType.INITIALIZE.toString();
private static final String BUILD_AGGREGATION = AggregationTimingType.BUILD_AGGREGATION.toString();
private static final String REDUCE = AggregationTimingType.REDUCE.toString();
private static final Set<String> BREAKDOWN_KEYS = Set.of(
INITIALIZE,
BUILD_LEAF_COLLECTOR,
COLLECT,
POST_COLLECTION,
BUILD_AGGREGATION,
REDUCE,
INITIALIZE + "_count",
BUILD_LEAF_COLLECTOR + "_count",
COLLECT + "_count",
POST_COLLECTION + "_count",
BUILD_AGGREGATION + "_count",
REDUCE + "_count"
);
@@ -330,7 +327,6 @@ public void testDiversifiedAggProfile() {
assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(diversifyBreakdown.get(COLLECT), greaterThan(0L));
assertThat(diversifyBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(diversifyBreakdown.get(REDUCE), equalTo(0L));
assertThat(diversifyAggResult.getDebugInfo(), equalTo(Map.of(DEFERRED, List.of("max"))));
@@ -347,7 +343,6 @@ public void testDiversifiedAggProfile() {
assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(diversifyBreakdown.get(COLLECT), greaterThan(0L));
assertThat(diversifyBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
@@ -391,7 +386,6 @@ public void testComplexProfile() {
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(histoBreakdown.get(COLLECT), greaterThan(0L));
assertThat(histoBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(histoBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(histoBreakdown.get(REDUCE), equalTo(0L));
Map<String, Object> histoDebugInfo = histoAggResult.getDebugInfo();
@@ -413,7 +407,6 @@ public void testComplexProfile() {
assertThat(tagsBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(tagsBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(tagsBreakdown.get(REDUCE), equalTo(0L));
assertRemapTermsDebugInfo(tagsAggResult);
@@ -432,7 +425,6 @@ public void testComplexProfile() {
assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(avgBreakdown.get(COLLECT), greaterThan(0L));
assertThat(avgBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(avgBreakdown.get(REDUCE), equalTo(0L));
assertThat(avgAggResult.getDebugInfo(), equalTo(Map.of()));
@@ -448,7 +440,6 @@ public void testComplexProfile() {
assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(maxBreakdown.get(COLLECT), greaterThan(0L));
assertThat(maxBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
@@ -464,7 +455,6 @@ public void testComplexProfile() {
assertThat(stringsBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(stringsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(stringsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(stringsBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(stringsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(stringsBreakdown.get(REDUCE), equalTo(0L));
assertRemapTermsDebugInfo(stringsAggResult);
@@ -483,7 +473,6 @@ public void testComplexProfile() {
assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(avgBreakdown.get(COLLECT), greaterThan(0L));
assertThat(avgBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(avgBreakdown.get(REDUCE), equalTo(0L));
assertThat(avgAggResult.getDebugInfo(), equalTo(Map.of()));
@@ -499,7 +488,6 @@ public void testComplexProfile() {
assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(maxBreakdown.get(COLLECT), greaterThan(0L));
assertThat(maxBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
@@ -516,7 +504,6 @@ public void testComplexProfile() {
assertThat(tagsBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(tagsBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(tagsBreakdown.get(REDUCE), equalTo(0L));
assertRemapTermsDebugInfo(tagsAggResult);
@@ -535,7 +522,6 @@ public void testComplexProfile() {
assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(avgBreakdown.get(COLLECT), greaterThan(0L));
assertThat(avgBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(avgBreakdown.get(REDUCE), equalTo(0L));
assertThat(avgAggResult.getDebugInfo(), equalTo(Map.of()));
@@ -551,7 +537,6 @@ public void testComplexProfile() {
assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(maxBreakdown.get(COLLECT), greaterThan(0L));
assertThat(maxBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
@@ -119,7 +119,6 @@ public void execute(SearchContext context) {
context.aggregations().resetBucketMultiConsumer();
for (Aggregator aggregator : context.aggregations().aggregators()) {
try {
aggregator.postCollection();
aggregations.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
@@ -142,12 +142,12 @@ public interface BucketComparator {

/**
* Build the results of this aggregation.
* @param owningBucketOrds the ordinals of the buckets that we want to
* @param ordsToCollect the ordinals of the buckets that we want to
* collect from this aggregation
* @return the results for each ordinal, in the same order as the array
* of ordinals
*/
public abstract InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException;
public abstract InternalAggregation[] buildAggregations(long[] ordsToCollect) throws IOException;

/**
* Build the result of this aggregation if it is at the "top level"
@@ -95,10 +95,6 @@ public void preCollection() throws IOException {
badState();
}

@Override
public void postCollection() throws IOException {
badState();
}
@Override
public ScoreMode scoreMode() {
badState();
@@ -248,19 +244,6 @@ public SearchContext context() {
return context;
}

/**
* Called after collection of all document is done.
* <p>
* Warning: this is not final only to allow the parent join aggregator
* to delay this until building buckets.
*/
@Override
public void postCollection() throws IOException {
// post-collect this agg before subs to make it possible to buffer and then replay in postCollection()
doPostCollection();
collectableSubAggregators.postCollection();
}

/** Called upon release of the aggregator. */
@Override
public void close() {
@@ -274,12 +257,6 @@ public void close() {
/** Release instance-specific data. */
protected void doClose() {}

/**
* Can be overridden by aggregator implementation to be called back when the collection phase ends.
*/
protected void doPostCollection() throws IOException {
}

protected final InternalAggregations buildEmptySubAggregations() {
List<InternalAggregation> aggs = new ArrayList<>();
for (Aggregator aggregator : subAggregators) {
@@ -42,10 +42,6 @@ public void preCollection() throws IOException {
// no-op
}
@Override
public void postCollection() throws IOException {
// no-op
}
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
@@ -58,10 +54,4 @@ public ScoreMode scoreMode() {
* Pre collection callback.
*/
public abstract void preCollection() throws IOException;

/**
* Post-collection callback.
*/
public abstract void postCollection() throws IOException;

}
@@ -126,13 +126,6 @@ public void preCollection() throws IOException {
}
}

@Override
public void postCollection() throws IOException {
for (BucketCollector collector : collectors) {
collector.postCollection();
}
}

@Override
public String toString() {
return Arrays.toString(collectors);
@@ -70,7 +70,6 @@ static class Entry {
protected PackedLongValues.Builder docDeltasBuilder;
protected PackedLongValues.Builder bucketsBuilder;
protected long maxBucket = -1;
protected boolean finished = false;
protected LongHash selectedBuckets;

/**
@@ -136,20 +135,12 @@ public void preCollection() throws IOException {
collector.preCollection();
}

@Override
public void postCollection() throws IOException {
finishLeaf();
finished = true;
}

/**
* Replay the wrapped collector, but only on a selection of buckets.
*/
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
if (finished == false) {
throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
}
finishLeaf();
if (this.selectedBuckets != null) {
throw new IllegalStateException("Already been replayed");
}
@@ -201,7 +192,6 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
// continue with the following leaf
}
}
collector.postCollection();
}

/**
@@ -168,9 +168,9 @@ public final int bucketDocCount(long bucketOrd) {
}

/**
* Hook to allow taking an action before building buckets.
* Hook to allow taking an action before building child agg result.
*/
protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {}
protected void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {}

/**
* Build the results of the sub-aggregations of the buckets at each of
@@ -186,7 +186,7 @@ protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {}
* array of ordinals
*/
protected final InternalAggregations[] buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
beforeBuildingBuckets(bucketOrdsToCollect);
prepareSubAggs(bucketOrdsToCollect);
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
for (int i = 0; i < subAggregators.length; i++) {
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
@@ -21,8 +21,8 @@

import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.internal.SearchContext;

@@ -91,9 +91,9 @@ protected boolean shouldDefer(Aggregator aggregator) {
}

@Override
protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {
protected final void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {
if (recordingWrapper != null) {
recordingWrapper.prepareSelectedBuckets(ordsToCollect);
recordingWrapper.prepareSelectedBuckets(bucketOrdsToCollect);
}
}

@@ -116,12 +116,6 @@ public void preCollection() throws IOException {
"Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
}

@Override
public void postCollection() throws IOException {
throw new IllegalStateException(
"Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
}

@Override
public Aggregator resolveSortPath(PathElement next, Iterator<PathElement> path) {
return in.resolveSortPath(next, path);
@@ -135,15 +135,11 @@ protected void doPreCollection() throws IOException {
collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
}

@Override
protected void doPostCollection() throws IOException {
finishLeaf();
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
// Composite aggregator must be at the top of the aggregation tree
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
finishLeaf();
if (deferredCollectors != NO_OP_COLLECTOR) {
// Replay all documents that contain at least one top bucket (collected during the first pass).
runDeferredCollections();
@@ -487,7 +483,6 @@ private void runDeferredCollections() throws IOException {
collector.collect(docID);
}
}
deferredCollectors.postCollection();
}

/**
