Skip to content

Commit

Permalink
Different accept methods to make MultiBucketConsumer thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
neetikasinghal committed Jul 27, 2023
1 parent 9507e6d commit 5153c45
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.common.settings.Setting;
Expand All @@ -42,6 +43,8 @@
import org.opensearch.search.aggregations.bucket.BucketsAggregator;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntConsumer;

/**
Expand Down Expand Up @@ -130,6 +133,9 @@ public static class MultiBucketConsumer implements IntConsumer {
// aggregations execute in a single thread so no atomic here
private int count;
private int callCount = 0;
private LongAdder callCount2 = new LongAdder();
private boolean circuitBreakerTripped;
private AtomicBoolean circuitBreakerTripped3 = new AtomicBoolean(false);

public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
this.limit = limit;
Expand Down Expand Up @@ -160,6 +166,105 @@ public void accept(int value) {
}
}

/*
* Has a synchronized block in the method, which makes call count as thread safe
* and essentially trips the circuit breaker for the other threads if one of the threads has
* already tripped
* */
public void accept1(int value) {
if (value != 0) {
count += value;
if (count > limit) {
throw new TooManyBucketsException(
"Trying to create too many buckets. Must be less than or equal to: ["
+ limit
+ "] but was ["
+ count
+ "]. This limit can be set by changing the ["
+ MAX_BUCKET_SETTING.getKey()
+ "] cluster level setting.",
limit
);
}
}
synchronized (this) {
callCount++;
// check parent circuit breaker every 1024 calls
if ((callCount & 0x3) == 0 || circuitBreakerTripped) {
try {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
} catch (CircuitBreakingException e) {
circuitBreakerTripped = true;
throw e;
}
}
}
}

/*
* Makes callCount as a LongAdder variable making it thread safe but this doesn't
* trip the CB on the other threads if one of the thread already has a circuit breaker
* */
public void accept2(int value) {
if (value != 0) {
count += value;
if (count > limit) {
throw new TooManyBucketsException(
"Trying to create too many buckets. Must be less than or equal to: ["
+ limit
+ "] but was ["
+ count
+ "]. This limit can be set by changing the ["
+ MAX_BUCKET_SETTING.getKey()
+ "] cluster level setting.",
limit
);
}
}
callCount2.increment();
// check parent circuit breaker every 1024 calls
if ((callCount2.sum() & 0x3) == 0) {

This comment has been minimized.

Copy link
@sohami

sohami Jul 28, 2023

Collaborator

this will not work with LongAdder as sum is eventual sum and may never hit 1024 value, it can be 1023 and then jump to 1025 before this check happens.

breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}
}
/*
* Makes callCount as a LongAdder variable making it thread safe and also has an additional
* Atomic variable that tries to trip the CB for the other threads in case for one of the threads CB has already
* tripped. This doesn't guarantee that all the threads would essentially trip CB after the first thread tripped
* as there can be thread that could come for a check in the interval between circuitBreakerTripped3 variable is set
* vs circuitBreakerTripped3 variable is tried on get
* */
public void accept3(int value) {
if (value != 0) {
count += value;
if (count > limit) {
throw new TooManyBucketsException(
"Trying to create too many buckets. Must be less than or equal to: ["
+ limit
+ "] but was ["
+ count
+ "]. This limit can be set by changing the ["
+ MAX_BUCKET_SETTING.getKey()
+ "] cluster level setting.",
limit
);
}
}
callCount2.increment();
if(circuitBreakerTripped3.get()) {
throw new CircuitBreakingException("test", CircuitBreaker.Durability.PERMANENT);
}
// check parent circuit breaker every 1024 calls
if ((callCount2.sum() & 0x3) == 0) {

This comment has been minimized.

Copy link
@sohami

sohami Jul 28, 2023

Collaborator

same issue as above for LongAdder based implementation

try {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
} catch (CircuitBreakingException e) {
circuitBreakerTripped3.set(true);
throw e;
}
}
}

public void reset() {
this.count = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.search.aggregations.bucket;

import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
Expand All @@ -41,6 +43,7 @@
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand All @@ -51,9 +54,15 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class BucketsAggregatorTests extends AggregatorTestCase {

public BucketsAggregator buildMergeAggregator() throws IOException {
Expand Down Expand Up @@ -139,4 +148,88 @@ public void testBucketMergeAndDelete() throws IOException {
assertEquals(mergeAggregator.getDocCounts().get(i), i == 5 ? sum : 0);
}
}

@Repeat(iterations = 1000, useConstantSeed = true)

This comment has been minimized.

Copy link
@sohami

sohami Jul 28, 2023

Collaborator

we should run the call to accept method in loop as well. That way you will see the contention between threads per testRun. Then probably take the mean of all the test run times.

public void testMultiConsumerAcceptOne() throws Exception {
CircuitBreaker breaker = mock(CircuitBreaker.class);
MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
breaker
);

when(breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets")).thenThrow(CircuitBreakingException.class);
int numberOfThreads = 5;
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(numberOfThreads);
long start = System.currentTimeMillis();
for (int i = 0; i < numberOfThreads; i++) {
service.submit(() -> {
try {
multiBucketConsumer.accept1(0);
} catch (CircuitBreakingException e) {
System.out.println("CircuitBreaker was thrown");
}
latch.countDown();
});
}
long end = System.currentTimeMillis();

This comment has been minimized.

Copy link
@sohami

sohami Jul 28, 2023

Collaborator

this will get printed even before the threads have completed the execution since test thread will move on after submitting threads in the executor service.

latch.await();
System.out.println("DEBUG: accept1 took " + (end - start) + " MilliSeconds");
}

@Repeat(iterations = 1000, useConstantSeed = true)
public void testMultiConsumerAcceptFunctionTwo() throws InterruptedException {
CircuitBreaker breaker = mock(CircuitBreaker.class);
MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
breaker
);

when(breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets")).thenThrow(CircuitBreakingException.class);
int numberOfThreads = 5;
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(numberOfThreads);
long start = System.currentTimeMillis();
for (int i = 0; i < numberOfThreads; i++) {
service.submit(() -> {
try {
multiBucketConsumer.accept2(0);
} catch (CircuitBreakingException e) {
System.out.println("CircuitBreaker was thrown");
}
latch.countDown();
});
}
long end = System.currentTimeMillis();
latch.await();
System.out.println("DEBUG: accept2 took " + (end - start) + " MilliSeconds");
}

@Repeat(iterations = 1000, useConstantSeed = true)
public void testMultiConsumerAcceptFunctionThree() throws InterruptedException {
CircuitBreaker breaker = mock(CircuitBreaker.class);
MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
breaker
);

when(breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets")).thenThrow(CircuitBreakingException.class);
int numberOfThreads = 5;
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(numberOfThreads);
long start = System.currentTimeMillis();
for (int i = 0; i < numberOfThreads; i++) {
service.submit(() -> {
try {
multiBucketConsumer.accept3(0);
} catch (CircuitBreakingException e) {
System.out.println("CircuitBreaker was thrown");
}
latch.countDown();
});
}
long end = System.currentTimeMillis();
latch.await();
System.out.println("DEBUG: accept3 took " + (end - start) + " MilliSeconds");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ protected Settings featureFlagSettings() {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
featureSettings.put(FeatureFlags.TELEMETRY_SETTING.getKey(), true);
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true);
return featureSettings.build();
}

Expand Down

0 comments on commit 5153c45

Please sign in to comment.