Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adaptive concurrency: Gradient algorithm implementation #7908

Merged
merged 33 commits into from
Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,50 @@ option java_outer_classname = "AdaptiveConcurrencyProto";
option java_multiple_files = true;
option go_package = "v2alpha";

import "google/protobuf/duration.proto";
import "google/api/annotations.proto";
import "google/protobuf/wrappers.proto";

import "validate/validate.proto";

// Configuration parameters for the gradient controller.
message GradientControllerConfig {
  // The percentile to use when summarizing aggregated latency samples, expressed
  // as a fraction in the range [0.0, 1.0]. Defaults to p50 (0.5).
  google.protobuf.DoubleValue sample_aggregate_percentile = 1
      [(validate.rules).double = {gte: 0.0, lte: 1.0}];

  // Parameters controlling the periodic recalculation of the concurrency limit from sampled request
  // latencies.
  message ConcurrencyLimitCalculationParams {
    // The maximum value the gradient is allowed to take. This influences how aggressively the
    // concurrency limit can increase. Defaults to 2.0. Must be greater than 1.0.
    google.protobuf.DoubleValue max_gradient = 1 [(validate.rules).double.gt = 1.0];

    // The allowed upper-bound on the calculated concurrency limit. Defaults to 1000.
    google.protobuf.UInt64Value max_concurrency_limit = 2;

    // The period of time samples are taken to recalculate the concurrency limit.
    google.protobuf.Duration concurrency_update_interval = 3
        [(validate.rules).duration.required = true];
  }

  // Required parameters for the concurrency limit calculation.
  ConcurrencyLimitCalculationParams concurrency_limit_params = 2
      [(validate.rules).message.required = true];

  // Parameters controlling the periodic minRTT recalculation.
  message MinimumRTTCalculationParams {
    // The time interval between recalculating the minimum request round-trip time.
    google.protobuf.Duration interval = 1 [(validate.rules).duration.required = true];

    // The number of requests to aggregate/sample during the minRTT recalculation window before
    // updating. Defaults to 50.
    google.protobuf.UInt64Value request_count = 2;
  }

  // Required parameters for the minRTT recalculation.
  MinimumRTTCalculationParams min_rtt_calc_params = 3 [(validate.rules).message.required = true];
}

// Top-level configuration for the adaptive concurrency filter, selecting which
// concurrency controller implementation to use.
message AdaptiveConcurrency {
  oneof concurrency_controller_config {
    // Gradient concurrency control will be used.
    GradientControllerConfig gradient_controller_config = 1;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,20 @@ envoy_package()

# Concurrency controller library: interface plus the gradient controller
# implementation (which depends on libcircllhist for latency histograms).
envoy_cc_library(
    name = "concurrency_controller_lib",
    srcs = ["gradient_controller.cc"],
    hdrs = [
        "concurrency_controller.h",
        "gradient_controller.h",
    ],
    external_deps = [
        "libcircllhist",
    ],
    deps = [
        "//source/common/event:dispatcher_lib",
        "//source/common/protobuf",
        "//source/common/runtime:runtime_lib",
        "//source/common/stats:isolated_store_lib",
        "//source/common/stats:stats_lib",
        "@envoy_api//envoy/config/filter/http/adaptive_concurrency/v2alpha:adaptive_concurrency_cc",
    ],
)
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class ConcurrencyController {
* @param rq_latency is the clocked round-trip time for the request.
*/
virtual void recordLatencySample(const std::chrono::nanoseconds& rq_latency) PURE;

/**
* Returns the current concurrency limit.
*/
virtual int concurrencyLimit() const PURE;
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
};

} // namespace ConcurrencyController
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller.h"

#include <atomic>
#include <chrono>

#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.h"
#include "envoy/event/dispatcher.h"
#include "envoy/runtime/runtime.h"
#include "envoy/stats/stats.h"

#include "common/common/cleanup.h"
#include "common/protobuf/protobuf.h"
#include "common/protobuf/utility.h"

#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h"

#include "absl/synchronization/mutex.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace AdaptiveConcurrency {
namespace ConcurrencyController {

// Translates the validated proto configuration into the internal config
// representation, applying the documented defaults for unset wrapper fields:
// max_concurrency_limit=1000, request_count=50, max_gradient=2.0, and
// sample_aggregate_percentile=0.5 (p50).
GradientControllerConfig::GradientControllerConfig(
    const envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig&
        proto_config)
    : min_rtt_calc_interval_(std::chrono::milliseconds(
          DurationUtil::durationToMilliseconds(proto_config.min_rtt_calc_params().interval()))),
      sample_rtt_calc_interval_(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
          proto_config.concurrency_limit_params().concurrency_update_interval()))),
      max_concurrency_limit_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          proto_config.concurrency_limit_params(), max_concurrency_limit, 1000)),
      min_rtt_aggregate_request_count_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.min_rtt_calc_params(), request_count, 50)),
      max_gradient_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.concurrency_limit_params(),
                                                    max_gradient, 2.0)),
      sample_aggregate_percentile_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config, sample_aggregate_percentile, 0.5)) {}

