Skip to content

Commit

Permalink
Speed up reducing auto_date_histo with a time zone (#57933)
Browse files Browse the repository at this point in the history
When reducing `auto_date_histogram` we were using `Rounding#round`,
which is quite a bit more expensive than
```
Rounding.Prepared prepared = rounding.prepare(min, max);
long result = prepared.round(date);
```
when rounding to a time zone without a fixed offset, like `America/New_York`. This
change stops using the former and starts using the latter.

Relates to #56124
  • Loading branch information
nik9000 authored Jun 10, 2020
1 parent a791d67 commit 992ab28
Showing 1 changed file with 84 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,17 @@ private BucketReduceResult reduceBuckets(List<InternalAggregation> aggregations,
// First we need to find the highest level rounding used across all the
// shards
int reduceRoundingIdx = 0;
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
for (InternalAggregation aggregation : aggregations) {
int aggRoundingIdx = ((InternalAutoDateHistogram) aggregation).bucketInfo.roundingIdx;
if (aggRoundingIdx > reduceRoundingIdx) {
reduceRoundingIdx = aggRoundingIdx;
InternalAutoDateHistogram agg = ((InternalAutoDateHistogram) aggregation);
reduceRoundingIdx = Math.max(agg.bucketInfo.roundingIdx, reduceRoundingIdx);
if (false == agg.buckets.isEmpty()) {
min = Math.min(min, agg.buckets.get(0).key);
max = Math.max(max, agg.buckets.get(agg.buckets.size() - 1).key);
}
}
// This rounding will be used to reduce all the buckets
RoundingInfo reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx];
Rounding reduceRounding = reduceRoundingInfo.rounding;
Rounding.Prepared reduceRounding = prepare(reduceRoundingIdx, min, max);

final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
@Override
Expand Down Expand Up @@ -351,21 +353,33 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
}
}

return mergeBucketsIfNeeded(reducedBuckets, reduceRoundingIdx, reduceRoundingInfo, reduceContext);
return mergeBucketsIfNeeded(
new BucketReduceResult(reducedBuckets, reduceRoundingIdx, 1, reduceRounding, min, max),
reduceContext
);
}

/**
 * Re-merges the buckets with successive entries of {@code bucketInfo.roundingInfos}
 * for as long as there are more than {@code targetBuckets * getMaximumInnerInterval()}
 * buckets and a later rounding entry is still available.
 * <p>
 * This span shows two forms of the method: one that takes the bucket list, rounding
 * index and {@code RoundingInfo} as separate arguments, and one that takes a
 * {@code BucketReduceResult} and works with {@code Rounding.Prepared} instances.
 */
private BucketReduceResult mergeBucketsIfNeeded(List<Bucket> reducedBuckets, int reduceRoundingIdx, RoundingInfo reduceRoundingInfo,
ReduceContext reduceContext) {
while (reducedBuckets.size() > (targetBuckets * reduceRoundingInfo.getMaximumInnerInterval())
&& reduceRoundingIdx < bucketInfo.roundingInfos.length - 1) {
reduceRoundingIdx++;
reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx];
reducedBuckets = mergeBuckets(reducedBuckets, reduceRoundingInfo.rounding, reduceContext);
}
return new BucketReduceResult(reducedBuckets, reduceRoundingInfo, reduceRoundingIdx, 1);
// BucketReduceResult-based form: starts from the rounding index, buckets and prepared
// rounding carried by firstPassResult.
private BucketReduceResult mergeBucketsIfNeeded(BucketReduceResult firstPassResult, ReduceContext reduceContext) {
int idx = firstPassResult.roundingIdx;
RoundingInfo info = bucketInfo.roundingInfos[idx];
List<Bucket> buckets = firstPassResult.buckets;
Rounding.Prepared prepared = firstPassResult.preparedRounding;
// Keep stepping to the next rounding entry while there are too many buckets and
// the last entry of roundingInfos has not been reached.
while (buckets.size() > (targetBuckets * info.getMaximumInnerInterval())
&& idx < bucketInfo.roundingInfos.length - 1) {
idx++;
info = bucketInfo.roundingInfos[idx];
// Prepare the new rounding for the same [min, max] range the first pass used.
prepared = prepare(idx, firstPassResult.min, firstPassResult.max);
buckets = mergeBuckets(buckets, prepared, reduceContext);
}
// The result always carries an inner interval of 1; min and max are passed through unchanged.
return new BucketReduceResult(buckets, idx, 1, prepared, firstPassResult.min, firstPassResult.max);
}

