Commit
Use circuit breaker in InternalHistogram when adding empty buckets (opensearch-project#14754) (opensearch-project#14844)

Signed-off-by: kkewwei <kkewwei@163.com>
bowenlan-amzn authored and kkewwei committed Jul 24, 2024
1 parent 800fbef commit 88785f1
Showing 3 changed files with 49 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -73,6 +73,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Update help output for _cat ([#14722](https://github.com/opensearch-project/OpenSearch/pull/14722))
- Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches the index template ([#12891](https://github.com/opensearch-project/OpenSearch/pull/12891))
- Fix NPE in ReplicaShardAllocator ([#14385](https://github.com/opensearch-project/OpenSearch/pull/14385))
- Use circuit breaker in InternalHistogram when adding empty buckets ([#14754](https://github.com/opensearch-project/OpenSearch/pull/14754))

### Security

6 changes: 5 additions & 1 deletion server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogram.java
@@ -395,25 +395,28 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
// fill with empty buckets
for (double key = round(emptyBucketInfo.minBound); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
reduceContext.consumeBucketsAndMaybeBreak(0);
}
} else {
Bucket first = list.get(iter.nextIndex());
if (Double.isFinite(emptyBucketInfo.minBound)) {
// fill with empty buckets until the first key
for (double key = round(emptyBucketInfo.minBound); key < first.key; key = nextKey(key)) {
iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
reduceContext.consumeBucketsAndMaybeBreak(0);
}
}

// now adding the empty buckets within the actual data,
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6
// e.g. if the data series is [1,2,3,7] there are 3 empty buckets that will be created for 4,5,6
Bucket lastBucket = null;
do {
Bucket nextBucket = list.get(iter.nextIndex());
if (lastBucket != null) {
double key = nextKey(lastBucket.key);
while (key < nextBucket.key) {
iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
reduceContext.consumeBucketsAndMaybeBreak(0);
key = nextKey(key);
}
assert key == nextBucket.key || Double.isNaN(nextBucket.key) : "key: " + key + ", nextBucket.key: " + nextBucket.key;
@@ -424,6 +427,7 @@
// finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
for (double key = nextKey(lastBucket.key); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
reduceContext.consumeBucketsAndMaybeBreak(0);
}
}
}
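
Note: every loop in this hunk now reports each synthetic empty bucket to the reduce context via consumeBucketsAndMaybeBreak(0). Below is a minimal sketch of why a call that adds 0 buckets can still trip the circuit breaker, assuming the bucket consumer only consults the parent breaker about once per 1,024 reported buckets; the class and field names are illustrative, not the actual MultiBucketConsumerService.MultiBucketConsumer implementation.

import org.opensearch.core.common.breaker.CircuitBreaker;

// Sketch only: a per-bucket accounting hook that periodically re-checks the parent breaker.
final class EmptyBucketAccountingSketch {
    private final CircuitBreaker breaker;
    private int callCount = 0;

    EmptyBucketAccountingSketch(CircuitBreaker breaker) {
        this.breaker = breaker;
    }

    // Called once per bucket created during reduce, i.e. what consumeBucketsAndMaybeBreak(0) forwards to.
    void consumeBucketsAndMaybeBreak(int addedBuckets) {
        callCount++;
        // An estimate of 0 bytes still makes the parent breaker re-evaluate current memory usage,
        // so a long run of empty buckets can trip it and abort the reduction.
        if (callCount % 1024 == 0) {
            breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
        }
    }
}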
43 changes: 43 additions & 0 deletions server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/InternalHistogramTests.java
@@ -33,10 +33,15 @@
package org.opensearch.search.aggregations.bucket.histogram;

import org.apache.lucene.tests.util.TestUtil;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.BucketOrder;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.ParsedMultiBucketAggregation;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.test.InternalAggregationTestCase;
import org.opensearch.test.InternalMultiBucketAggregationTestCase;

@@ -47,6 +52,8 @@
import java.util.Map;
import java.util.TreeMap;

import org.mockito.Mockito;

public class InternalHistogramTests extends InternalMultiBucketAggregationTestCase<InternalHistogram> {

private boolean keyed;
@@ -123,6 +130,42 @@ public void testHandlesNaN() {
);
}

public void testCircuitBreakerWhenAddEmptyBuckets() {
String name = randomAlphaOfLength(5);
double interval = 1;
double lowerBound = 1;
double upperBound = 1026;
List<InternalHistogram.Bucket> bucket1 = List.of(
new InternalHistogram.Bucket(lowerBound, 1, false, format, InternalAggregations.EMPTY)
);
List<InternalHistogram.Bucket> bucket2 = List.of(
new InternalHistogram.Bucket(upperBound, 1, false, format, InternalAggregations.EMPTY)
);
BucketOrder order = BucketOrder.key(true);
InternalHistogram.EmptyBucketInfo emptyBucketInfo = new InternalHistogram.EmptyBucketInfo(
interval,
0,
lowerBound,
upperBound,
InternalAggregations.EMPTY
);
InternalHistogram histogram1 = new InternalHistogram(name, bucket1, order, 0, emptyBucketInfo, format, false, null);
InternalHistogram histogram2 = new InternalHistogram(name, bucket2, order, 0, emptyBucketInfo, format, false, null);

CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class);
Mockito.when(breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets")).thenThrow(CircuitBreakingException.class);

MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(0, breaker);
InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction(
null,
null,
bucketConsumer,
PipelineAggregator.PipelineTree.EMPTY
);
expectThrows(CircuitBreakingException.class, () -> histogram1.reduce(List.of(histogram1, histogram2), reduceContext));
Mockito.verify(breaker, Mockito.times(1)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}
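
Note: the bounds in this test appear chosen so that reducing the two single-bucket histograms must synthesize 1,024 empty buckets between keys 1 and 1026. Assuming the bucket consumer consults the breaker once per 1,024 reported buckets, that yields exactly one addEstimateBytesAndMaybeBreak(0, "allocated_buckets") call; the mocked breaker turns that call into a CircuitBreakingException, and expectThrows verifies the exception escapes reduce instead of being swallowed.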

@Override
protected void assertReduced(InternalHistogram reduced, List<InternalHistogram> inputs) {
TreeMap<Double, Long> expectedCounts = new TreeMap<>();
