Skip to content

Commit

Permalink
eds: decrease computational complexity of updates (#11442)
Browse files Browse the repository at this point in the history
Makes BaseDynamicClusterImpl::updateDynamicHostList O(n) rather than O(n^2)

Instead of calling .erase() on list iterators as we find them, we swap with the end of the list and erase after iterating over the list. This shows a ~3x improvement in execution time in the included benchmark test.

Risk Level: Medium. No reordering happens to the endpoint list. Not runtime guarded.
Testing: New benchmark, existing unit tests pass (and cover the affected function).
Docs Changes: N/A
Release Notes: N/A

Relates to #2874 #11362

Signed-off-by: Phil Genera <pgenera@google.com>
  • Loading branch information
pgenera authored Jul 9, 2020
1 parent d67246d commit b1e62a3
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 67 deletions.
6 changes: 4 additions & 2 deletions bazel/test_for_benchmark_wrapper.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#!/bin/bash

# Set the benchmark time to 0 to just verify that the benchmark runs to completion.
"${TEST_SRCDIR}/envoy/$@" --benchmark_min_time=0
# Set the benchmark time to 0 to just verify that the benchmark runs to
# completion. We're interacting with two different flag parsers, so the order
# of flags and the -- matters.
"${TEST_SRCDIR}/envoy/$@" --skip_expensive_benchmarks -- --benchmark_min_time=0
58 changes: 31 additions & 27 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1332,9 +1332,7 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
bool hosts_changed = false;

// Go through and see if the list we have is different from what we just got. If it is, we make a
// new host list and raise a change notification. This uses an N^2 search given that this does not
// happen very often and the list sizes should be small (see
// https://github.com/envoyproxy/envoy/issues/2874). We also check for duplicates here. It's
// new host list and raise a change notification. We also check for duplicates here. It's
// possible for DNS to return the same address multiple times, and a bad EDS implementation could
// do the same thing.

Expand Down Expand Up @@ -1437,16 +1435,20 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,

// Remove hosts from current_priority_hosts that were matched to an existing host in the previous
// loop.
for (auto itr = current_priority_hosts.begin(); itr != current_priority_hosts.end();) {
auto existing_itr = existing_hosts_for_current_priority.find((*itr)->address()->asString());
auto erase_from =
std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(),
[&existing_hosts_for_current_priority](const HostSharedPtr& p) {
auto existing_itr =
existing_hosts_for_current_priority.find(p->address()->asString());

if (existing_itr != existing_hosts_for_current_priority.end()) {
existing_hosts_for_current_priority.erase(existing_itr);
itr = current_priority_hosts.erase(itr);
} else {
itr++;
}
}
if (existing_itr != existing_hosts_for_current_priority.end()) {
existing_hosts_for_current_priority.erase(existing_itr);
return true;
}

return false;
});
current_priority_hosts.erase(erase_from, current_priority_hosts.end());

// If we saw existing hosts during this iteration from a different priority, then we've moved
// a host from another priority into this one, so we should mark the priority as having changed.
Expand All @@ -1464,21 +1466,23 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
const bool dont_remove_healthy_hosts =
health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval();
if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) {
for (auto i = current_priority_hosts.begin(); i != current_priority_hosts.end();) {
if (!((*i)->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
(*i)->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) {
if ((*i)->weight() > max_host_weight) {
max_host_weight = (*i)->weight();
}

final_hosts.push_back(*i);
updated_hosts[(*i)->address()->asString()] = *i;
(*i)->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
i = current_priority_hosts.erase(i);
} else {
i++;
}
}
erase_from =
std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(),
[&updated_hosts, &final_hosts, &max_host_weight](const HostSharedPtr& p) {
if (!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) {
if (p->weight() > max_host_weight) {
max_host_weight = p->weight();
}

final_hosts.push_back(p);
updated_hosts[p->address()->asString()] = p;
p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
return true;
}
return false;
});
current_priority_hosts.erase(erase_from, current_priority_hosts.end());
}

