Skip to content

Commit

Permalink
Follow-up comments for traffic resilience features (#2915)
Browse files Browse the repository at this point in the history
Motivation:

Minor comments for #2911.
  • Loading branch information
idelpivnitskiy authored May 9, 2024
1 parent baac8d6 commit 7bb8b6d
Show file tree
Hide file tree
Showing 29 changed files with 78 additions and 94 deletions.
5 changes: 1 addition & 4 deletions servicetalk-capacity-limiter-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ apply plugin: "io.servicetalk.servicetalk-gradle-plugin-internal-library"

dependencies {
implementation platform(project(":servicetalk-dependencies"))
api project(":servicetalk-concurrent-api")
api project(":servicetalk-transport-api")
api project(":servicetalk-context-api")

implementation project(":servicetalk-annotations")
implementation project(":servicetalk-concurrent-internal")
implementation project(":servicetalk-utils-internal")
implementation "com.google.code.findbugs:jsr305"
implementation "org.slf4j:slf4j-api"

testImplementation enforcedPlatform("org.junit:junit-bom:$junit5Version")
testImplementation testFixtures(project(":servicetalk-concurrent-internal"))
testImplementation project(":servicetalk-concurrent-test-internal")
testImplementation project(":servicetalk-test-resources")
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.hamcrest:hamcrest"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
* represent a constant rate (i.e., has no notion of time).
* <p>
* The solution is based on the <a href="https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease">
* AIMD feedback control algorithm</a>
* AIMD feedback control algorithm</a>.
*/
final class AimdCapacityLimiter implements CapacityLimiter {

Expand Down Expand Up @@ -214,7 +214,6 @@ private static final class DefaultTicket implements Ticket, LimiterState {
this.remaining = remaining;
}

@Nullable
@Override
public LimiterState state() {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
*/
package io.servicetalk.apple.capacity.limiter.api;

import io.servicetalk.utils.internal.DurationUtils;
import io.servicetalk.utils.internal.NumberUtils;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.DurationUtils.ensureNonNegative;
import static io.servicetalk.utils.internal.NumberUtils.ensureBetweenZeroAndOneExclusive;
import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;
import static io.servicetalk.utils.internal.NumberUtils.ensureRange;
import static java.lang.Integer.MAX_VALUE;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -138,7 +136,7 @@ public AimdCapacityLimiterBuilder backoffRatio(final float onDrop, final float o
* @return {@code this}.
*/
public AimdCapacityLimiterBuilder increment(final float increment) {
this.increment = NumberUtils.ensurePositive(increment, "increment");
this.increment = ensurePositive(increment, "increment");
return this;
}

Expand All @@ -151,7 +149,7 @@ public AimdCapacityLimiterBuilder increment(final float increment) {
* @return {@code this}.
*/
public AimdCapacityLimiterBuilder cooldown(final Duration duration) {
this.cooldown = DurationUtils.ensureNonNegative(duration, "cooldown");
this.cooldown = ensureNonNegative(duration, "cooldown");
return this;
}

Expand Down Expand Up @@ -190,9 +188,8 @@ public CapacityLimiter build() {
timeSource);
}

@Nonnull
private String name() {
return name == null ? AimdCapacityLimiter.class.getSimpleName() + "_" + SEQ_GEN.incrementAndGet() : name;
return name == null ? AimdCapacityLimiter.class.getSimpleName() + '-' + SEQ_GEN.incrementAndGet() : name;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public int ignored() {
};

private AllowAllCapacityLimiter() {
// Singleton
}

@Override
Expand All @@ -64,6 +65,6 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co

@Override
public String toString() {
return AllowAllCapacityLimiter.class.getSimpleName();
return name();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
* {@link Classification} is used as hint from the user of the importance of the incoming request, and are not
* guaranteed to have an influence to the decision if the {@link CapacityLimiter} doesn't support them or chooses to
* ignore them.
*
*/
public interface CapacityLimiter {

Expand All @@ -67,7 +66,7 @@ public interface CapacityLimiter {
* {@link ContextMap context}.
*
* @param classification A class tha represents the <i>importance</i> of a request, to be evaluated upon permit.
* @param context Contextual metadata supported for evaluation from the call-site. This, in an Http context
* @param context Contextual metadata supported for evaluation from the call-site. This, in an HTTP context
* would typically be the {@code HttpRequest#context()}.
* @return {@link Ticket} when capacity is enough to satisfy the demand or {@code null} when not.
* @see CapacityLimiter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public static CapacityLimiter composite(final List<CapacityLimiter> providers) {
* Returns a {@link CapacityLimiter} that will reject all requests till the current pending request count is equal
* or less to the passed {@code capacity}.
* This {@link CapacityLimiter} takes into consideration the {@link Classification} of a given request and will
* variate the effective {@code capacity} according to the {@link Classification#priority() weight} before
* variate the effective {@code capacity} according to the {@link Classification#priority() priority} before
* attempting to grant access to the request. The effective {@code capacity} will never be more than the given
* {@code capacity}.
* <p>
* Requests with {@link Classification#priority() weight} equal to or greater than {@code 100} will enjoy
* the full capacity (100%), while requests with {@link Classification#priority() weight} less than {@code 100}
* Requests with {@link Classification#priority() priority} equal to or greater than {@code 100} will enjoy
* the full capacity (100%), while requests with {@link Classification#priority() priority} less than {@code 100}
* will be mapped to a percentage point of the given {@code capacity} and be granted access only if the {@code
* consumed capacity} is less than that percentage.
* <br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

/**
* Classification of requests.
* <p>
* In the context of capacity, classification can be used to allow prioritization of requests under
* certain conditions. When a system is load-shedding, it can still choose to accommodate important demand.
* The classification is not a feature supported by all {@link CapacityLimiter}s but rather the
Expand All @@ -34,8 +35,10 @@ public interface Classification {
/**
* The priority should be a positive number between 0 and 100 (inclusive), which hints to a {@link CapacityLimiter}
* the importance of a {@link Classification}.
* <p>
* Higher value represents the most important {@link Classification}, while lower value represents less important
* {@link Classification}.
*
* @return A positive value between 0 and 100 (inclusive) that hints importance of a request to a
* {@link CapacityLimiter}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
Expand All @@ -32,23 +33,24 @@
final class CompositeCapacityLimiter implements CapacityLimiter {

private final List<CapacityLimiter> providers;
private final String namesCsv;
private final String name;

CompositeCapacityLimiter(final List<CapacityLimiter> providers) {
if (requireNonNull(providers).isEmpty()) {
throw new IllegalArgumentException("Empty capacity limiters.");
}
this.providers = new ArrayList<>(providers);
this.namesCsv = providers.stream().map(CapacityLimiter::name).collect(joining(", "));
this.name = CompositeCapacityLimiter.class.getSimpleName() + "[ " +
providers.stream().map(CapacityLimiter::name).collect(joining(", ")) + " ]";
}

@Override
public String name() {
return CompositeCapacityLimiter.class.getSimpleName() + "[ " + namesCsv + " ]";
return name;
}

@Override
public Ticket tryAcquire(final Classification classification, final ContextMap context) {
public Ticket tryAcquire(final Classification classification, @Nullable final ContextMap context) {
Ticket[] results = null;
int idx = 0;
for (CapacityLimiter provider : providers) {
Expand All @@ -72,7 +74,7 @@ public Ticket tryAcquire(final Classification classification, final ContextMap c
return compositeResult(results);
}

private int completed(Ticket[] results) {
private static int completed(Ticket[] results) {
int remaining = 1;
for (Ticket ticket : results) {
if (ticket == null) {
Expand All @@ -86,7 +88,7 @@ private int completed(Ticket[] results) {
return remaining;
}

private int failed(Throwable cause, Ticket[] results) {
private static int failed(Throwable cause, Ticket[] results) {
int remaining = 1;
for (Ticket ticket : results) {
if (ticket == null) {
Expand All @@ -100,7 +102,7 @@ private int failed(Throwable cause, Ticket[] results) {
return remaining;
}

private int dropped(Ticket[] results) {
private static int dropped(Ticket[] results) {
int remaining = 1;
for (Ticket ticket : results) {
if (ticket == null) {
Expand All @@ -114,7 +116,7 @@ private int dropped(Ticket[] results) {
return remaining;
}

private int cancelled(Ticket[] results) {
private static int cancelled(Ticket[] results) {
int remaining = 1;
for (Ticket ticket : results) {
if (ticket == null) {
Expand All @@ -128,33 +130,33 @@ private int cancelled(Ticket[] results) {
return remaining;
}

private Ticket compositeResult(final Ticket[] tickets) {
private static Ticket compositeResult(final Ticket[] tickets) {
return new Ticket() {
@Override
public LimiterState state() {
// Targeting the most specific one (assuming an order of rate-limiter, customer-quota-limiter
// Targeting the most specific one (assuming an order of rate-limiter, customer-quota-limiter).
// In the future we could make this configurable if proven useful.
return tickets[tickets.length - 1].state();
}

@Override
public int completed() {
return CompositeCapacityLimiter.this.completed(tickets);
return CompositeCapacityLimiter.completed(tickets);
}

@Override
public int failed(final Throwable cause) {
return CompositeCapacityLimiter.this.failed(cause, tickets);
return CompositeCapacityLimiter.failed(cause, tickets);
}

@Override
public int dropped() {
return CompositeCapacityLimiter.this.dropped(tickets);
return CompositeCapacityLimiter.dropped(tickets);
}

@Override
public int ignored() {
return CompositeCapacityLimiter.this.cancelled(tickets);
return CompositeCapacityLimiter.cancelled(tickets);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public String name() {
}

@Override
public Ticket tryAcquire(final Classification classification, final ContextMap meta) {
final int weight = min(max(classification.priority(), 0), 100);
final int effectiveLimit = (capacity * weight) / 100;
public Ticket tryAcquire(final Classification classification, @Nullable final ContextMap meta) {
final int priority = min(max(classification.priority(), 0), 100);
final int effectiveLimit = (capacity * priority) / 100;
for (;;) {
final int currPending = pending;
if (currPending == effectiveLimit &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.servicetalk.apple.capacity.limiter.api;

import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -46,7 +45,7 @@ public FixedCapacityLimiterBuilder name(final String name) {
/**
* Defines the fixed capacity for the {@link CapacityLimiter}.
* Concurrent requests above this figure will be rejected. Requests with particular
* {@link Classification#priority() weight} will be respected and the total capacity for them will be adjusted
* {@link Classification#priority() priority} will be respected and the total capacity for them will be adjusted
* accordingly.
* @param capacity The max allowed concurrent requests that this {@link CapacityLimiter} should allow.
* @return {@code this}.
Expand Down Expand Up @@ -81,9 +80,8 @@ public CapacityLimiter build() {
return new FixedCapacityLimiter(name(), capacity, observer);
}

@Nonnull
private String name() {
return name == null ? FixedCapacityLimiter.class.getSimpleName() + "_" + SEQ_GEN.incrementAndGet() : name;
return name == null ? FixedCapacityLimiter.class.getSimpleName() + '-' + SEQ_GEN.incrementAndGet() : name;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.LongSupplier;
import javax.annotation.Nullable;

import static java.lang.Double.isNaN;
import static java.lang.Math.max;
Expand Down Expand Up @@ -122,7 +123,7 @@ public String name() {
}

@Override
public Ticket tryAcquire(final Classification classification, final ContextMap meta) {
public Ticket tryAcquire(final Classification classification, @Nullable final ContextMap meta) {
int newPending;
int newLimit;

Expand Down Expand Up @@ -153,7 +154,7 @@ private int updateLimit(final long timestampNs, final double shortLatencyMillis,
// When positive gradient, and limit already above initial,
// avoid increasing the limit when we are far from meeting it - i.e. blast radius.
final boolean isPositiveSuspended = !isNaN(gradient) &&
(gradient > 1.0 && limit > initial && suspendLimitInc.test(pending, limit));
gradient > 1.0 && limit > initial && suspendLimitInc.test(pending, limit);
// When negative gradient, and consumption not close to limit,
// avoid decreasing the limit. Low RPS & noisy environments (RTT deviations > 500ms) tend to bring the limit
// down to "min" even though there aren't many pending requests to justify the decision.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.function.BiPredicate;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.apple.capacity.limiter.api.GradientCapacityLimiterProfiles.DEFAULT_INITIAL_LIMIT;
Expand Down Expand Up @@ -176,8 +175,7 @@ public GradientCapacityLimiterBuilder backoffRatio(final float onDrop, final flo
* @return {@code this}.
*/
public GradientCapacityLimiterBuilder headroom(final BiFunction<Double, Double, Double> headroom) {
requireNonNull(headroom);
this.headroom = headroom;
this.headroom = requireNonNull(headroom);
return this;
}

Expand Down Expand Up @@ -243,8 +241,7 @@ public GradientCapacityLimiterBuilder limitUpdateInterval(final Duration duratio
* @return {@code this}.
*/
public GradientCapacityLimiterBuilder minGradient(final float minGradient) {
ensureBetweenZeroAndOne(minGradient, "minGradient");
this.minGradient = minGradient;
this.minGradient = ensureBetweenZeroAndOne(minGradient, "minGradient");
return this;
}

Expand All @@ -258,8 +255,7 @@ public GradientCapacityLimiterBuilder minGradient(final float minGradient) {
* @return {@code this}.
*/
public GradientCapacityLimiterBuilder maxGradient(final float maxPositiveGradient) {
ensureGreaterThan(maxPositiveGradient, 1.0f, "maxGradient");
this.maxGradient = maxPositiveGradient;
this.maxGradient = ensureGreaterThan(maxPositiveGradient, 1.0f, "maxGradient");
return this;
}

Expand Down Expand Up @@ -327,9 +323,8 @@ public CapacityLimiter build() {
suspendLimitInc, suspendLimitDec, headroom, timeSource);
}

@Nonnull
private String name() {
return name == null ? GradientCapacityLimiter.class.getSimpleName() + "_" + SEQ_GEN.incrementAndGet() : name;
return name == null ? GradientCapacityLimiter.class.getSimpleName() + '-' + SEQ_GEN.incrementAndGet() : name;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* tracker, allowing a way to enhance the algorithm's behavior according to the observations.
* Implementations must provide thread-safety guarantees.
*/
@FunctionalInterface
interface LatencyTracker {

/**
Expand Down
Loading

0 comments on commit 7bb8b6d

Please sign in to comment.