// Constructs the controller in the "recalculating minRTT" state with a
// concurrency limit of 1 and an empty latency histogram. Creates (but does not
// yet arm) the minRTT recalculation timer and arms the sample-reset timer on
// exit, so sample windows begin immediately.
GradientController::GradientController(GradientControllerConfigSharedPtr config,
                                       Event::Dispatcher& dispatcher, Runtime::Loader&,
                                       const std::string& stats_prefix, Stats::Scope& scope)
    : config_(std::move(config)), dispatcher_(dispatcher), scope_(scope),
      stats_(generateStats(scope_, stats_prefix)), recalculating_min_rtt_(true),
      num_rq_outstanding_(0), concurrency_limit_(1), latency_sample_hist_(hist_fast_alloc()) {
  // Both timer callbacks serialize window transitions via update_window_mtx_.
  min_rtt_calc_timer_ = dispatcher_.createTimer([this]() -> void {
    absl::MutexLock ml(&update_window_mtx_);
    setMinRTTSamplingWindow();
  });

  sample_reset_timer_ = dispatcher_.createTimer([this]() -> void {
    absl::MutexLock ml(&update_window_mtx_);
    resetSampleWindow();
  });

  // Start the sample reset timer upon leaving scope.
  auto defer_sample_reset_timer =
      Cleanup([this]() { sample_reset_timer_->enableTimer(config_->sample_rtt_calc_interval()); });

  stats_.concurrency_limit_.set(concurrency_limit_.load());
}

// Cancels both pending timers before freeing the histogram, so no timer
// callback can fire against freed state.
GradientController::~GradientController() {
  sample_reset_timer_->disableTimer();
  min_rtt_calc_timer_->disableTimer();
  hist_free(latency_sample_hist_);
}

// Instantiates the controller's counters and gauges under the given stats
// prefix in the provided scope.
GradientControllerStats GradientController::generateStats(Stats::Scope& scope,
                                                          const std::string& stats_prefix) {
  return {ALL_GRADIENT_CONTROLLER_STATS(POOL_COUNTER_PREFIX(scope, stats_prefix),
                                        POOL_GAUGE_PREFIX(scope, stats_prefix))};
}

void GradientController::setMinRTTSamplingWindow() {
// Set the minRTT flag to indicate we're gathering samples to update the value. This will
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved
// prevent the sample window from resetting until enough requests are gathered to complete the
// recalculation.
concurrency_limit_.store(1);
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
stats_.concurrency_limit_.set(concurrency_limit_.load());
recalculating_min_rtt_.store(true);

// Throw away any latency samples from before the recalculation window as it may not represent
// the minRTT.
absl::MutexLock ml(&latency_sample_mtx_);
hist_clear(latency_sample_hist_);
}

