Skip to content

Commit

Permalink
adaptive concurrency: Gradient algorithm implementation (envoyproxy#7908
Browse files Browse the repository at this point in the history
)

Signed-off-by: Tony Allen <tallen@lyft.com>
  • Loading branch information
Tony Allen authored and danzh1989 committed Sep 24, 2019
1 parent cf4cf7a commit 22b3a0d
Show file tree
Hide file tree
Showing 18 changed files with 1,127 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ load("@envoy_api//bazel:api_build_system.bzl", "api_proto_library_internal", "ap
licenses(["notice"]) # Apache 2

api_proto_package(
deps = ["//envoy/api/v2/core"],
deps = [
"//envoy/api/v3alpha/core",
"//envoy/type",
],
)

api_proto_library_internal(
name = "adaptive_concurrency",
srcs = ["adaptive_concurrency.proto"],
deps = [
"//envoy/api/v2/core:base",
"//envoy/api/v3alpha/core:base",
"//envoy/type:percent",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,59 @@ option java_package = "io.envoyproxy.envoy.config.filter.http.adaptive_concurren
option java_outer_classname = "AdaptiveConcurrencyProto";
option java_multiple_files = true;

import "envoy/type/percent.proto";

import "google/protobuf/duration.proto";
import "google/api/annotations.proto";
import "google/protobuf/wrappers.proto";

import "validate/validate.proto";

// Configuration parameters for the gradient controller.
message GradientControllerConfig {
// The percentile to use when summarizing aggregated samples. Defaults to p50.
envoy.type.Percent sample_aggregate_percentile = 1;

// Parameters controlling the periodic recalculation of the concurrency limit from sampled request
// latencies.
message ConcurrencyLimitCalculationParams {
// The maximum value the gradient is allowed to take. This influences how aggressively the
// concurrency limit can increase. Defaults to 2.0.
google.protobuf.DoubleValue max_gradient = 1 [(validate.rules).double.gt = 1.0];

// The allowed upper-bound on the calculated concurrency limit. Defaults to 1000.
google.protobuf.UInt32Value max_concurrency_limit = 2 [(validate.rules).uint32.gt = 0];

// The period of time samples are taken to recalculate the concurrency limit.
google.protobuf.Duration concurrency_update_interval = 3 [(validate.rules).duration = {
required: true,
gt: {seconds: 0}
}];
}
ConcurrencyLimitCalculationParams concurrency_limit_params = 2
[(validate.rules).message.required = true];

// Parameters controlling the periodic minRTT recalculation.
message MinimumRTTCalculationParams {
// The time interval between recalculating the minimum request round-trip time.
google.protobuf.Duration interval = 1 [(validate.rules).duration = {
required: true,
gt: {seconds: 0}
}];

// The number of requests to aggregate/sample during the minRTT recalculation window before
// updating. Defaults to 50.
google.protobuf.UInt32Value request_count = 2 [(validate.rules).uint32.gt = 0];
};
MinimumRTTCalculationParams min_rtt_calc_params = 3 [(validate.rules).message.required = true];
}

message AdaptiveConcurrency {
oneof concurrency_controller_config {
option (validate.required) = true;

// Gradient concurrency control will be used.
GradientControllerConfig gradient_controller_config = 1
[(validate.rules).message.required = true];
}
}
15 changes: 0 additions & 15 deletions api/envoy/config/filter/http/adaptive_concurrency/v3alpha/BUILD

This file was deleted.

This file was deleted.

10 changes: 9 additions & 1 deletion source/common/common/cleanup.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@ namespace Envoy {
// RAII cleanup via functor.
class Cleanup {
public:
Cleanup(std::function<void()> f) : f_(std::move(f)) {}
Cleanup(std::function<void()> f) : f_(std::move(f)), cancelled_(false) {}
~Cleanup() { f_(); }

void cancel() {
cancelled_ = true;
f_ = []() {};
}

bool cancelled() { return cancelled_; }

private:
std::function<void()> f_;
bool cancelled_;
};

// RAII helper class to add an element to an std::list on construction and erase
Expand Down
9 changes: 9 additions & 0 deletions source/common/protobuf/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ uint64_t fractionalPercentDenominatorToInt(
} // namespace ProtobufPercentHelper
} // namespace Envoy

// Convert an envoy::api::v2::core::Percent to a double or a default.
// @param message supplies the proto message containing the field.
// @param field_name supplies the field name in the message.
// @param default_value supplies the default if the field is not present.
#define PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(message, field_name, default_value) \
(!std::isnan((message).field_name().value()) \
? (message).has_##field_name() ? (message).field_name().value() : default_value \
: throw EnvoyException(fmt::format("Value not in the range of 0..100 range.")))

// Convert an envoy::api::v2::core::Percent to a rounded integer or a default.
// @param message supplies the proto message containing the field.
// @param field_name supplies the field name in the message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,35 @@ Http::FilterHeadersStatus AdaptiveConcurrencyFilter::decodeHeaders(Http::HeaderM
return Http::FilterHeadersStatus::StopIteration;
}

rq_start_time_ = config_->timeSource().monotonicTime();
// When the deferred_sample_task_ object is destroyed, the time difference between its destruction
// and the request start time is measured as the request latency. This value is sampled by the
// concurrency controller either when encoding is complete or during destruction of this filter
// object.
deferred_sample_task_ =
std::make_unique<Cleanup>([this, rq_start_time = config_->timeSource().monotonicTime()]() {
const auto now = config_->timeSource().monotonicTime();
const std::chrono::nanoseconds rq_latency = now - rq_start_time;
controller_->recordLatencySample(rq_latency);
});

return Http::FilterHeadersStatus::Continue;
}

void AdaptiveConcurrencyFilter::encodeComplete() {
const auto rq_latency = config_->timeSource().monotonicTime() - rq_start_time_;
controller_->recordLatencySample(rq_latency);
ASSERT(deferred_sample_task_);
deferred_sample_task_.reset();
}

void AdaptiveConcurrencyFilter::onDestroy() {
if (deferred_sample_task_) {
// The sampling task hasn't been destroyed yet, so this implies we did not complete encoding.
// Let's stop the sampling from happening and perform request cleanup inside the controller.
//
// TODO (tonya11en): Return some RAII handle from the concurrency controller that performs this
// logic as part of its lifecycle.
deferred_sample_task_->cancel();
controller_->cancelLatencySample();
}
}

} // namespace AdaptiveConcurrency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"

