Skip to content

Commit

Permalink
Enforce requirements around limit enforcement
Browse files Browse the repository at this point in the history
  • Loading branch information
carterkozak committed Nov 15, 2024
1 parent 5f0e766 commit b6e5c14
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.palantir.dialogue.futures.DialogueFutures;
import com.palantir.logsafe.Safe;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
Expand Down Expand Up @@ -156,8 +157,9 @@ Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request req
// Optimistically avoid the queue in the fast path.
// Queuing adds contention between threads and should be avoided unless we need to shed load.
if (queueSizeEstimate.get() <= 0) {
LimitEnforcement limitEnforcement = limitEnforcement();
Optional<ListenableFuture<Response>> maybeResult =
delegate.maybeExecute(endpoint, request, limitEnforcement());
delegate.maybeExecute(endpoint, request, limitEnforcement);
if (maybeResult.isPresent()) {
inFlight.incrementAndGet();
ListenableFuture<Response> result = maybeResult.get();
Expand All @@ -167,6 +169,8 @@ Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request req
queuedTime.update(0, TimeUnit.NANOSECONDS);
}
return maybeResult;
} else if (!limitEnforcement.enforceLimits()) {
return Optional.of(Futures.immediateFailedFuture(limitEnforcementExpectationFailure(endpoint)));
}
}

Expand Down Expand Up @@ -261,8 +265,9 @@ private boolean scheduleNextTask() {
}
try (CloseableSpan ignored = queueHead.span().attach()) {
Endpoint endpoint = queueHead.endpoint();
LimitEnforcement limitEnforcement = limitEnforcement();
Optional<ListenableFuture<Response>> maybeResponse =
delegate.maybeExecute(endpoint, queueHead.request(), limitEnforcement());
delegate.maybeExecute(endpoint, queueHead.request(), limitEnforcement);

if (maybeResponse.isPresent()) {
inFlight.incrementAndGet();
Expand All @@ -287,6 +292,17 @@ private boolean scheduleNextTask() {
}
});
return true;
} else if (!limitEnforcement.enforceLimits()) {
decrementQueueSize();
queueHead.span().complete(QueuedChannelTagTranslator.INSTANCE, this);
queueHead.timer().stop();
queuedResponse.setException(limitEnforcementExpectationFailure(queueHead.endpoint()));
log.warn(
"Failed to make a request bypassing concurrency limits, which should not be possible",
SafeArg.of("channel", channelName),
SafeArg.of("service", endpoint.serviceName()),
SafeArg.of("endpoint", endpoint.endpointName()));
return true;
} else {
if (!queuedCalls.offerFirst(queueHead)) {
// Should never happen, ConcurrentLinkedDeque has no maximum size
Expand Down Expand Up @@ -328,6 +344,16 @@ private LimitEnforcement limitEnforcement() {
return inFlight.get() <= 0 ? LimitEnforcement.DANGEROUS_BYPASS_LIMITS : LimitEnforcement.DEFAULT_ENABLED;
}

private SafeIllegalStateException limitEnforcementExpectationFailure(Endpoint endpoint) {
return new SafeIllegalStateException(
"A request which explicitly bypassed rate limits failed to execute, which "
+ "violates the requirements of the QueuedChannel. Please report this to "
+ "the Dialogue maintainers!",
SafeArg.of("channel", channelName),
SafeArg.of("service", endpoint.serviceName()),
SafeArg.of("endpoint", endpoint.endpointName()));
}

@Override
public String toString() {
return "QueuedChannel{queueSizeEstimate="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.palantir.dialogue.TestResponse;
import com.palantir.dialogue.core.QueuedChannel.QueuedChannelInstrumentation;
import com.palantir.dialogue.futures.DialogueFutures;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.tracing.TestTracing;
import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry;
import java.time.Duration;
Expand Down Expand Up @@ -482,4 +483,69 @@ public void testQueuedResponseAvoidsExecutingCancelled() {
.as("The queued future is cancelled, and shouldn't be re-submitted")
.hasSize(3);
}

@Test
public void testInitialRequestIsIllegallyLimited_initialRequest() {
// This LimitedChannel ignores the LimitEnforcement parameter, which is not allowed
LimitedChannel delegateChannel = (_endpoint, _request, _limitEnforcement) -> Optional.empty();
QueuedChannel queued = createQueue(delegateChannel);
ListenableFuture<Response> response =
queued.maybeExecute(TestEndpoint.GET, Request.builder().build()).get();
assertThat(response)
.failsWithin(Duration.ZERO)
.withThrowableThat()
.havingRootCause()
.isInstanceOf(SafeIllegalStateException.class)
.withMessageContaining("A request which explicitly bypassed rate limits failed to execute");
}

@Test
public void testInitialRequestIsIllegallyLimited_queuedRequest() {
List<Optional<SettableFuture<Response>>> settableResponses = new CopyOnWriteArrayList<>();
AtomicBoolean ignoreLimitEnforcement = new AtomicBoolean(false);
LimitedChannel delegateChannel = (_endpoint, _request, limitEnforcement) -> {
Optional<SettableFuture<Response>> result = Optional.empty();
if (!ignoreLimitEnforcement.get() && !limitEnforcement.enforceLimits()) {
result = Optional.of(SettableFuture.create());
}
settableResponses.add(result);
return result.map(item -> item);
};
QueuedChannelInstrumentation instrumentation = QueuedChannel.channelInstrumentation(
DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()), "channel");
QueuedChannel queued = new QueuedChannel(delegateChannel, "channel", "queue-type", instrumentation, 100_000);

assertThat(settableResponses).isEmpty();
assertThat(instrumentation.requestsQueued().getCount()).isZero();

// Initial request is expected to be allowed in all cases, to allow the queue to be processed
assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build()))
.hasValueSatisfying(item -> assertThat(item).isNotDone());
assertThat(settableResponses).hasSize(1);
assertThat(settableResponses.get(0))
.hasValueSatisfying(item -> assertThat(item).isNotDone());

assertThat(instrumentation.requestsQueued().getCount()).isZero();

// Now that we have a request in flight, we can queue one:
Optional<ListenableFuture<Response>> queuedResponse =
queued.maybeExecute(TestEndpoint.GET, Request.builder().build());
assertThat(queuedResponse).hasValueSatisfying(item -> assertThat(item).isNotDone());
assertThat(instrumentation.requestsQueued().getCount()).isOne();

assertThat(settableResponses).hasSize(3);

ignoreLimitEnforcement.set(true);
// Complete the ongoing request, allowing the queued request to be processed
settableResponses.get(0).get().set(new TestResponse().code(200));

assertThat(queuedResponse).hasValueSatisfying(item -> assertThat(item)
.failsWithin(Duration.ZERO)
.withThrowableThat()
.havingRootCause()
.isInstanceOf(SafeIllegalStateException.class)
.withMessageContaining("A request which explicitly bypassed rate limits failed to execute"));

assertThat(instrumentation.requestsQueued().getCount()).isZero();
}
}

0 comments on commit b6e5c14

Please sign in to comment.