upstream: desynchronize WRR picks. (#3271)
This PR fixes two issues:

- Avoid synchronized behavior across the fleet by applying a per-instance
  seed to each WRR balancer.
- Avoid having unweighted RR reset to the same host on each refresh, which
  biases towards earlier hosts in the schedule. This TODO still exists for
  the weighted case.

Risk Level: Medium (there will be observable traffic effects due to changes
in the host pick schedule).
Testing: New unit tests; modified existing tests.


Signed-off-by: Harvey Tuch <htuch@google.com>
htuch authored May 3, 2018
1 parent ec7d1af commit 872728d
Showing 15 changed files with 130 additions and 49 deletions.
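For intuition, here is a minimal, self-contained sketch of the unweighted fix. This is not Envoy's actual code; SimpleRoundRobin, refresh, and pick are illustrative names. Each balancer instance draws a random seed once; a rebuild resumes from the previous RR index when one exists and otherwise starts at seed % size, so a fleet of balancers does not pick hosts in lock step:

#include <cstdint>
#include <random>
#include <string>
#include <vector>

class SimpleRoundRobin {
public:
  SimpleRoundRobin() : seed_(std::random_device{}()) {}

  // Rebuild the host list. If we were already balancing, continue from the
  // previous RR index; otherwise start at seed % size, as in this PR.
  void refresh(std::vector<std::string> hosts) {
    const bool had_schedule = !hosts_.empty();
    hosts_ = std::move(hosts);
    if (hosts_.empty()) {
      rr_index_ = 0;
    } else if (!had_schedule) {
      rr_index_ = seed_ % hosts_.size();
    }
    // If had_schedule is true, rr_index_ simply carries over the rebuild.
  }

  // Assumes a non-empty host list.
  const std::string& pick() { return hosts_[rr_index_++ % hosts_.size()]; }

private:
  const uint64_t seed_; // drawn once per balancer instance
  uint64_t rr_index_{0};
  std::vector<std::string> hosts_;
};

With seed == 1 and hosts {A, B, C}, the first picks are B, C, A, B, ... — the behavior the new Seed unit test below asserts.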
45 changes: 30 additions & 15 deletions source/common/upstream/load_balancer_impl.cc
@@ -412,21 +412,16 @@ RoundRobinLoadBalancer::RoundRobinLoadBalancer(
Runtime::Loader& runtime, Runtime::RandomGenerator& random,
const envoy::api::v2::Cluster::CommonLbConfig& common_config)
: ZoneAwareLoadBalancerBase(priority_set, local_priority_set, stats, runtime, random,
common_config) {
common_config),
seed_(random_.random()) {
for (uint32_t priority = 0; priority < priority_set.hostSetsPerPriority().size(); ++priority) {
refresh(priority);
}
// We fully recompute the schedulers for a given host set here on membership change, which is
// consistent with what other LB implementations do (e.g. thread aware).
// TODO(htuch): By fully recomputing the host set schedulers on each membership update, we lose RR
// history, so on an EDS update, for example, we reset the schedule. This is likely a reasonable
// approximation when there are many more LB picks than host set updates. Otherwise, we will bias
// towards heavily weighted hosts further than weighted RR should allow. We could be a bit finer
// grained and only modify the schedulers for hosts that have changed health status, locality,
// been added/removed, etc. but this is quite a bit more complicated and will require some
// additional addMemberUpdateCb parameters to track efficiently. The other downside of a full
// recompute is that time complexity is O(n * log n), so we will need to do better at delta
// tracking to scale (see https://github.com/envoyproxy/envoy/issues/2874).
// The downside of a full recompute is that time complexity is O(n * log n),
// so we will need to do better at delta tracking to scale (see
// https://github.com/envoyproxy/envoy/issues/2874).
priority_set.addMemberUpdateCb(
[this](uint32_t priority, const HostVector&, const HostVector&) { refresh(priority); });
}
@@ -442,20 +437,40 @@ void RoundRobinLoadBalancer::refresh(uint32_t priority) {
}
}

// Compute schedule offset for unweighted before deleting the existing
// scheduler.
uint64_t unweighted_offset = 0;
if (hosts.size() > 0) {
// If we already have been balancing for this locality, continue where we
// left off; a rebuild with the same hosts will have the expected RR
// across the rebuild. Otherwise, start with the LB seed.
if (scheduler_.find(source) != scheduler_.end()) {
unweighted_offset = scheduler_[source].rr_index_;
} else {
unweighted_offset = seed_ % hosts.size();
}
}
// Nuke existing scheduler if it exists.
scheduler_[source] = Scheduler{};
scheduler_[source].weighted_ = weighted;
auto& scheduler = scheduler_[source] = Scheduler{};
scheduler.weighted_ = weighted;
if (weighted) {
// Populate scheduler with host list.
// TODO(htuch): We should add the ability to randomly offset into the host list to
// desynchronize the schedule across Envoys in large fleets.
for (const auto& host : hosts) {
// We use a fixed weight here. While the weight may change without
// notification, this will only be stale until this host is next picked,
// at which point it is reinserted into the EdfScheduler with its new
// weight in chooseHost().
scheduler_[source].edf_.add(host->weight(), host);
scheduler.edf_.add(host->weight(), host);
}
// Cycle through hosts to achieve the intended offset behavior.
// TODO(htuch): Consider how we can avoid biasing towards earlier hosts in the
// schedule across refreshes for the weighted case.
for (uint32_t i = 0; i < seed_ % hosts.size(); ++i) {
auto host = scheduler.edf_.pick();
scheduler.edf_.add(host->weight(), host);
}
} else {
scheduler.rr_index_ = unweighted_offset;
}
};
// Populate EdfSchedulers for each valid HostsSource value for the host set
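To build intuition for the weighted case above — the loop that picks from the EDF scheduler and immediately re-adds the host seed_ % hosts.size() times — here is a hedged, self-contained sketch. ToyEdf and buildDesynchronized are illustrative names under a simple deadline = time + 1/weight EDF model, not Envoy's actual EdfScheduler API:

#include <cstdint>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Toy EDF entry: smaller deadline == picked sooner.
struct Entry {
  double deadline;
  std::string host;
  uint32_t weight;
  bool operator>(const Entry& other) const { return deadline > other.deadline; }
};

class ToyEdf {
public:
  void add(uint32_t weight, const std::string& host) {
    // Heavier hosts get earlier (i.e. more frequent) deadlines.
    queue_.push(Entry{current_time_ + 1.0 / weight, host, weight});
  }
  Entry pick() {
    Entry e = queue_.top();
    queue_.pop();
    current_time_ = e.deadline;
    return e;
  }

private:
  double current_time_{0.0};
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> queue_;
};

// Populate the scheduler, then advance it seed % size picks so this
// instance starts at a different point in the same weighted schedule.
// Assumes a non-empty host list.
ToyEdf buildDesynchronized(const std::vector<std::pair<std::string, uint32_t>>& hosts,
                           uint64_t seed) {
  ToyEdf edf;
  for (const auto& [host, weight] : hosts) {
    edf.add(weight, host);
  }
  for (uint64_t i = 0; i < seed % hosts.size(); ++i) {
    Entry e = edf.pick();      // advance the schedule by one pick...
    edf.add(e.weight, e.host); // ...and reinsert with the same weight.
  }
  return edf;
}

Because each cycled host is reinserted with its current weight, the long-run pick frequencies are unchanged; only the starting phase of the schedule shifts per instance, which is exactly what desynchronizes a fleet.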
10 changes: 9 additions & 1 deletion source/common/upstream/load_balancer_impl.h
@@ -213,7 +213,9 @@ class ZoneAwareLoadBalancerBase : public LoadBalancerBase {
* simple index to achieve O(1) scheduling in that case.
* TODO(htuch): We use EDF at Google, but the EDF scheduler may be overkill if we don't want to
* support large ranges of weights or arbitrary precision floating weights, we could construct an
* explicit schedule, since m will be a small constant factor in O(m * n).
* explicit schedule, since m will be a small constant factor in O(m * n). This
* could also be done on a thread aware LB, avoiding creating multiple EDF
* instances.
*/
class RoundRobinLoadBalancer : public LoadBalancer, ZoneAwareLoadBalancerBase {
public:
@@ -238,6 +240,12 @@ class RoundRobinLoadBalancer : public LoadBalancer, ZoneAwareLoadBalancerBase {

// Scheduler for each valid HostsSource.
std::unordered_map<HostsSource, Scheduler, HostsSourceHash> scheduler_;
// Seed to allow us to desynchronize WRR balancers across a fleet. If we don't
// do this, multiple Envoys that receive an update at the same time (or even
// multiple RoundRobinLoadBalancers on the same host) will send requests to
// backends in roughly lock step, causing significant imbalance and potential
// overload.
const uint64_t seed_;
};

/**
6 changes: 3 additions & 3 deletions source/exe/main_common.cc
@@ -63,9 +63,9 @@ MainCommonBase::MainCommonBase(OptionsImpl& options) : options_(options) {
Logger::Registry::initialize(options_.logLevel(), options_.logFormat(), log_lock);

stats_store_.reset(new Stats::ThreadLocalStoreImpl(restarter_->statsAllocator()));
server_.reset(new Server::InstanceImpl(options_, local_address, default_test_hooks_,
*restarter_, *stats_store_, access_log_lock,
component_factory_, *tls_));
server_.reset(new Server::InstanceImpl(
options_, local_address, default_test_hooks_, *restarter_, *stats_store_, access_log_lock,
component_factory_, std::make_unique<Runtime::RandomGeneratorImpl>(), *tls_));
break;
}
case Server::Mode::Validate:
7 changes: 5 additions & 2 deletions source/server/server.cc
@@ -43,13 +43,16 @@ namespace Server {
InstanceImpl::InstanceImpl(Options& options, Network::Address::InstanceConstSharedPtr local_address,
TestHooks& hooks, HotRestart& restarter, Stats::StoreRoot& store,
Thread::BasicLockable& access_log_lock,
ComponentFactory& component_factory, ThreadLocal::Instance& tls)
ComponentFactory& component_factory,
Runtime::RandomGeneratorPtr&& random_generator,
ThreadLocal::Instance& tls)
: options_(options), restarter_(restarter), start_time_(time(nullptr)),
original_start_time_(start_time_), stats_store_(store), thread_local_(tls),
api_(new Api::Impl(options.fileFlushIntervalMsec())), dispatcher_(api_->allocateDispatcher()),
singleton_manager_(new Singleton::ManagerImpl()),
handler_(new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher_)),
listener_component_factory_(*this), worker_factory_(thread_local_, *api_, hooks),
random_generator_(std::move(random_generator)), listener_component_factory_(*this),
worker_factory_(thread_local_, *api_, hooks),
dns_resolver_(dispatcher_->createDnsResolver({})),
access_log_manager_(*api_, *dispatcher_, access_log_lock, store), terminated_(false) {

6 changes: 3 additions & 3 deletions source/server/server.h
@@ -130,7 +130,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
InstanceImpl(Options& options, Network::Address::InstanceConstSharedPtr local_address,
TestHooks& hooks, HotRestart& restarter, Stats::StoreRoot& store,
Thread::BasicLockable& access_log_lock, ComponentFactory& component_factory,
ThreadLocal::Instance& tls);
Runtime::RandomGeneratorPtr&& random_generator, ThreadLocal::Instance& tls);

~InstanceImpl() override;

@@ -151,7 +151,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
HotRestart& hotRestart() override { return restarter_; }
Init::Manager& initManager() override { return init_manager_; }
ListenerManager& listenerManager() override { return *listener_manager_; }
Runtime::RandomGenerator& random() override { return random_generator_; }
Runtime::RandomGenerator& random() override { return *random_generator_; }
RateLimit::ClientPtr
rateLimitClient(const absl::optional<std::chrono::milliseconds>& timeout) override {
return config_->rateLimitClientFactory().create(timeout);
@@ -190,7 +190,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
std::unique_ptr<AdminImpl> admin_;
Singleton::ManagerPtr singleton_manager_;
Network::ConnectionHandlerPtr handler_;
Runtime::RandomGeneratorImpl random_generator_;
Runtime::RandomGeneratorPtr random_generator_;
Runtime::LoaderPtr runtime_loader_;
std::unique_ptr<Ssl::ContextManagerImpl> ssl_context_manager_;
ProdListenerComponentFactory listener_component_factory_;
32 changes: 32 additions & 0 deletions test/common/upstream/load_balancer_impl_test.cc
@@ -333,6 +333,22 @@ TEST_P(RoundRobinLoadBalancerTest, Normal) {
EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr));
}

// Validate that the RNG seed influences pick order.
TEST_P(RoundRobinLoadBalancerTest, Seed) {
hostSet().healthy_hosts_ = {
makeTestHost(info_, "tcp://127.0.0.1:80"),
makeTestHost(info_, "tcp://127.0.0.1:81"),
makeTestHost(info_, "tcp://127.0.0.1:82"),
};
hostSet().hosts_ = hostSet().healthy_hosts_;
EXPECT_CALL(random_, random()).WillRepeatedly(Return(1));
init(false);
EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr));
EXPECT_EQ(hostSet().healthy_hosts_[2], lb_->chooseHost(nullptr));
EXPECT_EQ(hostSet().healthy_hosts_[0], lb_->chooseHost(nullptr));
EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr));
}

TEST_P(RoundRobinLoadBalancerTest, Locality) {
HostVectorSharedPtr hosts(new HostVector({makeTestHost(info_, "tcp://127.0.0.1:80"),
makeTestHost(info_, "tcp://127.0.0.1:81"),
@@ -424,6 +440,22 @@ TEST_P(RoundRobinLoadBalancerTest, Weighted) {
EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr));
}

// Validate that the RNG seed influences pick order when using weighted RR.
TEST_P(RoundRobinLoadBalancerTest, WeightedSeed) {
hostSet().healthy_hosts_ = {makeTestHost(info_, "tcp://127.0.0.1:80", 1),
makeTestHost(info_, "tcp://127.0.0.1:81", 2)};
hostSet().hosts_ = hostSet().healthy_hosts_;
EXPECT_CALL(random_, random()).WillRepeatedly(Return(1));
init(false);
// Initial weights respected.
EXPECT_EQ(hostSet().healthy_hosts_[0], lb_->chooseHost(nullptr));
EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr));
EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr));
EXPECT_EQ(hostSet().healthy_hosts_[0], lb_->chooseHost(nullptr));
EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr));
EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr));
}

TEST_P(RoundRobinLoadBalancerTest, MaxUnhealthyPanic) {
hostSet().healthy_hosts_ = {makeTestHost(info_, "tcp://127.0.0.1:80"),
makeTestHost(info_, "tcp://127.0.0.1:81")};
8 changes: 4 additions & 4 deletions test/common/upstream/subset_lb_test.cc
@@ -388,8 +388,8 @@ TEST_P(SubsetLoadBalancerTest, FallbackAnyEndpointAfterUpdate) {
HostSharedPtr added_host = makeHost("tcp://127.0.0.1:8000", {{"version", "1.0"}});
modifyHosts({added_host}, {host_set_.hosts_.back()});

EXPECT_EQ(host_set_.hosts_[0], lb_->chooseHost(nullptr));
EXPECT_EQ(added_host, lb_->chooseHost(nullptr));
EXPECT_EQ(host_set_.hosts_[0], lb_->chooseHost(nullptr));
}

TEST_F(SubsetLoadBalancerTest, FallbackDefaultSubset) {
@@ -898,7 +898,7 @@ TEST_P(SubsetLoadBalancerTest, ZoneAwareFallbackAfterUpdate) {

// Force request out of small zone.
EXPECT_CALL(random_, random()).WillOnce(Return(0)).WillOnce(Return(9999)).WillOnce(Return(2));
EXPECT_EQ(host_set_.healthy_hosts_per_locality_->get()[1][0], lb_->chooseHost(nullptr));
EXPECT_EQ(host_set_.healthy_hosts_per_locality_->get()[1][1], lb_->chooseHost(nullptr));
}

TEST_F(SubsetLoadBalancerTest, ZoneAwareFallbackDefaultSubset) {
@@ -1019,7 +1019,7 @@ TEST_P(SubsetLoadBalancerTest, ZoneAwareFallbackDefaultSubsetAfterUpdate) {

// Force request out of small zone.
EXPECT_CALL(random_, random()).WillOnce(Return(0)).WillOnce(Return(9999)).WillOnce(Return(2));
EXPECT_EQ(host_set_.healthy_hosts_per_locality_->get()[1][1], lb_->chooseHost(nullptr));
EXPECT_EQ(host_set_.healthy_hosts_per_locality_->get()[1][3], lb_->chooseHost(nullptr));
}

TEST_F(SubsetLoadBalancerTest, ZoneAwareBalancesSubsets) {
@@ -1138,7 +1138,7 @@ TEST_P(SubsetLoadBalancerTest, ZoneAwareBalancesSubsetsAfterUpdate) {

// Force request out of small zone.
EXPECT_CALL(random_, random()).WillOnce(Return(0)).WillOnce(Return(9999)).WillOnce(Return(2));
EXPECT_EQ(host_set_.healthy_hosts_per_locality_->get()[1][1], lb_->chooseHost(&context));
EXPECT_EQ(host_set_.healthy_hosts_per_locality_->get()[1][3], lb_->chooseHost(&context));
}

TEST_F(SubsetLoadBalancerTest, DescribeMetadata) {
7 changes: 4 additions & 3 deletions test/integration/integration.cc
@@ -326,8 +326,8 @@ void BaseIntegrationTest::registerTestServerPorts(const std::vector<std::string>

void BaseIntegrationTest::createGeneratedApiTestServer(const std::string& bootstrap_path,
const std::vector<std::string>& port_names) {
test_server_ =
IntegrationTestServer::create(bootstrap_path, version_, pre_worker_start_test_steps_);
test_server_ = IntegrationTestServer::create(bootstrap_path, version_,
pre_worker_start_test_steps_, deterministic_);
if (config_helper_.bootstrap().static_resources().listeners_size() > 0) {
// Wait for listeners to be created before invoking registerTestServerPorts() below, as that
// needs to know about the bound listener ports.
@@ -356,7 +356,8 @@ void BaseIntegrationTest::createApiTestServer(const ApiFilesystemConfig& api_fil
void BaseIntegrationTest::createTestServer(const std::string& json_path,
const std::vector<std::string>& port_names) {
test_server_ = IntegrationTestServer::create(
TestEnvironment::temporaryFileSubstitute(json_path, port_map_, version_), version_, nullptr);
TestEnvironment::temporaryFileSubstitute(json_path, port_map_, version_), version_, nullptr,
deterministic_);
registerTestServerPorts(port_names);
}

4 changes: 4 additions & 0 deletions test/integration/integration.h
@@ -134,6 +134,8 @@ class BaseIntegrationTest : Logger::Loggable<Logger::Id::testing> {
void setUpstreamProtocol(FakeHttpConnection::Type protocol);
// Sets fake_upstreams_count_ and alters the upstream protocol in the config_helper_
void setUpstreamCount(uint32_t count) { fake_upstreams_count_ = count; }
// Make test more deterministic by using a fixed RNG value.
void setDeterministic() { deterministic_ = true; }

FakeHttpConnection::Type upstreamProtocol() const { return upstream_protocol_; }

@@ -205,6 +207,8 @@ class BaseIntegrationTest : Logger::Loggable<Logger::Id::testing> {
FakeHttpConnection::Type upstream_protocol_{FakeHttpConnection::Type::HTTP1};
// True if initialized() has been called.
bool initialized_{};
// True if test will use a fixed RNG value.
bool deterministic_{};
};

} // namespace Envoy
2 changes: 1 addition & 1 deletion test/integration/legacy_json_integration_test.cc
@@ -59,7 +59,7 @@ TEST_P(LegacyJsonIntegrationTest, TestServerXfc) {
param_map["set_current_client_cert_details"] = "";
std::string config = TestEnvironment::temporaryFileSubstitute(
"test/config/integration/server_xfcc.json", param_map, port_map_, version_);
IntegrationTestServer::create(config, version_, nullptr);
IntegrationTestServer::create(config, version_, nullptr, false);
}

TEST_P(LegacyJsonIntegrationTest, TestEchoServer) {
8 changes: 6 additions & 2 deletions test/integration/load_stats_integration_test.cc
@@ -18,7 +18,11 @@ namespace {
class LoadStatsIntegrationTest : public HttpIntegrationTest,
public testing::TestWithParam<Network::Address::IpVersion> {
public:
LoadStatsIntegrationTest() : HttpIntegrationTest(Http::CodecClient::Type::HTTP1, GetParam()) {}
LoadStatsIntegrationTest() : HttpIntegrationTest(Http::CodecClient::Type::HTTP1, GetParam()) {
// We rely on some fairly specific load balancing picks in this test, so
// determinize the schedule.
setDeterministic();
}

void addEndpoint(envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoints,
uint32_t index, uint32_t& num_endpoints) {
@@ -346,7 +350,7 @@ TEST_P(LoadStatsIntegrationTest, Success) {
requestLoadStatsResponse({"cluster_0"});

for (uint32_t i = 0; i < 6; ++i) {
sendAndReceiveUpstream(i % 3);
sendAndReceiveUpstream((4 + i) % 3);
}

// No locality for priority=1 since there's no "winter" endpoints.
28 changes: 19 additions & 9 deletions test/integration/server.cc
@@ -14,26 +14,28 @@

#include "test/integration/integration.h"
#include "test/integration/utility.h"
#include "test/mocks/runtime/mocks.h"
#include "test/test_common/environment.h"

#include "gtest/gtest.h"

namespace Envoy {

IntegrationTestServerPtr
IntegrationTestServer::create(const std::string& config_path,
const Network::Address::IpVersion version,
std::function<void()> pre_worker_start_test_steps) {
IntegrationTestServerPtr IntegrationTestServer::create(
const std::string& config_path, const Network::Address::IpVersion version,
std::function<void()> pre_worker_start_test_steps, bool deterministic) {
IntegrationTestServerPtr server{new IntegrationTestServer(config_path)};
server->start(version, pre_worker_start_test_steps);
server->start(version, pre_worker_start_test_steps, deterministic);
return server;
}

void IntegrationTestServer::start(const Network::Address::IpVersion version,
std::function<void()> pre_worker_start_test_steps) {
std::function<void()> pre_worker_start_test_steps,
bool deterministic) {
ENVOY_LOG(info, "starting integration test server");
ASSERT(!thread_);
thread_.reset(new Thread::Thread([version, this]() -> void { threadRoutine(version); }));
thread_.reset(new Thread::Thread(
[version, deterministic, this]() -> void { threadRoutine(version, deterministic); }));

// If any steps need to be done prior to workers starting, do them now. E.g., xDS pre-init.
if (pre_worker_start_test_steps != nullptr) {
@@ -82,7 +84,8 @@ void IntegrationTestServer::onWorkerListenerRemoved() {
}
}

void IntegrationTestServer::threadRoutine(const Network::Address::IpVersion version) {
void IntegrationTestServer::threadRoutine(const Network::Address::IpVersion version,
bool deterministic) {
Server::TestOptionsImpl options(config_path_, version);
Server::HotRestartNopImpl restarter;
Thread::MutexBasicLockable lock;
@@ -91,8 +94,15 @@ void IntegrationTestServer::threadRoutine(const Network::Address::IpVersion vers
Stats::HeapRawStatDataAllocator stats_allocator;
Stats::ThreadLocalStoreImpl stats_store(stats_allocator);
stat_store_ = &stats_store;
Runtime::RandomGeneratorPtr random_generator;
if (deterministic) {
random_generator = std::make_unique<testing::NiceMock<Runtime::MockRandomGenerator>>();
} else {
random_generator = std::make_unique<Runtime::RandomGeneratorImpl>();
}
server_.reset(new Server::InstanceImpl(options, Network::Utility::getLocalAddress(version), *this,
restarter, stats_store, lock, *this, tls));
restarter, stats_store, lock, *this,
std::move(random_generator), tls));
pending_listeners_ = server_->listenerManager().listeners().size();
ENVOY_LOG(info, "waiting for {} test server listeners", pending_listeners_);
server_set_.setReady();