Skip to content

Commit

Permalink
reduce crufy from the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
carterkozak committed Feb 25, 2020
1 parent 554928f commit b697561
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,17 @@

import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,45 +46,21 @@ final class BlacklistingChannel implements LimitedChannel {

private static final Logger log = LoggerFactory.getLogger(BlacklistingChannel.class);
private static final int NUM_PROBATION_REQUESTS = 5;
/*
* Shared single thread executor is reused between all blacklisting channels. If it becomes oversaturated
* we may wait longer than expected before resuming requests to blacklisted channels, but this is an
* edge case where things are already operating in a degraded state.
*/
private static final Supplier<ScheduledExecutorService> sharedScheduler = Suppliers.memoize(
() -> Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
.setNameFormat("dialogue-BlacklistingChannel-scheduler-%d")
.setDaemon(false)
.build()));

private final LimitedChannel delegate;
private final Duration duration;
private final Ticker ticker;
private final LimitedChannelListener listener;
private final ScheduledExecutorService scheduler;
private final AtomicReference<BlacklistState> channelBlacklistState;

BlacklistingChannel(LimitedChannel delegate, Duration duration, LimitedChannelListener listener) {
this(delegate, duration, listener, Ticker.systemTicker(), sharedScheduler.get());
BlacklistingChannel(LimitedChannel delegate, Duration duration) {
this(delegate, duration, Ticker.systemTicker());
}

@VisibleForTesting
BlacklistingChannel(LimitedChannel delegate, Duration duration, LimitedChannelListener listener, Ticker ticker) {
this(delegate, duration, listener, ticker, sharedScheduler.get());
}

@VisibleForTesting
BlacklistingChannel(
LimitedChannel delegate,
Duration duration,
LimitedChannelListener listener,
Ticker ticker,
ScheduledExecutorService scheduler) {
BlacklistingChannel(LimitedChannel delegate, Duration duration, Ticker ticker) {
this.delegate = delegate;
this.duration = duration;
this.ticker = ticker;
this.listener = listener;
this.scheduler = scheduler;
this.channelBlacklistState = new AtomicReference<>();
}

Expand All @@ -112,15 +81,12 @@ public String toString() {

private final class BlacklistState {
private final AtomicReference<Probation> probation = new AtomicReference<>();
private final ScheduledFuture<?> scheduledFuture;
private final long blacklistUntilNanos;
private final int probationPermits;

BlacklistState(Duration duration, int probationPermits) {
this.blacklistUntilNanos = ticker.read() + duration.toNanos();
this.probationPermits = probationPermits;
this.scheduledFuture =
scheduler.schedule(this::maybeBeginProbation, duration.toNanos(), TimeUnit.NANOSECONDS);
}

Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) {
Expand Down Expand Up @@ -148,8 +114,6 @@ Optional<Probation> maybeBeginProbation() {
if (log.isDebugEnabled()) {
log.debug("Channel {} is entering probation", UnsafeArg.of("channel", delegate));
}
listener.onChannelReady();
scheduledFuture.cancel(false);
}
return Optional.ofNullable(probation.get());
}
Expand All @@ -161,7 +125,7 @@ void markSuccess() {
if (maybeProbation != null && maybeProbation.checkIfProbationIsComplete()) {
log.debug("Clearing probation state");
if (channelBlacklistState.compareAndSet(this, null)) {
listener.onChannelReady();
log.debug("Channel is no longer blacklisted");
} else {
log.debug("Blacklist state has already been updated");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
* Similar to {@link com.palantir.dialogue.Channel}, but may not actually execute the call (eg: when the channel is
* overloaded). Semantics match {@link com.palantir.dialogue.Channel} aside from returning an
* {@link Optional optional response}.
* Limited channels must limit exclusively based on the state of the {@link com.palantir.dialogue.Channel}, not
* the {@link Endpoint} or {@link Request} arguments, otherwise the caller (generally a {@link QueuedChannel})
* may prevent <i>all</i> requests from proceeding.
*/
interface LimitedChannel {
Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class BlacklistingChannelTest {

@BeforeEach
public void before() {
channel = new BlacklistingChannel(delegate, BLACKLIST_DURATION, () -> {}, ticker);
channel = new BlacklistingChannel(delegate, BLACKLIST_DURATION, ticker);

futureResponse = SettableFuture.create();
when(delegate.maybeExecute(endpoint, request)).thenReturn(Optional.of(futureResponse));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private static Channel concurrencyLimiterBlacklistRoundRobin(
List<LimitedChannel> limitedChannels = channels.stream()
.map(addConcurrencyLimiter(sim))
.map(addFixedLimiter(sim))
.map(c -> new BlacklistingChannel(c, Duration.ofSeconds(1), () -> {}, sim.clock(), sim.scheduler()))
.map(c -> new BlacklistingChannel(c, Duration.ofSeconds(1), sim.clock()))
.collect(Collectors.toList());
LimitedChannel limited1 = new RoundRobinChannel(limitedChannels);
return queuedChannelAndRetrying(sim, limited1);
Expand Down

0 comments on commit b697561

Please sign in to comment.