Skip to content

Commit

Permalink
use AIMD limiter class as the state object
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Laub committed Nov 12, 2024
1 parent 909a2cd commit 1c7f30f
Showing 1 changed file with 11 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
import java.util.Optional;
import java.util.stream.LongStream;
import org.immutables.value.Value;

/**
* A channel that monitors the successes and failures of requests in order to determine the number of concurrent
Expand All @@ -40,19 +39,22 @@
final class ConcurrencyLimitedChannel implements LimitedChannel {
private static final SafeLogger log = SafeLoggerFactory.get(ConcurrencyLimitedChannel.class);

private static final ChannelState.Key<ConcurrencyLimitedChannelState> STATE_HOLDER_KEY =
new ChannelState.Key<>(ConcurrencyLimitedChannelState.class, ConcurrencyLimitedChannel::createState);
private static final ChannelState.Key<CautiousIncreaseAggressiveDecreaseConcurrencyLimiter>
HOST_SPECIFIC_STATE_KEY = new ChannelState.Key<>(
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class,
ConcurrencyLimitedChannel::createHostSpecificState);

private final NeverThrowChannel delegate;
private final CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter;
private final String channelNameForLogging;

static LimitedChannel createForHost(Config cf, Channel channel, int uriIndex, ChannelState hostSpecificState) {
TaggedMetricRegistry metrics = cf.clientConf().taggedMetricRegistry();
ConcurrencyLimitedChannelState state = hostSpecificState.getState(STATE_HOLDER_KEY);
ConcurrencyLimitedChannelInstrumentation instrumentation = new HostConcurrencyLimitedChannelInstrumentation(
cf.channelName(), uriIndex, state.hostLimiter(), metrics);
return new ConcurrencyLimitedChannel(channel, state.hostLimiter(), instrumentation);
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter =
hostSpecificState.getState(HOST_SPECIFIC_STATE_KEY);
ConcurrencyLimitedChannelInstrumentation instrumentation =
new HostConcurrencyLimitedChannelInstrumentation(cf.channelName(), uriIndex, limiter, metrics);
return new ConcurrencyLimitedChannel(channel, limiter, instrumentation);
}

/**
Expand All @@ -75,10 +77,8 @@ static LimitedChannel createForEndpoint(Channel channel, String channelName, int
this.channelNameForLogging = instrumentation.channelNameForLogging();
}

static ConcurrencyLimitedChannelState createState() {
return ImmutableConcurrencyLimitedChannelState.builder()
.hostLimiter(new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL))
.build();
static CautiousIncreaseAggressiveDecreaseConcurrencyLimiter createHostSpecificState() {
return new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL);
}

@Override
Expand Down Expand Up @@ -184,9 +184,4 @@ public String channelNameForLogging() {
return channelNameForLogging;
}
}

@Value.Immutable
interface ConcurrencyLimitedChannelState {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter hostLimiter();
}
}

0 comments on commit 1c7f30f

Please sign in to comment.