// At this point we've accounted for all the new hosts as well the hosts that previously
Expand Down
2 changes: 2 additions & 0 deletions test/benchmark/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ envoy_package()
envoy_cc_test_library(
name = "main",
srcs = ["main.cc"],
hdrs = ["main.h"],
external_deps = [
"benchmark",
"tclap",
],
deps = [
"//test/test_common:environment_lib",
Expand Down
32 changes: 28 additions & 4 deletions test/benchmark/main.cc
Original file line number Diff line number Diff line change
@@ -1,16 +1,40 @@
// NOLINT(namespace-envoy)
// This is an Envoy driver for benchmarks.
#include "test/benchmark/main.h"

#include "test/test_common/environment.h"

#include "benchmark/benchmark.h"
#include "tclap/CmdLine.h"

static bool skip_expensive_benchmarks = false;

// Boilerplate main(), which discovers benchmarks and runs them.
// Boilerplate main(), which discovers benchmarks and runs them. This uses two
// different flag parsers, so the order of flags matters: flags defined here
// must be passed first, and flags defined in benchmark::Initialize second,
// separated by --.
// TODO(pgenera): convert this to abseil/flags/ when benchmark also adopts abseil.
int main(int argc, char** argv) {
Envoy::TestEnvironment::initializeTestMain(argv[0]);

benchmark::Initialize(&argc, argv);
if (benchmark::ReportUnrecognizedArguments(argc, argv)) {
return 1;
// NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.VirtualCall)
TCLAP::CmdLine cmd("envoy-benchmark-test", ' ', "0.1");
TCLAP::SwitchArg skip_switch("s", "skip_expensive_benchmarks",
"skip or minimize expensive benchmarks", cmd, false);

cmd.setExceptionHandling(false);
try {
cmd.parse(argc, argv);
} catch (const TCLAP::ExitException& e) {
// parse() throws an ExitException with status 0 after printing the output
// for --help and --version.
return 0;
}

skip_expensive_benchmarks = skip_switch.getValue();

benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
}

bool Envoy::benchmark::skipExpensiveBenchmarks() { return skip_expensive_benchmarks; }
13 changes: 13 additions & 0 deletions test/benchmark/main.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once

/**
* Benchmarks can use this to skip or hurry through long-running tests in CI.
*/

namespace Envoy {
namespace benchmark {

bool skipExpensiveBenchmarks();

}
} // namespace Envoy
111 changes: 77 additions & 34 deletions test/common/upstream/eds_speed_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "server/transport_socket_config_impl.h"

#include "test/benchmark/main.h"
#include "test/common/upstream/utility.h"
#include "test/mocks/local_info/mocks.h"
#include "test/mocks/protobuf/mocks.h"
Expand All @@ -28,12 +29,15 @@

#include "benchmark/benchmark.h"

using ::benchmark::State;
using Envoy::benchmark::skipExpensiveBenchmarks;

namespace Envoy {
namespace Upstream {

class EdsSpeedTest {
public:
EdsSpeedTest(benchmark::State& state, bool v2_config)
EdsSpeedTest(State& state, bool v2_config)
: state_(state), v2_config_(v2_config),
type_url_(v2_config_
? "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
Expand All @@ -44,7 +48,26 @@ class EdsSpeedTest {
local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"),
envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, {}, true)) {}
envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, {}, true)) {
resetCluster(R"EOF(
name: name
connect_timeout: 0.25s
type: EDS
eds_cluster_config:
service_name: fare
eds_config:
api_config_source:
cluster_names:
- eds
refresh_delay: 1s
)EOF",
Envoy::Upstream::Cluster::InitializePhase::Secondary);

EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_));
cluster_->initialize([this] { initialized_ = true; });
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(testing::Return(&async_stream_));
subscription_->start({"fare"});
}

void resetCluster(const std::string& yaml_config, Cluster::InitializePhase initialize_phase) {
local_info_.node_.mutable_locality()->set_zone("us-east-1a");
Expand All @@ -64,30 +87,14 @@ class EdsSpeedTest {
std::chrono::milliseconds(), false);
}

void initialize() {
EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_));
cluster_->initialize([this] { initialized_ = true; });
}