/**
 * Builds the {@code Rounding.Prepared} for the rounding stored at position
 * {@code idx} of {@code bucketInfo.roundingInfos}.
 *
 * @param idx index into {@code bucketInfo.roundingInfos}
 * @param min lower bound of the key range to prepare for
 * @param max upper bound of the key range to prepare for
 * @return the rounding prepared for {@code [min, max]}, or prepared for an unknown
 *         range when {@code min > max}, i.e. when no usable range was supplied
 */
private Rounding.Prepared prepare(int idx, long min, long max) {
    final Rounding selected = bucketInfo.roundingInfos[idx].rounding;
    if (min > max) {
        // An inverted range carries no information, so fall back to the range-agnostic form.
        return selected.prepareForUnknown();
    }
    return selected.prepare(min, max);
}

private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRounding, ReduceContext reduceContext) {
private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding.Prepared reduceRounding, ReduceContext reduceContext) {
List<Bucket> mergedBuckets = new ArrayList<>();

List<Bucket> sameKeyedBuckets = new ArrayList<>();
Expand Down Expand Up @@ -405,35 +419,51 @@ protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
}

/**
 * Value holder handed from one reduce step to the next: the bucket list together
 * with the rounding state it was produced with.
 * <p>
 * This span shows two layouts of the class: one that stores a {@code RoundingInfo},
 * and one with final fields that stores a {@code Rounding.Prepared} plus the
 * {@code min}/{@code max} key range.
 */
private static class BucketReduceResult {
List<Bucket> buckets;
RoundingInfo roundingInfo;
int roundingIdx;
long innerInterval;

BucketReduceResult(List<Bucket> buckets, RoundingInfo roundingInfo, int roundingIdx, long innerInterval) {
// Buckets produced by the step that created this result.
final List<Bucket> buckets;
// Index into bucketInfo.roundingInfos of the rounding in use.
final int roundingIdx;
// 1 unless created by mergeConsecutiveBuckets, which stores its merge interval here.
final long innerInterval;
// Prepared rounding that callers pass alongside roundingIdx.
final Rounding.Prepared preparedRounding;
// Key range that callers hand to prepare(idx, min, max) when they need another rounding.
final long min;
final long max;

BucketReduceResult(
List<Bucket> buckets,
int roundingIdx,
long innerInterval,
Rounding.Prepared preparedRounding,
long min,
long max
) {
this.buckets = buckets;
this.roundingInfo = roundingInfo;
this.roundingIdx = roundingIdx;
this.innerInterval = innerInterval;
this.preparedRounding = preparedRounding;
this.min = min;
this.max = max;
}
}

private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, ReduceContext reduceContext) {
List<Bucket> list = currentResult.buckets;
private BucketReduceResult addEmptyBuckets(BucketReduceResult current, ReduceContext reduceContext) {
List<Bucket> list = current.buckets;
if (list.isEmpty()) {
return currentResult;
}
int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, currentResult.roundingIdx,
bucketInfo.roundingInfos, targetBuckets);
RoundingInfo roundingInfo = bucketInfo.roundingInfos[roundingIdx];
Rounding rounding = roundingInfo.rounding;
return current;
}
int roundingIdx = getAppropriateRounding(
list.get(0).key,
list.get(list.size() - 1).key,
current.roundingIdx,
bucketInfo.roundingInfos,
targetBuckets
);
Rounding.Prepared rounding = current.roundingIdx == roundingIdx
? current.preparedRounding
: prepare(roundingIdx, current.min, current.max);
// merge buckets using the new rounding
list = mergeBuckets(list, rounding, reduceContext);

Bucket lastBucket = null;
ListIterator<Bucket> iter = list.listIterator();
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(Collections.singletonList(bucketInfo.emptySubAggregations),
reduceContext);
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(bucketInfo.emptySubAggregations), reduceContext);

