Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrency limiter uses #compareAndSet. #1307

Merged
merged 7 commits into from
Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-1307.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: 'Use #compareAndSet when checking limits.'
links:
- https://github.com/palantir/dialogue/pull/1307
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.palantir.dialogue.core;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.FutureCallback;
import com.palantir.dialogue.Response;
Expand Down Expand Up @@ -69,17 +70,28 @@ final class CautiousIncreaseAggressiveDecreaseConcurrencyLimiter {
* ignore/dropped/success depending on the success or failure state of the response.
* */
Optional<Permit> acquire() {
int currentInFlight = getInflight();

// We don't want to hand out a permit if there are 4 inflight and a limit of 4.1, as this will immediately send
// our inflight number to 5, which is clearly above the limit. Instead, we wait until there is capacity for
// one whole request before handing out a permit. In the worst-case scenario with zero inflight and a limit of
// 1, we'll still hand out a permit
if (currentInFlight <= getLimit() - 1) {
int inFlightSnapshot = inFlight.incrementAndGet();
return Optional.of(new Permit(inFlightSnapshot));

// Capture the limit field reference once to avoid work in a tight loop. The JIT cannot
// reliably optimize out references to final fields due to the potential for reflective
// modification.
AtomicInteger localInFlight = inFlight;

// We don't want to hand out a permit if there are 4 inflight and a limit of 4.1, as this will immediately
// send our inflight number to 5, which is clearly above the limit.
// Instead, we wait until there is capacity for one whole request before handing out a permit.
// In the worst-case scenario with zero inflight and a limit of 1, we'll still hand out a permit.
int currentLimit = (int) getLimit();
while (true) {
int currentInFlight = localInFlight.get();
if (currentInFlight >= currentLimit) {
return Optional.empty();
}

int newInFlight = currentInFlight + 1;
if (inFlight.compareAndSet(currentInFlight, newInFlight)) {
return Optional.of(new Permit(newInFlight));
}
}
return Optional.empty();
}

enum Behavior {
Expand Down Expand Up @@ -144,6 +156,11 @@ final class Permit implements PermitControl, FutureCallback<Response> {
this.inFlightSnapshot = inFlightSnapshot;
}

@VisibleForTesting
int inFlightSnapshot() {
return inFlightSnapshot;
}

@Override
public void onSuccess(Response result) {
behavior.onSuccess(result, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,23 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.palantir.dialogue.Response;
import com.palantir.dialogue.core.CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Behavior;
import com.palantir.dialogue.core.CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Permit;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import org.assertj.core.data.Percentage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -37,7 +50,7 @@ private static CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter(Beha

@ParameterizedTest
@EnumSource(Behavior.class)
void acquire_returnsPermitssWhileInflightPermitLimitNotReached(Behavior behavior) {
void acquire_returnsPermitsWhileInflightPermitLimitNotReached(Behavior behavior) {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(behavior);
double max = limiter.getLimit();
Optional<CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Permit> latestPermit = null;
Expand All @@ -55,6 +68,26 @@ void acquire_returnsPermitssWhileInflightPermitLimitNotReached(Behavior behavior
assertThat(limiter.acquire()).isPresent();
}

@ParameterizedTest
@EnumSource(Behavior.class)
void acquire_doesNotAcquirePartialPermits(Behavior behavior) {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(behavior);

double max = limiter.getLimit();
Optional<CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Permit> latestPermit = null;
for (int i = 0; i < max; ++i) {
latestPermit = limiter.acquire();
assertThat(latestPermit).isPresent();
}

latestPermit.get().success();
assertThat(limiter.getLimit()).isEqualTo(20.05);

// Now we can only acquire one extra permit, not 2
assertThat(limiter.acquire()).isPresent();
assertThat(limiter.acquire()).isEmpty();
}

@ParameterizedTest
@EnumSource(Behavior.class)
public void ignore_releasesPermit(Behavior behavior) {
Expand Down Expand Up @@ -211,4 +244,51 @@ public void onFailure_ignoresForNonIoExceptions(Behavior behavior) {
limiter.acquire().get().onFailure(exception);
assertThat(limiter.getLimit()).isEqualTo(max);
}

@Test
public void acquire_doesNotReleaseMorePermitsThanLimit() throws ExecutionException, InterruptedException {
jkozlowski marked this conversation as resolved.
Show resolved Hide resolved
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);

int max = (int) limiter.getLimit();
Optional<CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Permit> latestPermit;
for (int i = 0; i < max - 1; ++i) {
latestPermit = limiter.acquire();
assertThat(latestPermit).isPresent();
}

latestPermit = limiter.acquire();
assertThat(latestPermit).isPresent();
assertThat(limiter.acquire()).isEmpty();
latestPermit.get().ignore();

// Now let's have some threads fight for that last remaining permit.
int numTasks = 8;
int numIterations = 10_000;
CountDownLatch latch = new CountDownLatch(numTasks);
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numTasks));
List<ListenableFuture<?>> futures = new ArrayList<>();
try {
IntStream.range(0, numTasks).forEach(_ignore -> {
futures.add(executor.submit(() -> {
latch.countDown();
Uninterruptibles.awaitUninterruptibly(latch);

for (int i = 0; i < numIterations; i++) {
Optional<Permit> acquire = limiter.acquire();
if (acquire.isPresent()) {
assertThat(acquire.get().inFlightSnapshot()).isLessThanOrEqualTo(max);
acquire.get().ignore();
}
}
}));
});

Futures.whenAllSucceed(futures)
.run(() -> {}, MoreExecutors.directExecutor())
.get();
} finally {
assertThat(MoreExecutors.shutdownAndAwaitTermination(executor, Duration.ofSeconds(5)))
.isTrue();
}
}
}