Decouple pipeline reductions from final agg reduction #45796

Merged · 11 commits · Dec 5, 2019
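To orient the reader before the diff: the point of the change is that `doReduce` merges shard data on every reduction pass, while pipeline aggregations run exactly once, during the final reduce, via a separate entry point. A minimal, self-contained Java sketch of that flow, using hypothetical stand-in types rather than the real Elasticsearch classes:

```java
import java.util.List;

// A minimal sketch of the decoupled reduction flow this PR introduces.
// The types are hypothetical stand-ins, not the real Elasticsearch classes.
public class ReductionSketch {
    interface Agg {
        // Data reduction only; runs on every (partial or final) pass.
        Agg doReduce(List<Agg> aggs, boolean finalReduce);

        // Pipeline pass; runs exactly once, after the final data reduction.
        Agg reducePipelines();
    }

    // Entry point, mirroring InternalAggregations.topLevelReduce: reduce the
    // data on every pass, run pipelines only when finalReduce is true.
    static Agg topLevelReduce(List<Agg> aggs, boolean finalReduce) {
        Agg reduced = aggs.get(0).doReduce(aggs, finalReduce);
        return finalReduce ? reduced.reducePipelines() : reduced;
    }

    record Max(double value) implements Agg {
        @Override
        public Agg doReduce(List<Agg> aggs, boolean finalReduce) {
            double max = aggs.stream().mapToDouble(a -> ((Max) a).value()).max().orElseThrow();
            return new Max(max);
        }

        @Override
        public Agg reducePipelines() {
            return this; // no pipelines attached in this toy example
        }
    }

    public static void main(String[] args) {
        // Two shard results reduced partially, then a final pass.
        Agg partial = topLevelReduce(List.of(new Max(1), new Max(3)), false);
        Agg result = topLevelReduce(List.of(partial, new Max(2)), true);
        System.out.println(((Max) result).value()); // 3.0
    }
}
```

The real entry point, `InternalAggregations.topLevelReduce`, is introduced in the diff below.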
@@ -164,7 +164,7 @@ public void testReduceRandom() {
MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
InternalAggregation.ReduceContext context =
new InternalAggregation.ReduceContext(bigArrays, mockScriptService, true);
-InternalMatrixStats reduced = (InternalMatrixStats) shardResults.get(0).reduce(shardResults, context);
+InternalMatrixStats reduced = (InternalMatrixStats) shardResults.get(0).doReduce(shardResults, context);
multiPassStats.assertNearlyEqual(reduced.getResults());
}

@@ -487,7 +487,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
}
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
-InternalAggregations.reduce(aggregationsList, reduceContext);
+InternalAggregations.topLevelReduce(aggregationsList, reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
reducedCompletionSuggestions);
@@ -617,7 +617,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (index == bufferSize) {
if (hasAggs) {
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
-InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
+InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
}
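Here the consumer collapses a full buffer with `performFinalReduce=false`, so intermediate passes merge data but skip pipelines and keep memory bounded; only the last pass, in `reducedQueryPhase` above, runs with `true`. A rough, self-contained sketch of this batching pattern, with illustrative names rather than the actual SearchPhaseController internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Rough sketch of the buffered, batched reduction in the query phase.
// Names are illustrative; the real logic lives in SearchPhaseController's
// result consumer.
public class BatchedReducer<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int bufferSize;
    // (batch, finalReduce) -> reduced result
    private final BiFunction<List<T>, Boolean, T> reduce;

    public BatchedReducer(int bufferSize, BiFunction<List<T>, Boolean, T> reduce) {
        this.bufferSize = bufferSize;
        this.reduce = reduce;
    }

    public synchronized void consume(T shardResult) {
        buffer.add(shardResult);
        if (buffer.size() == bufferSize) {
            // Intermediate pass: finalReduce=false, so pipelines are skipped.
            T partial = reduce.apply(List.copyOf(buffer), false);
            buffer.clear();
            buffer.add(partial); // partial result takes slot 0, as in the diff
        }
    }

    public synchronized T finish() {
        // Final pass: finalReduce=true triggers the single pipeline reduction.
        return reduce.apply(List.copyOf(buffer), true);
    }
}
```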
@@ -196,7 +196,7 @@ SearchResponse getMergedResponse(Clusters clusters) {
SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats);
setSuggestShardIndex(shards, groupedSuggestions);
Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
-InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true));
+InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContextFunction.apply(true));
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
//make failures ordering consistent between ordinary search and CCS by looking at the shard they come from
@@ -126,22 +126,24 @@ public String getName() {
return name;
}

+/**
+ * Creates the output from all pipeline aggs that this aggregation is associated with. Should only
+ * be called after all aggregations have been fully reduced.
+ */
+public InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
+    assert reduceContext.isFinalReduce();
+    for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
+        reducedAggs = pipelineAggregator.reduce(reducedAggs, reduceContext);
+    }
+    return reducedAggs;
+}

/**
 * Reduces the given aggregations to a single one and returns it. In <b>most</b> cases, the assumption will be
 * that all the given aggregations are of the same type (the same type as this aggregation). For best efficiency,
 * when implementing, try reusing an existing instance (typically the first in the given list) to save on
 * redundant object construction.
 */
-public final InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
-    InternalAggregation aggResult = doReduce(aggregations, reduceContext);
-    if (reduceContext.isFinalReduce()) {
-        for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
-            aggResult = pipelineAggregator.reduce(aggResult, reduceContext);
-        }
-    }
-    return aggResult;
-}

public abstract InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
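The pipeline pass that the deleted `reduce` wrapper used to perform inline, and that `reducePipelines` now performs on its own, is a left-to-right fold of the attached pipeline aggregators over the reduced result. A small self-contained illustration with generic types, not the real API:

```java
import java.util.List;
import java.util.function.UnaryOperator;

// The pipeline pass is just a left fold: each pipeline aggregator rewrites
// the already-reduced result in declaration order (illustrative sketch).
public class PipelineFold {
    static <A> A reducePipelines(A reduced, List<UnaryOperator<A>> pipelines) {
        A result = reduced;
        for (UnaryOperator<A> pipeline : pipelines) {
            result = pipeline.apply(result); // e.g. a derivative or bucket-script step
        }
        return result;
    }

    public static void main(String[] args) {
        List<UnaryOperator<Double>> pipelines = List.of(v -> v * 2, v -> v + 1);
        System.out.println(reducePipelines(10.0, pipelines)); // 21.0, applied in order
    }
}
```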

@@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;

/**
* An internal implementation of {@link Aggregations}.
@@ -91,10 +92,47 @@ public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
return topLevelPipelineAggregators;
}

+@SuppressWarnings("unchecked")
+private List<InternalAggregation> getInternalAggregations() {
+    return (List<InternalAggregation>) aggregations;
+}
+
+/**
+ * Begin the reduction process. This should be the entry point for the "first" reduction, e.g. called by
+ * SearchPhaseController or anywhere else that wants to initiate a reduction. It _should not_ be called
+ * as an intermediate reduction step (e.g. in the middle of an aggregation tree).
+ *
+ * This method first reduces the aggregations and, if this is the final reduce, then reduces the pipeline
+ * aggregations (both embedded parent/sibling pipelines and top-level sibling pipelines).
+ */
+public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
+    InternalAggregations reduced = reduce(aggregationsList, context);
+    if (reduced == null) {
+        return null;
+    }
+
+    if (context.isFinalReduce()) {
+        List<InternalAggregation> reducedInternalAggs = reduced.getInternalAggregations();
+        reducedInternalAggs = reducedInternalAggs.stream()
+            .map(agg -> agg.reducePipelines(agg, context))
+            .collect(Collectors.toList());
+
+        List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();
+        for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
+            InternalAggregation newAgg
+                = pipelineAggregator.doReduce(new InternalAggregations(reducedInternalAggs), context);
+            reducedInternalAggs.add(newAgg);
+        }
+        return new InternalAggregations(reducedInternalAggs);
+    }
+    return reduced;
+}

/**
 * Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first
 * {@link InternalAggregations} object found in the list.
- * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
+ * Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled
+ * separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)}.
 */
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
if (aggregationsList.isEmpty()) {
@@ -120,16 +158,9 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
// If all aggs are unmapped, the agg that leads the reduction will just return itself
aggregations.sort(INTERNAL_AGG_COMPARATOR);
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
-reducedAggregations.add(first.reduce(aggregations, context));
+reducedAggregations.add(first.doReduce(aggregations, context));
}

-if (context.isFinalReduce()) {
-    for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
-        InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context);
-        reducedAggregations.add(newAgg);
-    }
-    return new InternalAggregations(reducedAggregations);
-}
return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators);
}
}
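The sibling-pipeline block deleted from `reduce` above moved into the final-reduce branch of `topLevelReduce`: each top-level sibling pipeline reads the fully reduced aggregations and appends one new aggregation, so later siblings can observe the output of earlier ones. In miniature, with strings standing in for aggregations; illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of the final-pass sibling handling in topLevelReduce: each sibling
// pipeline reads the reduced aggs and appends one new agg to the list.
public class SiblingPass {
    static List<String> finalPass(List<String> reducedAggs,
                                  List<Function<List<String>, String>> siblings) {
        List<String> out = new ArrayList<>(reducedAggs);
        for (Function<List<String>, String> sibling : siblings) {
            // As in topLevelReduce, the list grows as we go, so later
            // siblings see the output of earlier ones.
            out.add(sibling.apply(List.copyOf(out)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Function<List<String>, String>> siblings =
            List.of(aggs -> "avg_bucket(" + String.join(",", aggs) + ")");
        System.out.println(finalPass(List.of("histo"), siblings));
        // [histo, avg_bucket(histo)]
    }
}
```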
@@ -26,6 +26,7 @@
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@@ -73,7 +74,7 @@ protected InternalMultiBucketAggregation(StreamInput in) throws IOException {
protected abstract B reduceBucket(List<B> buckets, ReduceContext context);

@Override
-public abstract List<? extends InternalBucket> getBuckets();
+public abstract List<B> getBuckets();

@Override
public Object getProperty(List<String> path) {
@@ -141,6 +142,30 @@ public static int countInnerBucket(Aggregation agg) {
return size;
}

+/**
+ * Unlike {@link InternalAggregation#reducePipelines(InternalAggregation, ReduceContext)}, a multi-bucket
+ * agg needs to first reduce the buckets (and their parent pipelines) before allowing sibling pipelines
+ * to materialize.
+ */
+@Override
+public final InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
+    assert reduceContext.isFinalReduce();
+    List<B> materializedBuckets = reducePipelineBuckets(reduceContext);
+    return super.reducePipelines(create(materializedBuckets), reduceContext);
+}
+
+private List<B> reducePipelineBuckets(ReduceContext reduceContext) {
+    List<B> reducedBuckets = new ArrayList<>();
+    for (B bucket : getBuckets()) {
+        List<InternalAggregation> aggs = new ArrayList<>();
+        for (Aggregation agg : bucket.getAggregations()) {
+            aggs.add(((InternalAggregation) agg).reducePipelines((InternalAggregation) agg, reduceContext));
+        }
+        reducedBuckets.add(createBucket(new InternalAggregations(aggs), bucket));
+    }
+    return reducedBuckets;
+}

public abstract static class InternalBucket implements Bucket, Writeable {

public Object getProperty(String containingAggName, List<String> path) {
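The `reducePipelines` override above enforces a bottom-up ordering: sub-aggregation pipelines inside each bucket materialize first, the buckets are re-created, and only then do the multi-bucket aggregation's own pipelines run. A compact sketch of that ordering with hypothetical types:

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Sketch of the bottom-up ordering for multi-bucket aggs (hypothetical types):
// pipelines attached to the sub-aggs inside each bucket run first, then this
// agg's own pipelines run over the rebuilt buckets.
public class MultiBucketPipelineSketch {
    record Bucket(List<Double> subAggs) {}

    static List<Bucket> reducePipelines(List<Bucket> buckets,
                                        UnaryOperator<Double> subAggPipeline,
                                        UnaryOperator<List<Bucket>> ownPipelines) {
        // Step 1: reduce each bucket's sub-agg pipelines, re-creating buckets.
        List<Bucket> materialized = buckets.stream()
            .map(b -> new Bucket(b.subAggs().stream().map(subAggPipeline).collect(Collectors.toList())))
            .collect(Collectors.toList());
        // Step 2: hand the materialized buckets to this agg's own pipelines.
        return ownPipelines.apply(materialized);
    }

    public static void main(String[] args) {
        List<Bucket> buckets = List.of(new Bucket(List.of(1.0, 2.0)));
        System.out.println(reducePipelines(buckets, v -> v * 10, b -> b));
        // [Bucket[subAggs=[10.0, 20.0]]]
    }
}
```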
@@ -33,7 +33,7 @@
/**
* An aggregation service that creates instances of {@link MultiBucketConsumer}.
* The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
- * in {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
+ * in {@link Aggregator#buildAggregation} and {@link InternalAggregation#doReduce}.
* The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
*/
public class MultiBucketConsumerService {
@@ -90,7 +90,7 @@ protected void metadataToXContent(XContentBuilder builder, Params params) throws
* An {@link IntConsumer} that throws a {@link TooManyBucketsException}
* when the sum of the provided values is above the limit (`search.max_buckets`).
* It is used by aggregators to limit the number of bucket creation during
- * {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
+ * {@link Aggregator#buildAggregation} and {@link InternalAggregation#doReduce}.
*/
public static class MultiBucketConsumer implements IntConsumer {
private final int limit;
@@ -126,7 +126,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
}

if (spare == null) {
-spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
+spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
}
spare.bucketOrd = bucketOrd;
copy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
@@ -105,6 +105,9 @@ public long getSubsetSize() {
return subsetSize;
}

+// TODO we should refactor to remove this, since buckets should be immutable after they are generated.
+// This can lead to confusing bugs if the bucket is re-created (via createBucket() or similar) without
+// the score.
void updateScore(SignificanceHeuristic significanceHeuristic) {
score = significanceHeuristic.getScore(subsetDf, subsetSize, supersetDf, supersetSize);
}
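The new TODO explains why `score` is now threaded through the `Bucket` constructors in the hunks that follow: the score is set by mutation after construction, so any copy path that re-creates a bucket must carry it along or it silently resets to zero. A toy illustration with a hypothetical class:

```java
// Toy illustration of the hazard the TODO above describes (hypothetical class):
// a copy path that rebuilds a bucket must carry every computed field along,
// which is why `score` now travels through the constructors in this diff.
public class ScoredBucket {
    final long docCount;
    final double score; // computed by the significance heuristic

    ScoredBucket(long docCount, double score) {
        this.docCount = docCount;
        this.score = score;
    }

    static ScoredBucket copyWithNewCount(ScoredBucket prototype, long newDocCount) {
        // Passing 0 here instead of prototype.score would silently reset the
        // score on every re-created bucket; that is the bug class being fixed.
        return new ScoredBucket(newDocCount, prototype.score);
    }
}
```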
@@ -42,14 +42,9 @@ static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
long term;

Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations,
-        DocValueFormat format) {
+        DocValueFormat format, double score) {
    super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
    this.term = term;
-}
-
-Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations,
-        double score) {
-    this(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, null);
    this.score = score;
}

@@ -134,7 +129,7 @@ public SignificantLongTerms create(List<SignificantLongTerms.Bucket> buckets) {
@Override
public Bucket createBucket(InternalAggregations aggregations, SignificantLongTerms.Bucket prototype) {
return new Bucket(prototype.subsetDf, prototype.subsetSize, prototype.supersetDf, prototype.supersetSize, prototype.term,
-aggregations, prototype.format);
+aggregations, prototype.format, prototype.score);
}

@Override
@@ -151,6 +146,6 @@ protected Bucket[] createBucketsArray(int size) {
@Override
Bucket createBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize,
InternalAggregations aggregations, SignificantLongTerms.Bucket prototype) {
-return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, prototype.term, aggregations, format);
+return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, prototype.term, aggregations, format, prototype.score);
}
}
@@ -88,7 +88,7 @@ public SignificantLongTerms buildAggregation(long owningBucketOrdinal) throws IO
continue;
}
if (spare == null) {
-spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format);
+spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format, 0);
}
spare.term = bucketOrds.get(i);
spare.subsetDf = docCount;
@@ -43,9 +43,10 @@ public static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
BytesRef termBytes;

public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations,
-        DocValueFormat format) {
+        DocValueFormat format, double score) {
    super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
    this.termBytes = term;
+    this.score = score;
}

@@ -69,12 +70,6 @@ public void writeTo(StreamOutput out) throws IOException {
aggregations.writeTo(out);
}

-public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize,
-        InternalAggregations aggregations, double score, DocValueFormat format) {
-    this(term, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
-    this.score = score;
-}

@Override
public Number getKeyAsNumber() {
// this method is needed for scripted numeric aggregations
@@ -139,7 +134,7 @@ public SignificantStringTerms create(List<SignificantStringTerms.Bucket> buckets
@Override
public Bucket createBucket(InternalAggregations aggregations, SignificantStringTerms.Bucket prototype) {
return new Bucket(prototype.termBytes, prototype.subsetDf, prototype.subsetSize, prototype.supersetDf, prototype.supersetSize,
-aggregations, prototype.format);
+aggregations, prototype.format, prototype.score);
}

@Override
@@ -156,6 +151,6 @@ protected Bucket[] createBucketsArray(int size) {
@Override
Bucket createBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize,
InternalAggregations aggregations, SignificantStringTerms.Bucket prototype) {
-return new Bucket(prototype.termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
+return new Bucket(prototype.termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format, prototype.score);
}
}
@@ -91,7 +91,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
}

if (spare == null) {
-spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
+spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
}

bucketOrds.get(i, spare.termBytes);
@@ -201,7 +201,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
}

if (spare == null) {
-spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
+spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
}

bucketOrds.get(i, spare.termBytes);
@@ -62,7 +62,7 @@ public void testNonFinalReduceTopLevelPipelineAggs() {
List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms),
topLevelPipelineAggs));
InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false);
-InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContext);
+InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContext);
assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size());
assertEquals(1, reducedAggs.aggregations.size());
}
@@ -78,11 +78,11 @@ public void testFinalReduceTopLevelPipelineAggs() {
if (randomBoolean()) {
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms),
Collections.singletonList(siblingPipelineAggregator));
-reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
+reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), reduceContext);
} else {
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms),
Collections.singletonList(siblingPipelineAggregator));
-reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
+reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), reduceContext);
}
assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size());
assertEquals(2, reducedAggs.aggregations.size());
@@ -238,7 +238,7 @@ public void testReduceSame() throws IOException {
for (int i = 0; i < numSame; i++) {
toReduce.add(result);
}
-InternalComposite finalReduce = (InternalComposite) result.reduce(toReduce,
+InternalComposite finalReduce = (InternalComposite) result.doReduce(toReduce,
new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, true));
assertThat(finalReduce.getBuckets().size(), equalTo(result.getBuckets().size()));
Iterator<InternalComposite.InternalBucket> expectedIt = result.getBuckets().iterator();