// Add the empty buckets within the data,
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6
Expand All @@ -449,7 +479,7 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red
}
lastBucket = iter.next();
}
return new BucketReduceResult(list, roundingInfo, roundingIdx, currentResult.innerInterval);
return new BucketReduceResult(list, roundingIdx, 1, rounding, current.min, current.max);
}

static int getAppropriateRounding(long minKey, long maxKey, int roundingIdx,
Expand Down Expand Up @@ -501,8 +531,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext);

// Adding empty buckets may have tipped us over the target so merge the buckets again if needed
reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx,
reducedBucketsResult.roundingInfo, reduceContext);
reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult, reduceContext);

// Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
Expand All @@ -515,44 +544,48 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
getMetadata(), reducedBucketsResult.innerInterval);
}

/**
 * When there are more than {@code targetBuckets} buckets, looks for the first entry
 * of the current rounding's {@code innerIntervals} that would bring the bucket count
 * down to at most {@code targetBuckets}, and merges consecutive buckets by that
 * interval. Returns the input result unchanged when there are not too many buckets
 * or when no inner interval is large enough.
 */
private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult reducedBucketsResult,
ReduceContext reduceContext) {
List<Bucket> buckets = reducedBucketsResult.buckets;
RoundingInfo roundingInfo = reducedBucketsResult.roundingInfo;
int roundingIdx = reducedBucketsResult.roundingIdx;
private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult current, ReduceContext reduceContext) {
List<Bucket> buckets = current.buckets;
// The RoundingInfo is looked up from the index carried by the result.
RoundingInfo roundingInfo = bucketInfo.roundingInfos[current.roundingIdx];
if (buckets.size() > targetBuckets) {
for (int interval : roundingInfo.innerIntervals) {
// Ceiling division: number of buckets left after merging `interval` at a time.
int resultingBuckets = buckets.size() / interval;
if (buckets.size() % interval != 0) {
resultingBuckets++;
}
// The first interval that fits wins; later entries are not examined.
if (resultingBuckets <= targetBuckets) {
return mergeConsecutiveBuckets(buckets, interval, roundingIdx, roundingInfo, reduceContext);
return mergeConsecutiveBuckets(current, interval, reduceContext);
}
}
}
return reducedBucketsResult;
return current;
}

/**
 * Merges every run of {@code mergeInterval} consecutive buckets into one bucket.
 * Each bucket in a run is re-keyed to the rounded key of the run's first bucket and
 * the run is then combined with {@code reduceBucket}. The returned result stores
 * {@code mergeInterval} as its inner interval.
 * <p>
 * Reads element 0 of the bucket list unconditionally, so the list must be non-empty.
 */
private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets, int mergeInterval, int roundingIdx,
RoundingInfo roundingInfo, ReduceContext reduceContext) {
private BucketReduceResult mergeConsecutiveBuckets(BucketReduceResult current, int mergeInterval, ReduceContext reduceContext) {
List<Bucket> mergedBuckets = new ArrayList<>();
List<Bucket> sameKeyedBuckets = new ArrayList<>();

// NOTE(review): the rounded key is held as a double and converted back with Math.round below;
// long values beyond 2^53 would not survive that round trip — confirm keys stay in range.
double key = roundingInfo.rounding.round(reducedBuckets.get(0).key);
for (int i = 0; i < reducedBuckets.size(); i++) {
Bucket bucket = reducedBuckets.get(i);
double key = current.preparedRounding.round(current.buckets.get(0).key);
for (int i = 0; i < current.buckets.size(); i++) {
Bucket bucket = current.buckets.get(i);
// Every mergeInterval-th bucket starts a new run: flush the pending run first,
// then take this bucket's rounded key as the key for the new run.
if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
sameKeyedBuckets.clear();
key = roundingInfo.rounding.round(bucket.key);
key = current.preparedRounding.round(bucket.key);
}
sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
}
// Flush the final (possibly shorter) run.
if (sameKeyedBuckets.isEmpty() == false) {
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
}
return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
return new BucketReduceResult(
mergedBuckets,
current.roundingIdx,
mergeInterval,
current.preparedRounding,
current.min,
current.max
);
}

@Override
Expand Down

0 comments on commit 992ab28

Please sign in to comment.