void GradientController::updateMinRTT() {
ASSERT(recalculating_min_rtt_.load());

// Reset the timer to ensure the next minRTT sampling window upon leaving scope.
auto defer =
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
Cleanup([this]() { min_rtt_calc_timer_->enableTimer(config_->min_rtt_calc_interval()); });
tonya11en marked this conversation as resolved.
Show resolved Hide resolved

absl::MutexLock ml(&latency_sample_mtx_);
min_rtt_ = processLatencySamplesAndClear();
stats_.min_rtt_msecs_.set(
std::chrono::duration_cast<std::chrono::milliseconds>(min_rtt_).count());
recalculating_min_rtt_.store(false);
}

void GradientController::resetSampleWindow() {
// Reset the timer upon leaving scope.
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
auto defer =
Cleanup([this]() { sample_reset_timer_->enableTimer(config_->sample_rtt_calc_interval()); });

// The sampling window must not be reset while sampling for the new minRTT value.
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
if (recalculating_min_rtt_.load()) {
return;
}

absl::MutexLock ml(&latency_sample_mtx_);
if (hist_sample_count(latency_sample_hist_) == 0) {
return;
}

sample_rtt_ = processLatencySamplesAndClear();
concurrency_limit_.store(calculateNewLimit());
stats_.concurrency_limit_.set(concurrency_limit_.load());
}

std::chrono::microseconds GradientController::processLatencySamplesAndClear() {
const std::array<double, 1> quantile{config_->sample_aggregate_percentile()};
std::array<double, 1> ans;
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
hist_approx_quantile(latency_sample_hist_, quantile.data(), 1, ans.data());
hist_clear(latency_sample_hist_);
return std::chrono::microseconds(static_cast<int>(ans[0]));
}

int GradientController::calculateNewLimit() {
const double gradient =
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
std::min(config_->max_gradient(), double(min_rtt_.count()) / sample_rtt_.count());
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
stats_.gradient_.set(gradient);
const double limit = concurrency_limit_.load() * gradient;
const double burst_headroom = sqrt(limit);
stats_.burst_queue_size_.set(burst_headroom);

const auto clamp = [](int min, int max, int val) { return std::max(min, std::min(max, val)); };
return clamp(1, config_->max_concurrency_limit(), static_cast<int>(limit + burst_headroom));
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
}

// Decides whether a new request may be forwarded. Atomically increments
// num_rq_outstanding_ (lock-free CAS loop) and returns Forward when under the
// current concurrency limit; returns Block once the limit is reached.
RequestForwardingAction GradientController::forwardingDecision() {
  while (true) {
    int curr_outstanding = num_rq_outstanding_.load();
    if (curr_outstanding < concurrency_limit_.load()) {
      // Using the weak compare/exchange for improved performance at the cost of spurious failures.
      // This is not a problem since there are no side effects when retrying. A legitimate failure
      // implies that another thread swooped in and modified num_rq_outstanding_ between the
      // comparison and attempt at the increment.
      if (!num_rq_outstanding_.compare_exchange_weak(curr_outstanding, curr_outstanding + 1)) {
        continue;
      }

      stats_.rq_outstanding_.inc();
      return RequestForwardingAction::Forward;
    }

    // Concurrency limit is reached.
    break;
  }

  return RequestForwardingAction::Block;
}

void GradientController::recordLatencySample(const std::chrono::nanoseconds& rq_latency) {
const uint32_t latency_usec =
std::chrono::duration_cast<std::chrono::microseconds>(rq_latency).count();
--num_rq_outstanding_;
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
stats_.rq_outstanding_.dec();

int sample_count;
{
absl::MutexLock ml(&latency_sample_mtx_);
hist_insert(latency_sample_hist_, latency_usec, 1);
sample_count = hist_sample_count(latency_sample_hist_);
}

if (recalculating_min_rtt_.load() && sample_count >= config_->min_rtt_aggregate_request_count()) {
// This sample has pushed the request count over the request count requirement for the minRTT
// recalculation. It must now be finished.
updateMinRTT();
}
}

} // namespace ConcurrencyController
} // namespace AdaptiveConcurrency
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Loading