Skip to content

Commit

Permalink
Latency tracking utilities to support LB L7 breakers (#2794)
Browse files Browse the repository at this point in the history
* Latency tracking utilities to support LB L7 breakers

Motivation

Support latency tracking APIs and default EWMA based decay-able solutions to allow implementation of richer L7 circuit breakers for the load balancers.

Modification

Introduced the LatencyTracker API along with a default implementation.


Co-authored-by: Bryce Anderson <bryce_anderson@apple.com>
  • Loading branch information
tkountis and bryce-anderson authored Jan 10, 2024
1 parent cb8405d commit 8a8e3ab
Show file tree
Hide file tree
Showing 4 changed files with 380 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.IntBinaryOperator;
import java.util.function.LongSupplier;

import static java.lang.Integer.MAX_VALUE;
import static java.lang.Integer.MIN_VALUE;
import static java.lang.Math.ceil;
import static java.lang.Math.exp;
import static java.lang.Math.log;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;

/**
* Latency tracker using exponential weighted moving average based on the work by Andreas Eckner and subsequent
* modifications under PeakEWMA in <a href= "https://github.com/twitter/finagle"> finagle</a>.
*
* @see <a href="http://www.eckner.com/papers/Algorithms%20for%20Unevenly%20Spaced%20Time%20Series.pdf"> Eckner
* (2019) Algorithms for Unevenly Spaced Time Series: Moving Averages and Other Rolling Operators (4.2 pp. 10)</a>
*/
final class DefaultLatencyTracker implements LatencyTracker {
private static final AtomicIntegerFieldUpdater<DefaultLatencyTracker> pendingUpdater =
newUpdater(DefaultLatencyTracker.class, "pending");
private static final long MAX_MS_TO_NS = NANOSECONDS.convert(MAX_VALUE, MILLISECONDS);
private static final long DEFAULT_CANCEL_PENALTY = 5L;
private static final long DEFAULT_ERROR_PENALTY = 10L;
/**
* Mean lifetime, exponential decay. inverted tau
*/
private final double invTau;
private final LongSupplier currentTimeSupplier;
private final long cancelPenalty;
private final long errorPenalty;

/**
* Last inserted value to compute weight.
*/
private long lastTimeNanos;
/**
* Current weighted average.
*/
private int ewma;
private volatile int pending;

DefaultLatencyTracker(final long halfLifeNanos, final LongSupplier currentTimeSupplier) {
this(halfLifeNanos, currentTimeSupplier, DEFAULT_CANCEL_PENALTY, DEFAULT_ERROR_PENALTY);
}

DefaultLatencyTracker(final long halfLifeNanos, final LongSupplier currentTimeSupplier,
long cancelPenaly, long errorPenaly) {
if (halfLifeNanos <= 0) {
throw new IllegalArgumentException("halfLifeNanos: " + halfLifeNanos + " (expected >0)");
}
this.invTau = Math.pow((halfLifeNanos / log(2)), -1);
this.currentTimeSupplier = currentTimeSupplier;
this.lastTimeNanos = currentTimeSupplier.getAsLong();
this.cancelPenalty = cancelPenaly;
this.errorPenalty = errorPenaly;
}

@Override
public long beforeStart() {
pendingUpdater.incrementAndGet(this);
return currentTimeSupplier.getAsLong();
}

@Override
public void observeSuccess(final long startTimeNanos) {
pendingUpdater.decrementAndGet(this);
calculateAndStore((ewma, currentLatency) -> currentLatency, startTimeNanos);
}

@Override
public void observeCancel(final long startTimeNanos) {
pendingUpdater.decrementAndGet(this);
calculateAndStore(this::cancelPenalty, startTimeNanos);
}

@Override
public void observeError(final long startTimeNanos) {
pendingUpdater.decrementAndGet(this);
calculateAndStore(this::errorPenalty, startTimeNanos);
}

@Override
public int score() {
final int currentEWMA = calculateAndStore((ewma, lastTimeNanos) -> 0, 0);
final int cPending = pendingUpdater.get(this);
if (currentEWMA == 0) {
// If EWMA has decayed to 0 (or isn't yet initialized) and there are no pending requests we return the
// maximum score to increase the likelihood this entity is selected. If there are pending requests we
// don't yet know the latency characteristics so we return the minimum score to decrease the
// likelihood this entity is selected.
return cPending == 0 ? 0 : MIN_VALUE;
}

// Add penalty for pending requests to account for "unaccounted" load.
// Penalty is the observed latency if known, else an arbitrarily high value which makes entities for which
// no latency data has yet been received (eg: request sent but not received), un-selectable.
final int pendingPenalty = (int) min(MAX_VALUE, (long) cPending * currentEWMA);
// Since we are measuring latencies and lower latencies are better, we turn the score as negative such that
// lower the latency, higher the score.
return MAX_VALUE - currentEWMA <= pendingPenalty ? MIN_VALUE : -(currentEWMA + pendingPenalty);
}

private int cancelPenalty(int currentEWMA, int currentLatency) {
// There is no significance to the choice of this multiplier (other than it is half of the error penalty)
// and it is selected to gather empirical evidence as the algorithm is evaluated.
return applyPenalty(currentEWMA, currentLatency, cancelPenalty);
}

private int errorPenalty(int currentEWMA, int currentLatency) {
// There is no significance to the choice of this multiplier (other than it is double of the cancel penalty)
// and it is selected to gather empirical evidence as the algorithm is evaluated.
return applyPenalty(currentEWMA, currentLatency, errorPenalty);
}

private static int applyPenalty(int currentEWMA, int currentLatency, long penalty) {
// Relatively large latencies will have a bigger impact on the penalty, while smaller latencies (e.g. premature
// cancel/error) rely on the penalty.
return (int) min(MAX_VALUE, max(currentEWMA, currentLatency) * penalty);
}

private synchronized int calculateAndStore(final IntBinaryOperator latencyInitializer, long startTimeNanos) {
final int nextEWMA;
// We capture the current time inside the synchronized block to exploit the monotonic time source
// properties which prevent the time duration from going negative. This will result in a latency penalty
// as concurrency increases, but is a trade-off for simplicity.
final long currentTimeNanos = currentTimeSupplier.getAsLong();
// When modifying the EWMA and lastTime we read/write both values in a synchronized block as they are
// tightly coupled in the EWMA formula below.
final int currentEWMA = ewma;
final int currentLatency = latencyInitializer.applyAsInt(ewma, nanoToMillis(currentTimeNanos - startTimeNanos));
// Note the currentLatency cannot be <0 or else the EWMA equation properties are violated
// (e.g. "degree of weighting decrease" is not in [0, 1]).
assert currentLatency >= 0;

// Peak EWMA from finagle for the score to be extremely sensitive to higher than normal latencies.
if (currentLatency > currentEWMA) {
nextEWMA = currentLatency;
} else {
final double tmp = (currentTimeNanos - lastTimeNanos) * invTau;
final double w = exp(-tmp);
nextEWMA = (int) ceil(currentEWMA * w + currentLatency * (1d - w));
}
lastTimeNanos = currentTimeNanos;
ewma = nextEWMA;
return nextEWMA;
}

private static int nanoToMillis(long nanos) {
return (int) MILLISECONDS.convert(min(nanos, MAX_MS_TO_NS), NANOSECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.ScoreSupplier;

import java.time.Duration;
import java.util.function.LongSupplier;

/**
* A tracker of latency of an action over time.
*/
interface LatencyTracker extends ScoreSupplier {

/**
* Invoked before each start of the action for which latency is to be tracked.
*
* @return Current time in nanoseconds.
*/
long beforeStart();

/**
* Records a successful completion of the action for which latency is to be tracked.
*
* @param beforeStartTimeNs return value from {@link #beforeStart()}.
*/
void observeSuccess(long beforeStartTimeNs);

/**
* Records cancellation of the action for which latency is to be tracked.
*
* @param beforeStartTimeNs return value from {@link #beforeStart()}.
*/
void observeCancel(long beforeStartTimeNs);

/**
* Records a failed completion of the action for which latency is to be tracked.
*
* @param beforeStartTimeNs return value from {@link #beforeStart()}.
*/
void observeError(long beforeStartTimeNs);

/**
* Create a latency tracker.
*
* @param measurementHalfLife The half-life decay hint period for the tracker.
* This sets the half-life for which past results will be "forgotten" so that newer
* data has exponentially more weight than historical data.
* @param currentTimeSupplier A wall-time supplier.
*/
static LatencyTracker newTracker(final Duration measurementHalfLife, final LongSupplier currentTimeSupplier) {
return new DefaultLatencyTracker(measurementHalfLife.toNanos(), currentTimeSupplier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import static java.util.Objects.requireNonNull;

/**
* A two-level latency tracker, namely root and leaf.
* Each tracking interaction influences both levels, but reporting operations (i.e., {@link #score()}) will only
* consult the leaf.
*/
final class RootSwayingLeafLatencyTracker implements LatencyTracker {

private final LatencyTracker root;
private final LatencyTracker leaf;
RootSwayingLeafLatencyTracker(final LatencyTracker root, final LatencyTracker leaf) {
this.root = requireNonNull(root);
this.leaf = requireNonNull(leaf);
}

@Override
public long beforeStart() {
// Tracks both levels
final long timestamp = root.beforeStart();
leaf.beforeStart();
return timestamp;
}

@Override
public void observeSuccess(final long beforeStartTimeNs) {
// Tracks both levels
root.observeSuccess(beforeStartTimeNs);
leaf.observeSuccess(beforeStartTimeNs);
}

@Override
public void observeCancel(final long beforeStartTimeNs) {
// Tracks both levels
root.observeCancel(beforeStartTimeNs);
leaf.observeCancel(beforeStartTimeNs);
}

@Override
public void observeError(final long beforeStartTimeNs) {
// Tracks both levels
root.observeError(beforeStartTimeNs);
leaf.observeError(beforeStartTimeNs);
}

@Override
public int score() {
// Reports the level score
return leaf.score();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;

import static io.servicetalk.loadbalancer.LatencyTracker.newTracker;
import static java.lang.System.nanoTime;
import static java.time.Duration.ofSeconds;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class LatencyTrackerTest {

@Test
void test() {
final LongUnaryOperator nextValueProvider = mock(LongUnaryOperator.class);
when(nextValueProvider.applyAsLong(anyLong())).thenAnswer(__ -> ofSeconds(1).toNanos());
final LongSupplier currentTimeSupplier = new TestClock(nextValueProvider);

final LatencyTracker latencyTracker = newTracker(Duration.ofSeconds(1), currentTimeSupplier);
Assertions.assertEquals(0, latencyTracker.score());

// upon success score
long start = latencyTracker.beforeStart();
latencyTracker.observeSuccess(start);
Assertions.assertEquals(-500, latencyTracker.score());

// error penalty
start = latencyTracker.beforeStart();
latencyTracker.observeError(start);
Assertions.assertEquals(-5000, latencyTracker.score());

// decay
when(nextValueProvider.applyAsLong(anyLong())).thenAnswer(__ -> ofSeconds(20).toNanos());
Assertions.assertEquals(-1, latencyTracker.score());
}

static final class TestClock implements LongSupplier {
private final LongUnaryOperator nextValueProvider;
private long lastValue = nanoTime();

TestClock(final LongUnaryOperator nextValueProvider) {
this.nextValueProvider = nextValueProvider;
}

@Override
public long getAsLong() {
lastValue += nextValueProvider.applyAsLong(lastValue);
return lastValue;
}
}
}

0 comments on commit 8a8e3ab

Please sign in to comment.