#include "common/common/cleanup.h"

#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h"
#include "extensions/filters/http/common/pass_through_filter.h"

Expand Down Expand Up @@ -57,12 +59,12 @@ class AdaptiveConcurrencyFilter : public Http::PassThroughFilter,

// Http::StreamEncoderFilter
void encodeComplete() override;
void onDestroy() override;

private:
AdaptiveConcurrencyFilterConfigSharedPtr config_;
const ConcurrencyControllerSharedPtr controller_;
MonotonicTime rq_start_time_;
std::unique_ptr<ConcurrencyController::RequestForwardingAction> forwarding_action_;
std::unique_ptr<Cleanup> deferred_sample_task_;
};

} // namespace AdaptiveConcurrency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,20 @@ envoy_package()

envoy_cc_library(
name = "concurrency_controller_lib",
srcs = [],
srcs = ["gradient_controller.cc"],
hdrs = [
"concurrency_controller.h",
"gradient_controller.h",
],
external_deps = [
"libcircllhist",
],
deps = [
"//source/common/event:dispatcher_lib",
"//source/common/protobuf",
"//source/common/runtime:runtime_lib",
"//source/common/stats:isolated_store_lib",
"//source/common/stats:stats_lib",
"@envoy_api//envoy/config/filter/http/adaptive_concurrency/v2alpha:adaptive_concurrency_cc",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,18 @@ class ConcurrencyController {
*
* @param rq_latency is the clocked round-trip time for the request.
*/
virtual void recordLatencySample(const std::chrono::nanoseconds& rq_latency) PURE;
virtual void recordLatencySample(std::chrono::nanoseconds rq_latency) PURE;

/**
* Omit sampling an outstanding request and update the internal state of the controller to reflect
* request completion.
*/
virtual void cancelLatencySample() PURE;

/**
* Returns the current concurrency limit.
*/
virtual uint32_t concurrencyLimit() const PURE;
};

} // namespace ConcurrencyController
Expand Down
Loading

0 comments on commit 22b3a0d

Please sign in to comment.