common: jittered backoff implementation (#3791)
Implements a fully jittered exponential backoff algorithm and refactors the router retry timer to use it; a short sketch of the algorithm follows the changed-file summary below.
Risk Level: Low
Testing: Added automated tests
Docs Changes: N/A
Release Notes: N/A

Signed-off-by: Rama <rama.rao@salesforce.com>
ramaraochavali authored and htuch committed Jul 11, 2018
1 parent df7a291 commit b1f870a
Showing 18 changed files with 110 additions and 95 deletions.
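For context, "fully jittered" backoff means each retry delay is drawn uniformly at random between zero and an exponentially growing ceiling, and the result is clamped to a configured maximum. Below is a minimal self-contained sketch of that idea; it mirrors the logic added in backoff_strategy.cc in this commit, but uses the standard <random> facilities instead of Envoy's Runtime::RandomGenerator, and the class and method names here are illustrative, not part of Envoy.

#include <algorithm>
#include <cstdint>
#include <random>

// Sketch of fully jittered exponential backoff: the delay for retry n is a
// uniform draw from [0, base * (2^n - 1)), clamped to max_ms. Assumes base_ms >= 1.
class FullyJitteredBackoff {
public:
  FullyJitteredBackoff(uint64_t base_ms, uint64_t max_ms)
      : base_ms_(base_ms), max_ms_(max_ms), rng_(std::random_device{}()) {}

  uint64_t nextBackoffMs() {
    const uint64_t ceiling = base_ms_ * ((1ULL << retry_) - 1);
    if (ceiling <= max_ms_) {
      ++retry_; // stop widening the window once it passes the cap
    }
    std::uniform_int_distribution<uint64_t> dist(0, ceiling - 1);
    return std::min(dist(rng_), max_ms_);
  }

  void reset() { retry_ = 1; }

private:
  const uint64_t base_ms_;
  const uint64_t max_ms_;
  uint64_t retry_{1};
  std::mt19937_64 rng_;
};

Taking a raw 64-bit random value modulo the ceiling, as the Envoy change below does, is a cheap approximation of the same uniform draw.
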
1 change: 1 addition & 0 deletions source/common/common/BUILD
@@ -23,6 +23,7 @@ envoy_cc_library(
deps = [
":assert_lib",
"//include/envoy/common:backoff_strategy_interface",
"//include/envoy/runtime:runtime_interface",
],
)

34 changes: 13 additions & 21 deletions source/common/common/backoff_strategy.cc
@@ -2,29 +2,21 @@

namespace Envoy {

ExponentialBackOffStrategy::ExponentialBackOffStrategy(uint64_t initial_interval,
uint64_t max_interval, double multiplier)
: initial_interval_(initial_interval), max_interval_(max_interval), multiplier_(multiplier),
current_interval_(0) {
ASSERT(multiplier_ > 1.0);
ASSERT(initial_interval_ <= max_interval_);
ASSERT(initial_interval_ * multiplier_ <= max_interval_);
JitteredBackOffStrategy::JitteredBackOffStrategy(uint64_t base_interval, uint64_t max_interval,
Runtime::RandomGenerator& random)
: base_interval_(base_interval), max_interval_(max_interval), random_(random) {
ASSERT(base_interval_ <= max_interval_);
}

uint64_t ExponentialBackOffStrategy::nextBackOffMs() { return computeNextInterval(); }

void ExponentialBackOffStrategy::reset() { current_interval_ = 0; }

uint64_t ExponentialBackOffStrategy::computeNextInterval() {
if (current_interval_ == 0) {
current_interval_ = initial_interval_;
} else if (current_interval_ >= max_interval_) {
current_interval_ = max_interval_;
} else {
uint64_t new_interval = current_interval_;
new_interval = ceil(new_interval * multiplier_);
current_interval_ = new_interval > max_interval_ ? max_interval_ : new_interval;
uint64_t JitteredBackOffStrategy::nextBackOffMs() {
const uint64_t multiplier = (1 << current_retry_) - 1;
const uint64_t base_backoff = multiplier * base_interval_;
if (base_backoff <= max_interval_) {
current_retry_++;
}
return current_interval_;
return std::min(random_.random() % base_backoff, max_interval_);
}

void JitteredBackOffStrategy::reset() { current_retry_ = 1; }

} // namespace Envoy
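Worked example of the new nextBackOffMs(), using the values from the basic-flow test further down (base_interval = 25, max_interval = 30, and a mocked random() that always returns 27): on the first call the multiplier is (1 << 1) - 1 = 1, so base_backoff = 25 <= 30, current_retry_ advances to 2, and the result is min(27 % 25, 30) = 2; on the second call the multiplier is (1 << 2) - 1 = 3, so base_backoff = 75 > 30, current_retry_ stays at 2, and the result is min(27 % 75, 30) = 27. These are exactly the 2 and 27 expected in backoff_strategy_test.cc.
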
25 changes: 15 additions & 10 deletions source/common/common/backoff_strategy.h
@@ -4,30 +4,35 @@
#include <memory>

#include "envoy/common/backoff_strategy.h"
#include "envoy/runtime/runtime.h"

#include "common/common/assert.h"

namespace Envoy {

/**
* Implementation of BackOffStrategy that increases the back off period for each retry attempt. When
* the interval has reached the max interval, it is no longer increased.
* Implementation of BackOffStrategy that uses a fully jittered exponential backoff algorithm.
*/
class ExponentialBackOffStrategy : public BackOffStrategy {
class JitteredBackOffStrategy : public BackOffStrategy {

public:
ExponentialBackOffStrategy(uint64_t initial_interval, uint64_t max_interval, double multiplier);
/**
* Use this constructor if max_interval needs to be enforced.
* @param base_interval the base interval to use when computing the next backoff.
* @param max_interval the upper bound for any computed backoff; larger values are clamped to this.
* @param random the random generator.
*/
JitteredBackOffStrategy(uint64_t base_interval, uint64_t max_interval,
Runtime::RandomGenerator& random);

// BackOffStrategy methods
uint64_t nextBackOffMs() override;
void reset() override;

private:
uint64_t computeNextInterval();

const uint64_t initial_interval_;
const uint64_t max_interval_;
const double multiplier_;
uint64_t current_interval_;
const uint64_t base_interval_;
const uint64_t max_interval_{};
uint64_t current_retry_{1};
Runtime::RandomGenerator& random_;
};
} // namespace Envoy
12 changes: 6 additions & 6 deletions source/common/config/grpc_mux_impl.cc
@@ -12,12 +12,12 @@ namespace Config {
GrpcMuxImpl::GrpcMuxImpl(const envoy::api::v2::core::Node& node, Grpc::AsyncClientPtr async_client,
Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method,
MonotonicTimeSource& time_source)
Runtime::RandomGenerator& random, MonotonicTimeSource& time_source)
: node_(node), async_client_(std::move(async_client)), service_method_(service_method),
time_source_(time_source) {
random_(random), time_source_(time_source) {
retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
backoff_strategy_ptr_ = std::make_unique<ExponentialBackOffStrategy>(
RETRY_INITIAL_DELAY_MS, RETRY_MAX_DELAY_MS, MULTIPLIER);
backoff_strategy_ = std::make_unique<JitteredBackOffStrategy>(RETRY_INITIAL_DELAY_MS,
RETRY_MAX_DELAY_MS, random_);
}

GrpcMuxImpl::~GrpcMuxImpl() {
@@ -31,7 +31,7 @@ GrpcMuxImpl::~GrpcMuxImpl() {
void GrpcMuxImpl::start() { establishNewStream(); }

void GrpcMuxImpl::setRetryTimer() {
retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_ptr_->nextBackOffMs()));
retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
}

void GrpcMuxImpl::establishNewStream() {
@@ -159,7 +159,7 @@ void GrpcMuxImpl::onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) {

void GrpcMuxImpl::onReceiveMessage(std::unique_ptr<envoy::api::v2::DiscoveryResponse>&& message) {
// Reset here so that it starts with a fresh backoff interval on the next disconnect.
backoff_strategy_ptr_->reset();
backoff_strategy_->reset();

const std::string& type_url = message->type_url();
ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url, message->version_info());
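In short, the mux owns one strategy for the life of the gRPC stream: the constructor builds a JitteredBackOffStrategy from RETRY_INITIAL_DELAY_MS and RETRY_MAX_DELAY_MS, setRetryTimer() asks it for a fresh delay before every reconnect attempt, and the first message received on a re-established stream calls reset() so that the next disconnect starts again from the base interval.
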
5 changes: 3 additions & 2 deletions source/common/config/grpc_mux_impl.h
@@ -26,6 +26,7 @@ class GrpcMuxImpl : public GrpcMux,
public:
GrpcMuxImpl(const envoy::api::v2::core::Node& node, Grpc::AsyncClientPtr async_client,
Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method,
Runtime::RandomGenerator& random,
MonotonicTimeSource& time_source = ProdMonotonicTimeSource::instance_);
~GrpcMuxImpl();

@@ -45,7 +46,6 @@
// TODO(htuch): Make this configurable or some static.
const uint32_t RETRY_INITIAL_DELAY_MS = 500;
const uint32_t RETRY_MAX_DELAY_MS = 30000; // Do not exceed 30s.
const double MULTIPLIER = 2;

private:
void setRetryTimer();
@@ -103,8 +103,9 @@ class GrpcMuxImpl : public GrpcMux,
// Envoy's dependency ordering.
std::list<std::string> subscriptions_;
Event::TimerPtr retry_timer_;
Runtime::RandomGenerator& random_;
MonotonicTimeSource& time_source_;
BackOffStrategyPtr backoff_strategy_ptr_;
BackOffStrategyPtr backoff_strategy_;
};

class NullGrpcMuxImpl : public GrpcMux {
4 changes: 2 additions & 2 deletions source/common/config/grpc_subscription_impl.h
@@ -15,9 +15,9 @@ template <class ResourceType>
class GrpcSubscriptionImpl : public Config::Subscription<ResourceType> {
public:
GrpcSubscriptionImpl(const envoy::api::v2::core::Node& node, Grpc::AsyncClientPtr async_client,
Event::Dispatcher& dispatcher,
Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random,
const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats)
: grpc_mux_(node, std::move(async_client), dispatcher, service_method),
: grpc_mux_(node, std::move(async_client), dispatcher, service_method, random),
grpc_mux_subscription_(grpc_mux_, stats) {}

// Config::Subscription
4 changes: 2 additions & 2 deletions source/common/config/subscription_factory.h
@@ -68,8 +68,8 @@ class SubscriptionFactory {
Config::Utility::factoryForGrpcApiConfigSource(cm.grpcAsyncClientManager(),
config.api_config_source(), scope)
->create(),
dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method),
stats));
dispatcher, random,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), stats));
break;
}
default:
1 change: 1 addition & 0 deletions source/common/router/BUILD
@@ -123,6 +123,7 @@ envoy_cc_library(
"//include/envoy/runtime:runtime_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/common:assert_lib",
"//source/common/common:backoff_lib",
"//source/common/common:utility_lib",
"//source/common/grpc:common_lib",
"//source/common/http:codes_lib",
13 changes: 5 additions & 8 deletions source/common/router/retry_state_impl.cc
@@ -71,23 +71,20 @@ RetryStateImpl::RetryStateImpl(const RetryPolicy& route_policy, Http::HeaderMap&
// Merge in the route policy.
retry_on_ |= route_policy.retryOn();
retries_remaining_ = std::max(retries_remaining_, route_policy.numRetries());
const uint32_t base = runtime_.snapshot().getInteger("upstream.base_retry_backoff_ms", 25);
// Cap the max interval to 10 times the base interval to ensure reasonable backoff intervals.
backoff_strategy_ = std::make_unique<JitteredBackOffStrategy>(base, base * 10, random_);
}

RetryStateImpl::~RetryStateImpl() { resetRetry(); }

void RetryStateImpl::enableBackoffTimer() {
// TODO(ramaraochavali): Implement JitteredExponentialBackOff and refactor this.
// We use a fully jittered exponential backoff algorithm.
current_retry_++;
uint32_t multiplier = (1 << current_retry_) - 1;
uint64_t base = runtime_.snapshot().getInteger("upstream.base_retry_backoff_ms", 25);
uint64_t timeout = random_.random() % (base * multiplier);

if (!retry_timer_) {
retry_timer_ = dispatcher_.createTimer([this]() -> void { callback_(); });
}

retry_timer_->enableTimer(std::chrono::milliseconds(timeout));
// We use a fully jittered exponential backoff algorithm.
retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
}

uint32_t RetryStateImpl::parseRetryOn(absl::string_view config) {
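Assuming the runtime default of 25 ms for upstream.base_retry_backoff_ms is not overridden, the router's cap becomes 250 ms, and the first few retries draw their delay roughly as follows (a sketch of the windows implied by the strategy above, not output from the code):

retry 1:  random() % 25              -> 0-24 ms
retry 2:  random() % 75              -> 0-74 ms
retry 3:  random() % 175             -> 0-174 ms
retry 4+: min(random() % 375, 250)   -> the window stops widening once 375 exceeds the 250 ms cap
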
4 changes: 3 additions & 1 deletion source/common/router/retry_state_impl.h
@@ -10,6 +10,8 @@
#include "envoy/runtime/runtime.h"
#include "envoy/upstream/upstream.h"

#include "common/common/backoff_strategy.h"

#include "absl/strings/string_view.h"
#include "absl/types/optional.h"

@@ -55,10 +57,10 @@ class RetryStateImpl : public RetryState {
Event::Dispatcher& dispatcher_;
uint32_t retry_on_{};
uint32_t retries_remaining_{1};
uint32_t current_retry_{};
DoRetryCallback callback_;
Event::TimerPtr retry_timer_;
Upstream::ResourcePriority priority_;
BackOffStrategyPtr backoff_strategy_;
};

} // namespace Router
3 changes: 2 additions & 1 deletion source/common/upstream/cluster_manager_impl.cc
@@ -216,7 +216,8 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots
->create(),
main_thread_dispatcher,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources")));
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_));
} else {
ads_mux_.reset(new Config::NullGrpcMuxImpl());
}
1 change: 1 addition & 0 deletions test/common/common/BUILD
@@ -15,6 +15,7 @@ envoy_cc_test(
srcs = ["backoff_strategy_test.cc"],
deps = [
"//source/common/common:backoff_lib",
"//test/mocks/runtime:runtime_mocks",
],
)

82 changes: 42 additions & 40 deletions test/common/common/backoff_strategy_test.cc
@@ -1,57 +1,59 @@
#include "common/common/backoff_strategy.h"

#include "test/mocks/runtime/mocks.h"

#include "gtest/gtest.h"

using testing::NiceMock;
using testing::Return;

namespace Envoy {

TEST(BackOffStrategyTest, ExponentialBackOffBasicTest) {
ExponentialBackOffStrategy exponential_back_off(10, 100, 2);
EXPECT_EQ(10, exponential_back_off.nextBackOffMs());
EXPECT_EQ(20, exponential_back_off.nextBackOffMs());
EXPECT_EQ(40, exponential_back_off.nextBackOffMs());
EXPECT_EQ(80, exponential_back_off.nextBackOffMs());
}
TEST(BackOffStrategyTest, JitteredBackOffBasicFlow) {
NiceMock<Runtime::MockRandomGenerator> random;
ON_CALL(random, random()).WillByDefault(Return(27));

TEST(BackOffStrategyTest, ExponentialBackOffFractionalMultiplier) {
ExponentialBackOffStrategy exponential_back_off(10, 50, 1.5);
EXPECT_EQ(10, exponential_back_off.nextBackOffMs());
EXPECT_EQ(15, exponential_back_off.nextBackOffMs());
EXPECT_EQ(23, exponential_back_off.nextBackOffMs());
EXPECT_EQ(35, exponential_back_off.nextBackOffMs());
EXPECT_EQ(50, exponential_back_off.nextBackOffMs());
EXPECT_EQ(50, exponential_back_off.nextBackOffMs());
JitteredBackOffStrategy jittered_back_off(25, 30, random);
EXPECT_EQ(2, jittered_back_off.nextBackOffMs());
EXPECT_EQ(27, jittered_back_off.nextBackOffMs());
}

TEST(BackOffStrategyTest, ExponentialBackOffMaxIntervalReached) {
ExponentialBackOffStrategy exponential_back_off(10, 100, 2);
EXPECT_EQ(10, exponential_back_off.nextBackOffMs());
EXPECT_EQ(20, exponential_back_off.nextBackOffMs());
EXPECT_EQ(40, exponential_back_off.nextBackOffMs());
EXPECT_EQ(80, exponential_back_off.nextBackOffMs());
EXPECT_EQ(100, exponential_back_off.nextBackOffMs()); // Should return Max here
EXPECT_EQ(100, exponential_back_off.nextBackOffMs()); // Should return Max here
}
TEST(BackOffStrategyTest, JitteredBackOffBasicReset) {
NiceMock<Runtime::MockRandomGenerator> random;
ON_CALL(random, random()).WillByDefault(Return(27));

TEST(BackOffStrategyTest, ExponentialBackOfReset) {
ExponentialBackOffStrategy exponential_back_off(10, 100, 2);
EXPECT_EQ(10, exponential_back_off.nextBackOffMs());
EXPECT_EQ(20, exponential_back_off.nextBackOffMs());
JitteredBackOffStrategy jittered_back_off(25, 30, random);
EXPECT_EQ(2, jittered_back_off.nextBackOffMs());
EXPECT_EQ(27, jittered_back_off.nextBackOffMs());

exponential_back_off.reset();
EXPECT_EQ(10, exponential_back_off.nextBackOffMs()); // Should start from start
jittered_back_off.reset();
EXPECT_EQ(2, jittered_back_off.nextBackOffMs()); // Should start over from the base interval
}

TEST(BackOffStrategyTest, ExponentialBackOfResetAfterMaxReached) {
ExponentialBackOffStrategy exponential_back_off(10, 100, 2);
EXPECT_EQ(10, exponential_back_off.nextBackOffMs());
EXPECT_EQ(20, exponential_back_off.nextBackOffMs());
EXPECT_EQ(40, exponential_back_off.nextBackOffMs());
EXPECT_EQ(80, exponential_back_off.nextBackOffMs());
EXPECT_EQ(100, exponential_back_off.nextBackOffMs()); // Should return Max here
TEST(BackOffStrategyTest, JitteredBackOffWithMaxInterval) {
NiceMock<Runtime::MockRandomGenerator> random;
ON_CALL(random, random()).WillByDefault(Return(1024));

JitteredBackOffStrategy jittered_back_off(5, 100, random);
EXPECT_EQ(4, jittered_back_off.nextBackOffMs());
EXPECT_EQ(4, jittered_back_off.nextBackOffMs());
EXPECT_EQ(9, jittered_back_off.nextBackOffMs());
EXPECT_EQ(49, jittered_back_off.nextBackOffMs());
EXPECT_EQ(94, jittered_back_off.nextBackOffMs());
EXPECT_EQ(94, jittered_back_off.nextBackOffMs()); // Window growth is capped by max_interval from here
}

exponential_back_off.reset();
TEST(BackOffStrategyTest, JitteredBackOffWithMaxIntervalReset) {
NiceMock<Runtime::MockRandomGenerator> random;
ON_CALL(random, random()).WillByDefault(Return(1024));

EXPECT_EQ(10, exponential_back_off.nextBackOffMs()); // Should start from start
}
JitteredBackOffStrategy jittered_back_off(5, 100, random);
EXPECT_EQ(4, jittered_back_off.nextBackOffMs());
EXPECT_EQ(4, jittered_back_off.nextBackOffMs());
EXPECT_EQ(9, jittered_back_off.nextBackOffMs());
EXPECT_EQ(49, jittered_back_off.nextBackOffMs());

jittered_back_off.reset();
EXPECT_EQ(4, jittered_back_off.nextBackOffMs()); // Should start over from the base interval
}
} // namespace Envoy
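For readers checking the expected values in JitteredBackOffWithMaxInterval: with base_interval = 5, max_interval = 100 and random() pinned to 1024, the successive windows are 5, 15, 35, 75 and 155, giving 1024 % 5 = 4, 1024 % 15 = 4, 1024 % 35 = 9, 1024 % 75 = 49 and 1024 % 155 = 94; because 155 exceeds the 100 cap, the window stops widening there and later calls keep returning 94.
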
1 change: 1 addition & 0 deletions test/common/config/BUILD
@@ -47,6 +47,7 @@ envoy_cc_test(
"//test/mocks/config:config_mocks",
"//test/mocks/event:event_mocks",
"//test/mocks/grpc:grpc_mocks",
"//test/mocks/runtime:runtime_mocks",
"//test/test_common:logging_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/api/v2:discovery_cc",
5 changes: 4 additions & 1 deletion test/common/config/grpc_mux_impl_test.cc
@@ -10,6 +10,7 @@
#include "test/mocks/config/mocks.h"
#include "test/mocks/event/mocks.h"
#include "test/mocks/grpc/mocks.h"
#include "test/mocks/runtime/mocks.h"
#include "test/test_common/logging.h"
#include "test/test_common/utility.h"

@@ -44,7 +45,7 @@ class GrpcMuxImplTest : public testing::Test {
dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
time_source_));
random_, time_source_));
}

void expectSendMessage(const std::string& type_url,
@@ -72,6 +73,7 @@ class GrpcMuxImplTest : public testing::Test {

envoy::api::v2::core::Node node_;
NiceMock<Event::MockDispatcher> dispatcher_;
Runtime::MockRandomGenerator random_;
Grpc::MockAsyncClient* async_client_;
Event::MockTimer* timer_;
Event::TimerCb timer_cb_;
@@ -112,6 +114,7 @@ TEST_F(GrpcMuxImplTest, ResetStream) {
expectSendMessage("baz", {"z"}, "");
grpc_mux_->start();

EXPECT_CALL(random_, random());
EXPECT_CALL(*timer_, enableTimer(_));
grpc_mux_->onRemoteClose(Grpc::Status::GrpcStatus::Canceled, "");
EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_));
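The added EXPECT_CALL(random_, random()) in this test captures the behavioral change: after onRemoteClose the mux now consults the injected RandomGenerator (through JitteredBackOffStrategy) for a jittered delay before enabling the retry timer.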