Skip to content

Commit

Permalink
QueuedChannel enforces its assumption around at least one in-flight r…
Browse files Browse the repository at this point in the history
…equest (#2422)

QueuedChannel enforces its assumption around at least one in-flight request
  • Loading branch information
carterkozak authored Nov 18, 2024
1 parent 52431f6 commit 7074d6e
Show file tree
Hide file tree
Showing 21 changed files with 638 additions and 343 deletions.
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-2422.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type: improvement
improvement:
description: QueuedChannel enforces its assumption around at least one in-flight
request
links:
- https://github.com/palantir/dialogue/pull/2422
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.palantir.dialogue.futures.DialogueFutures;
import com.palantir.logsafe.Safe;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
Expand Down Expand Up @@ -65,7 +66,6 @@
*/
final class QueuedChannel implements Channel {
private static final SafeLogger log = SafeLoggerFactory.get(QueuedChannel.class);
private static final LimitEnforcement DO_NOT_SKIP_LIMITS = LimitEnforcement.DEFAULT_ENABLED;

private final Deque<DeferredCall> queuedCalls;
private final NeverThrowLimitedChannel delegate;
Expand All @@ -75,7 +75,10 @@ final class QueuedChannel implements Channel {

@Safe
private final String queueType;
// Tracks requests that are current executing in delegate and are not tracked in queuedCalls

// Inexpensive tracker for queuedCalls.size(), due to the high cost of
// ConcurrentLinkedDeque.size(). Our ProtectedConcurrentLinkedDeque subtype
// makes size() throw.
private final AtomicInteger queueSizeEstimate = new AtomicInteger(0);
private final int maxQueueSize;
private final Supplier<Counter> queueSizeCounter;
Expand All @@ -85,6 +88,9 @@ final class QueuedChannel implements Channel {
// avoid creating unnecessary data.
private volatile boolean shouldRecordQueueMetrics;

// Tracks requests that are current executing in delegate and are not tracked in queuedCalls
private final AtomicInteger inFlight = new AtomicInteger();

QueuedChannel(
LimitedChannel delegate,
@Safe String channelName,
Expand Down Expand Up @@ -151,16 +157,20 @@ Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request req
// Optimistically avoid the queue in the fast path.
// Queuing adds contention between threads and should be avoided unless we need to shed load.
if (queueSizeEstimate.get() <= 0) {
LimitEnforcement limitEnforcement = limitEnforcement();
Optional<ListenableFuture<Response>> maybeResult =
delegate.maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS);
delegate.maybeExecute(endpoint, request, limitEnforcement);
if (maybeResult.isPresent()) {
inFlight.incrementAndGet();
ListenableFuture<Response> result = maybeResult.get();
DialogueFutures.addDirectListener(result, this::onCompletion);
// While the queue was avoid, this is equivalent to spending zero time on the queue.
if (shouldRecordQueueMetrics) {
queuedTime.update(0, TimeUnit.NANOSECONDS);
}
return maybeResult;
} else if (!limitEnforcement.enforceLimits()) {
return Optional.of(Futures.immediateFailedFuture(limitEnforcementExpectationFailure(endpoint)));
}
}

Expand Down Expand Up @@ -199,6 +209,9 @@ Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request req
}

private void onCompletion() {
// decrementing inflight must occur prior to calling schedule, ensuring that
// schedule may be called after inflight is returned to zero.
inFlight.decrementAndGet();
schedule();
}

Expand Down Expand Up @@ -240,22 +253,28 @@ private boolean scheduleNextTask() {
if (queueHead == null) {
return false;
}
SettableFuture<Response> queuedResponse = queueHead.response();
// If the future has been completed (most likely via cancel) the call should not be queued.
// There's a race where cancel may be invoked between this check and execution, but the scheduled
// request will be quickly cancelled in that case.
if (queuedResponse.isDone()) {
if (queueHead.response().isDone()) {
decrementQueueSize();
queueHead.span().complete(QueuedChannelTagTranslator.INSTANCE, this);
queueHead.timer().stop();
return true;
}
return scheduleTaskFromQueue(queueHead);
}

private boolean scheduleTaskFromQueue(DeferredCall queueHead) {
SettableFuture<Response> queuedResponse = queueHead.response();
try (CloseableSpan ignored = queueHead.span().attach()) {
Endpoint endpoint = queueHead.endpoint();
LimitEnforcement limitEnforcement = limitEnforcement();
Optional<ListenableFuture<Response>> maybeResponse =
delegate.maybeExecute(endpoint, queueHead.request(), DO_NOT_SKIP_LIMITS);
delegate.maybeExecute(endpoint, queueHead.request(), limitEnforcement);

if (maybeResponse.isPresent()) {
inFlight.incrementAndGet();
decrementQueueSize();
ListenableFuture<Response> response = maybeResponse.get();
queueHead.span().complete(QueuedChannelTagTranslator.INSTANCE, this);
Expand All @@ -277,6 +296,17 @@ private boolean scheduleNextTask() {
}
});
return true;
} else if (!limitEnforcement.enforceLimits()) {
decrementQueueSize();
queueHead.span().complete(QueuedChannelTagTranslator.INSTANCE, this);
queueHead.timer().stop();
queuedResponse.setException(limitEnforcementExpectationFailure(queueHead.endpoint()));
log.warn(
"Failed to make a request bypassing concurrency limits, which should not be possible",
SafeArg.of("channel", channelName),
SafeArg.of("service", endpoint.serviceName()),
SafeArg.of("endpoint", endpoint.endpointName()));
return true;
} else {
if (!queuedCalls.offerFirst(queueHead)) {
// Should never happen, ConcurrentLinkedDeque has no maximum size
Expand Down Expand Up @@ -306,6 +336,28 @@ private boolean scheduleNextTask() {
}
}

/**
* This queue implementation requires at least one request to be executable at a time regardless of the underlying
* limiter design, because triggering the next schedule attempt is done when a request completes. If no requests
* are active, but requests are queued, we risk requests getting "stuck" indefinitely, until another request
* attempt is made, which is not guaranteed.
* So, when no requests are in flight in the delegate, we explicitly bypass limits to ensure at least one
*
*/
private LimitEnforcement limitEnforcement() {
return inFlight.get() <= 0 ? LimitEnforcement.DANGEROUS_BYPASS_LIMITS : LimitEnforcement.DEFAULT_ENABLED;
}

private SafeIllegalStateException limitEnforcementExpectationFailure(Endpoint endpoint) {
return new SafeIllegalStateException(
"A request which explicitly bypassed rate limits failed to execute, which "
+ "violates the requirements of the QueuedChannel. Please report this to "
+ "the Dialogue maintainers!",
SafeArg.of("channel", channelName),
SafeArg.of("service", endpoint.serviceName()),
SafeArg.of("endpoint", endpoint.endpointName()));
}

@Override
public String toString() {
return "QueuedChannel{queueSizeEstimate="
Expand All @@ -327,6 +379,9 @@ private class ForwardAndSchedule implements FutureCallback<Response> {

@Override
public void onSuccess(Response result) {
// decrementing inflight must occur prior to calling schedule, ensuring that
// schedule may be called after inflight is returned to zero.
inFlight.decrementAndGet();
if (!response.set(result)) {
result.close();
}
Expand All @@ -335,6 +390,9 @@ public void onSuccess(Response result) {

@Override
public void onFailure(Throwable throwable) {
// decrementing inflight must occur prior to calling schedule, ensuring that
// schedule may be called after inflight is returned to zero.
inFlight.decrementAndGet();
if (!response.setException(throwable)) {
if (throwable instanceof CancellationException) {
log.debug("Call was canceled", throwable);
Expand Down
Loading

0 comments on commit 7074d6e

Please sign in to comment.