Skip to content

Commit

Permalink
Make MultiBucketConsumerService thread safe to use across slices duri…
Browse files Browse the repository at this point in the history
…ng search (opensearch-project#9047)

Signed-off-by: Neetika Singhal <neetiks@amazon.com>
Signed-off-by: Ivan Brusic <ivan.brusic@flocksafety.com>
  • Loading branch information
neetikasinghal authored and brusic committed Sep 25, 2023
1 parent df6d9a5 commit e352fdd
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 7 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807))
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))
- [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005))

- Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047))
### Deprecated

### Removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.common.settings.Setting;
Expand All @@ -42,6 +43,7 @@
import org.opensearch.search.aggregations.bucket.BucketsAggregator;

import java.io.IOException;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntConsumer;

/**
Expand Down Expand Up @@ -127,13 +129,36 @@ public static class MultiBucketConsumer implements IntConsumer {
private final int limit;
private final CircuitBreaker breaker;

// aggregations execute in a single thread so no atomic here
// aggregations execute in a single thread for both sequential
// and concurrent search, so no atomic here
private int count;
private int callCount = 0;

// will be updated by multiple threads in concurrent search
// hence making it as LongAdder
private final LongAdder callCount;
private volatile boolean circuitBreakerTripped;
private final int availProcessors;

public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
this.limit = limit;
this.breaker = breaker;
callCount = new LongAdder();
availProcessors = Runtime.getRuntime().availableProcessors();
}

// only visible for testing
protected MultiBucketConsumer(
int limit,
CircuitBreaker breaker,
LongAdder callCount,
boolean circuitBreakerTripped,
int availProcessors
) {
this.limit = limit;
this.breaker = breaker;
this.callCount = callCount;
this.circuitBreakerTripped = circuitBreakerTripped;
this.availProcessors = availProcessors;
}

@Override
Expand All @@ -153,10 +178,27 @@ public void accept(int value) {
);
}
}
// check parent circuit breaker every 1024 calls
callCount++;
if ((callCount & 0x3FF) == 0) {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
callCount.increment();
// tripping the circuit breaker for other threads in case of concurrent search
// if the circuit breaker has tripped for one of the threads already, more info
// can be found on: https://github.com/opensearch-project/OpenSearch/issues/7785
if (circuitBreakerTripped) {
throw new CircuitBreakingException(
"Circuit breaker for this consumer has already been tripped by previous invocations. "
+ "This can happen in case of concurrent segment search when multiple threads are "
+ "executing the request and one of the thread has already tripped the circuit breaker",
breaker.getDurability()
);
}
// check parent circuit breaker every 1024 to (1024 + available processors) calls
long sum = callCount.sum();
if ((sum >= 1024) && (sum & 0x3FF) <= availProcessors) {
try {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
} catch (CircuitBreakingException e) {
circuitBreakerTripped = true;
throw e;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.mockito.Mockito;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.atomic.LongAdder;

import static org.opensearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;

public class MultiBucketConsumerTests extends OpenSearchTestCase {

public void testMultiConsumerAcceptWhenCBTripped() {
CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class);
MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
breaker,
new LongAdder(),
true,
1
);
// exception is thrown upfront since the circuit breaker has already tripped
expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(0));
Mockito.verify(breaker, Mockito.times(0)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}

public void testMultiConsumerAcceptToTripCB() {
CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class);
LongAdder callCount = new LongAdder();
for (int i = 0; i < 1024; i++) {
callCount.increment();
}
MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
breaker,
callCount,
false,
2
);
// circuit breaker check is performed as the value of call count would be 1025 which is still in range
Mockito.when(breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets")).thenThrow(CircuitBreakingException.class);
expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(0));
Mockito.verify(breaker, Mockito.times(1)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}

public void testMultiConsumerAccept() {
CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class);
LongAdder callCount = new LongAdder();
for (int i = 0; i < 1100; i++) {
callCount.increment();
}
MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
breaker,
callCount,
false,
1
);
// no exception is thrown as the call count value is not in the expected range and CB is not checked
multiBucketConsumer.accept(0);
Mockito.verify(breaker, Mockito.times(0)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}
}

0 comments on commit e352fdd

Please sign in to comment.