// Set up an EDS config with multiple priorities, localities, weights and make sure
// they are loaded and reloaded as expected.
void priorityAndLocalityWeightedHelper(bool ignore_unknown_dynamic_fields, size_t num_hosts) {
// they are loaded as expected.
void priorityAndLocalityWeightedHelper(bool ignore_unknown_dynamic_fields, size_t num_hosts,
bool healthy) {
state_.PauseTiming();

envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment;
cluster_load_assignment.set_cluster_name("fare");
resetCluster(R"EOF(
name: name
connect_timeout: 0.25s
type: EDS
eds_cluster_config:
service_name: fare
eds_config:
api_config_source:
cluster_names:
- eds
refresh_delay: 1s
)EOF",
Envoy::Upstream::Cluster::InitializePhase::Secondary);

// Add a whole bunch of hosts in a single place:
auto* endpoints = cluster_load_assignment.add_endpoints();
Expand All @@ -100,18 +107,21 @@ class EdsSpeedTest {

uint32_t port = 1000;
for (size_t i = 0; i < num_hosts; ++i) {
auto* socket_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
auto* lb_endpoint = endpoints->add_lb_endpoints();
if (healthy) {
lb_endpoint->set_health_status(envoy::config::core::v3::HEALTHY);
} else {
lb_endpoint->set_health_status(envoy::config::core::v3::UNHEALTHY);
}
auto* socket_address =
lb_endpoint->mutable_endpoint()->mutable_address()->mutable_socket_address();
socket_address->set_address("10.0.1." + std::to_string(i / 60000));
socket_address->set_port_value((port + i) % 60000);
}

// this is what we're actually testing:
validation_visitor_.setSkipValidation(ignore_unknown_dynamic_fields);

initialize();
auto response = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
response->set_type_url(type_url_);
auto* resource = response->mutable_resources()->Add();
Expand All @@ -122,16 +132,13 @@ class EdsSpeedTest {
"");
resource->set_type_url("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment");
}
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(testing::Return(&async_stream_));
subscription_->start({"fare"});
state_.ResumeTiming();
grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response));
ASSERT(initialized_);
ASSERT(cluster_->prioritySet().hostSetsPerPriority()[1]->hostsPerLocality().get()[0].size() ==
num_hosts);
}

benchmark::State& state_;
State& state_;
const bool v2_config_;
const std::string type_url_;
bool initialized_{};
Expand Down Expand Up @@ -162,14 +169,50 @@ class EdsSpeedTest {
} // namespace Upstream
} // namespace Envoy

static void priorityAndLocalityWeighted(benchmark::State& state) {
static void priorityAndLocalityWeighted(State& state) {
Envoy::Thread::MutexBasicLockable lock;
Envoy::Logger::Context logging_state(spdlog::level::warn,
Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, lock, false);
for (auto _ : state) {
Envoy::Upstream::EdsSpeedTest speed_test(state, state.range(0));
speed_test.priorityAndLocalityWeightedHelper(state.range(1), state.range(2));
// if we've been instructed to skip tests, only run once no matter the argument:
uint32_t endpoints = skipExpensiveBenchmarks() ? 1 : state.range(2);

speed_test.priorityAndLocalityWeightedHelper(state.range(1), endpoints, true);
}
}

BENCHMARK(priorityAndLocalityWeighted)
->Ranges({{false, true}, {false, true}, {1, 100000}})
->Unit(benchmark::kMillisecond);

static void duplicateUpdate(State& state) {
Envoy::Thread::MutexBasicLockable lock;
Envoy::Logger::Context logging_state(spdlog::level::warn,
Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, lock, false);

for (auto _ : state) {
Envoy::Upstream::EdsSpeedTest speed_test(state, false);
uint32_t endpoints = skipExpensiveBenchmarks() ? 1 : state.range(0);

speed_test.priorityAndLocalityWeightedHelper(true, endpoints, true);
speed_test.priorityAndLocalityWeightedHelper(true, endpoints, true);
}
}

BENCHMARK(duplicateUpdate)->Range(1, 100000)->Unit(benchmark::kMillisecond);

static void healthOnlyUpdate(State& state) {
Envoy::Thread::MutexBasicLockable lock;
Envoy::Logger::Context logging_state(spdlog::level::warn,
Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, lock, false);
for (auto _ : state) {
Envoy::Upstream::EdsSpeedTest speed_test(state, false);
uint32_t endpoints = skipExpensiveBenchmarks() ? 1 : state.range(0);

speed_test.priorityAndLocalityWeightedHelper(true, endpoints, true);
speed_test.priorityAndLocalityWeightedHelper(true, endpoints, false);
}
}

BENCHMARK(priorityAndLocalityWeighted)->Ranges({{false, true}, {false, true}, {2000, 100000}});
BENCHMARK(healthOnlyUpdate)->Range(1, 100000)->Unit(benchmark::kMillisecond);

0 comments on commit b1e62a3

Please sign in to comment.