diff --git a/changelog/@unreleased/pr-1307.v2.yml b/changelog/@unreleased/pr-1307.v2.yml new file mode 100644 index 000000000..ecfe4fbfd --- /dev/null +++ b/changelog/@unreleased/pr-1307.v2.yml @@ -0,0 +1,5 @@ +type: improvement +improvement: + description: 'Use #compareAndSet when checking limits.' + links: + - https://github.com/palantir/dialogue/pull/1307 diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.java index ec616fcf5..41c4549f6 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.java @@ -16,6 +16,7 @@ package com.palantir.dialogue.core; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AtomicDouble; import com.google.common.util.concurrent.FutureCallback; import com.palantir.dialogue.Response; @@ -69,17 +70,28 @@ final class CautiousIncreaseAggressiveDecreaseConcurrencyLimiter { * ignore/dropped/success depending on the success or failure state of the response. * */ Optional acquire() { - int currentInFlight = getInflight(); - - // We don't want to hand out a permit if there are 4 inflight and a limit of 4.1, as this will immediately send - // our inflight number to 5, which is clearly above the limit. Instead, we wait until there is capacity for - // one whole request before handing out a permit. In the worst-case scenario with zero inflight and a limit of - // 1, we'll still hand out a permit - if (currentInFlight <= getLimit() - 1) { - int inFlightSnapshot = inFlight.incrementAndGet(); - return Optional.of(new Permit(inFlightSnapshot)); + + // Capture the limit field reference once to avoid work in a tight loop. The JIT cannot + // reliably optimize out references to final fields due to the potential for reflective + // modification. + AtomicInteger localInFlight = inFlight; + + // We don't want to hand out a permit if there are 4 inflight and a limit of 4.1, as this will immediately + // send our inflight number to 5, which is clearly above the limit. + // Instead, we wait until there is capacity for one whole request before handing out a permit. + // In the worst-case scenario with zero inflight and a limit of 1, we'll still hand out a permit. + int currentLimit = (int) getLimit(); + while (true) { + int currentInFlight = localInFlight.get(); + if (currentInFlight >= currentLimit) { + return Optional.empty(); + } + + int newInFlight = currentInFlight + 1; + if (inFlight.compareAndSet(currentInFlight, newInFlight)) { + return Optional.of(new Permit(newInFlight)); + } } - return Optional.empty(); } enum Behavior { @@ -144,6 +156,11 @@ final class Permit implements PermitControl, FutureCallback { this.inFlightSnapshot = inFlightSnapshot; } + @VisibleForTesting + int inFlightSnapshot() { + return inFlightSnapshot; + } + @Override public void onSuccess(Response result) { behavior.onSuccess(result, this); diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiterTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiterTest.java index bf552f9be..a236ab120 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiterTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/CautiousIncreaseAggressiveDecreaseConcurrencyLimiterTest.java @@ -20,10 +20,23 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; import com.palantir.dialogue.Response; import com.palantir.dialogue.core.CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Behavior; +import com.palantir.dialogue.core.CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Permit; import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.stream.IntStream; import org.assertj.core.data.Percentage; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -37,7 +50,7 @@ private static CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter(Beha @ParameterizedTest @EnumSource(Behavior.class) - void acquire_returnsPermitssWhileInflightPermitLimitNotReached(Behavior behavior) { + void acquire_returnsPermitsWhileInflightPermitLimitNotReached(Behavior behavior) { CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(behavior); double max = limiter.getLimit(); Optional latestPermit = null; @@ -55,6 +68,26 @@ void acquire_returnsPermitssWhileInflightPermitLimitNotReached(Behavior behavior assertThat(limiter.acquire()).isPresent(); } + @ParameterizedTest + @EnumSource(Behavior.class) + void acquire_doesNotAcquirePartialPermits(Behavior behavior) { + CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(behavior); + + double max = limiter.getLimit(); + Optional latestPermit = null; + for (int i = 0; i < max; ++i) { + latestPermit = limiter.acquire(); + assertThat(latestPermit).isPresent(); + } + + latestPermit.get().success(); + assertThat(limiter.getLimit()).isEqualTo(20.05); + + // Now we can only acquire one extra permit, not 2 + assertThat(limiter.acquire()).isPresent(); + assertThat(limiter.acquire()).isEmpty(); + } + @ParameterizedTest @EnumSource(Behavior.class) public void ignore_releasesPermit(Behavior behavior) { @@ -211,4 +244,51 @@ public void onFailure_ignoresForNonIoExceptions(Behavior behavior) { limiter.acquire().get().onFailure(exception); assertThat(limiter.getLimit()).isEqualTo(max); } + + @Test + public void acquire_doesNotReleaseMorePermitsThanLimit() throws ExecutionException, InterruptedException { + CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL); + + int max = (int) limiter.getLimit(); + Optional latestPermit; + for (int i = 0; i < max - 1; ++i) { + latestPermit = limiter.acquire(); + assertThat(latestPermit).isPresent(); + } + + latestPermit = limiter.acquire(); + assertThat(latestPermit).isPresent(); + assertThat(limiter.acquire()).isEmpty(); + latestPermit.get().ignore(); + + // Now let's have some threads fight for that last remaining permit. + int numTasks = 8; + int numIterations = 10_000; + CountDownLatch latch = new CountDownLatch(numTasks); + ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numTasks)); + List> futures = new ArrayList<>(); + try { + IntStream.range(0, numTasks).forEach(_ignore -> { + futures.add(executor.submit(() -> { + latch.countDown(); + Uninterruptibles.awaitUninterruptibly(latch); + + for (int i = 0; i < numIterations; i++) { + Optional acquire = limiter.acquire(); + if (acquire.isPresent()) { + assertThat(acquire.get().inFlightSnapshot()).isLessThanOrEqualTo(max); + acquire.get().ignore(); + } + } + })); + }); + + Futures.whenAllSucceed(futures) + .run(() -> {}, MoreExecutors.directExecutor()) + .get(); + } finally { + assertThat(MoreExecutors.shutdownAndAwaitTermination(executor, Duration.ofSeconds(5))) + .isTrue(); + } + } }