Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adaptive concurrency: Gradient algorithm implementation #7908

Merged
merged 33 commits into from
Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ load("@envoy_api//bazel:api_build_system.bzl", "api_proto_library_internal", "ap
licenses(["notice"]) # Apache 2

api_proto_package(
deps = ["//envoy/api/v2/core"],
deps = [
"//envoy/api/v3alpha/core",
"//envoy/type",
],
)

api_proto_library_internal(
name = "adaptive_concurrency",
srcs = ["adaptive_concurrency.proto"],
deps = [
"//envoy/api/v2/core:base",
"//envoy/api/v3alpha/core:base",
"//envoy/type:percent",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,59 @@ option java_package = "io.envoyproxy.envoy.config.filter.http.adaptive_concurren
option java_outer_classname = "AdaptiveConcurrencyProto";
option java_multiple_files = true;

import "envoy/type/percent.proto";

import "google/protobuf/duration.proto";
import "google/api/annotations.proto";
import "google/protobuf/wrappers.proto";

import "validate/validate.proto";

// Configuration parameters for the gradient controller.
message GradientControllerConfig {
// The percentile to use when summarizing aggregated samples. Defaults to p50.
envoy.type.Percent sample_aggregate_percentile = 1;

// Parameters controlling the periodic recalculation of the concurrency limit from sampled request
// latencies.
message ConcurrencyLimitCalculationParams {
// The maximum value the gradient is allowed to take. This influences how aggressively the
// concurrency limit can increase. Defaults to 2.0.
google.protobuf.DoubleValue max_gradient = 1 [(validate.rules).double.gt = 1.0];

// The allowed upper-bound on the calculated concurrency limit. Defaults to 1000.
google.protobuf.UInt32Value max_concurrency_limit = 2 [(validate.rules).uint32.gt = 0];

// The period of time over which samples are collected before the concurrency limit is recalculated.
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
google.protobuf.Duration concurrency_update_interval = 3 [(validate.rules).duration = {
required: true,
gt: {seconds: 0}
}];
}
ConcurrencyLimitCalculationParams concurrency_limit_params = 2
[(validate.rules).message.required = true];

// Parameters controlling the periodic minRTT recalculation.
message MinimumRTTCalculationParams {
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
// The time interval between recalculating the minimum request round-trip time.
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
google.protobuf.Duration interval = 1 [(validate.rules).duration = {
required: true,
gt: {seconds: 0}
}];

// The number of requests to aggregate/sample during the minRTT recalculation window before
// updating. Defaults to 50.
google.protobuf.UInt32Value request_count = 2 [(validate.rules).uint32.gt = 0];
};
MinimumRTTCalculationParams min_rtt_calc_params = 3 [(validate.rules).message.required = true];
}

message AdaptiveConcurrency {
oneof concurrency_controller_config {
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
option (validate.required) = true;

// Gradient concurrency control will be used.
GradientControllerConfig gradient_controller_config = 1
[(validate.rules).message.required = true];
}
}
15 changes: 0 additions & 15 deletions api/envoy/config/filter/http/adaptive_concurrency/v3alpha/BUILD

This file was deleted.

This file was deleted.

10 changes: 9 additions & 1 deletion source/common/common/cleanup.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@ namespace Envoy {
// RAII cleanup via functor.
class Cleanup {
public:
Cleanup(std::function<void()> f) : f_(std::move(f)) {}
Cleanup(std::function<void()> f) : f_(std::move(f)), cancelled_(false) {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't end up reverting this change because of the TODO we discussed, can you add an explicit test for this somewhere in common tests in a follow up? If we keep this code I might also use an absl::optional for the functor instead of an explicit cancelled_ boolean, but that's a minor thing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both this and the protobuf utility had explicit tests in test/common/common/cleanup_test.cc and test/common/protobuf/utility_test.cc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool thanks, I missed that.

// Invokes the stored functor when the object goes out of scope.
~Cleanup() {
  f_();
}

// Disarms the cleanup: the stored functor is swapped for a no-op so that the
// destructor call does nothing, and the object is flagged as cancelled.
void cancel() {
  f_ = [] {};
  cancelled_ = true;
}

bool cancelled() { return cancelled_; }

private:
std::function<void()> f_;
bool cancelled_;
};

// RAII helper class to add an element to an std::list on construction and erase
Expand Down
9 changes: 9 additions & 0 deletions source/common/protobuf/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ uint64_t fractionalPercentDenominatorToInt(
} // namespace ProtobufPercentHelper
} // namespace Envoy

// Read a percent-typed proto field as a double, falling back to a default when the field is unset.
// The only validation performed here is a NaN check on the field's value; a NaN value results in
// an EnvoyException being thrown. No numeric range check is done by this macro itself.
// NOTE: `message` is expanded more than once, so pass an expression without side effects.
// @param message supplies the proto message containing the field.
// @param field_name supplies the field name in the message.
// @param default_value supplies the value returned if the field is not present.
#define PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(message, field_name, default_value) \
(std::isnan((message).field_name().value()) \
? throw EnvoyException(fmt::format("Value not in the range of 0..100 range.")) \
: ((message).has_##field_name() ? (message).field_name().value() : default_value))
tonya11en marked this conversation as resolved.
Show resolved Hide resolved

// Convert an envoy::api::v2::core::Percent to a rounded integer or a default.
// @param message supplies the proto message containing the field.
// @param field_name supplies the field name in the message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,35 @@ Http::FilterHeadersStatus AdaptiveConcurrencyFilter::decodeHeaders(Http::HeaderM
return Http::FilterHeadersStatus::StopIteration;
}

rq_start_time_ = config_->timeSource().monotonicTime();
// When the deferred_sample_task_ object is destroyed, the time difference between its destruction
// and the request start time is measured as the request latency. This value is sampled by the
// concurrency controller either when encoding is complete or during destruction of this filter
// object.
deferred_sample_task_ =
std::make_unique<Cleanup>([this, rq_start_time = config_->timeSource().monotonicTime()]() {
const auto now = config_->timeSource().monotonicTime();
const std::chrono::nanoseconds rq_latency = now - rq_start_time;
controller_->recordLatencySample(rq_latency);
});

return Http::FilterHeadersStatus::Continue;
}

void AdaptiveConcurrencyFilter::encodeComplete() {
const auto rq_latency = config_->timeSource().monotonicTime() - rq_start_time_;
controller_->recordLatencySample(rq_latency);
ASSERT(deferred_sample_task_);
deferred_sample_task_.reset();
}

// Stream teardown hook. If the deferred sampling task is still alive at this point, the latency
// sample is suppressed and the controller is told that the request was cancelled instead.
void AdaptiveConcurrencyFilter::onDestroy() {
  if (!deferred_sample_task_) {
    // The sampling task is already gone, so there is nothing left to cancel.
    return;
  }

  // The sampling task hasn't been destroyed yet, so this implies we did not complete encoding.
  // Prevent the sample from being recorded and let the controller perform its request cleanup.
  //
  // TODO (tonya11en): Return some RAII handle from the concurrency controller that performs this
  // logic as part of its lifecycle.
  deferred_sample_task_->cancel();
  controller_->cancelLatencySample();
}

} // namespace AdaptiveConcurrency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"

#include "common/common/cleanup.h"

#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h"
#include "extensions/filters/http/common/pass_through_filter.h"

Expand Down Expand Up @@ -57,12 +59,12 @@ class AdaptiveConcurrencyFilter : public Http::PassThroughFilter,

// Http::StreamEncoderFilter
void encodeComplete() override;
void onDestroy() override;

private:
AdaptiveConcurrencyFilterConfigSharedPtr config_;
const ConcurrencyControllerSharedPtr controller_;
MonotonicTime rq_start_time_;
std::unique_ptr<ConcurrencyController::RequestForwardingAction> forwarding_action_;
std::unique_ptr<Cleanup> deferred_sample_task_;
};

} // namespace AdaptiveConcurrency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,20 @@ envoy_package()

envoy_cc_library(
name = "concurrency_controller_lib",
srcs = [],
srcs = ["gradient_controller.cc"],
hdrs = [
"concurrency_controller.h",
"gradient_controller.h",
],
external_deps = [
"libcircllhist",
],
deps = [
"//source/common/event:dispatcher_lib",
"//source/common/protobuf",
"//source/common/runtime:runtime_lib",
"//source/common/stats:isolated_store_lib",
"//source/common/stats:stats_lib",
"@envoy_api//envoy/config/filter/http/adaptive_concurrency/v2alpha:adaptive_concurrency_cc",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,18 @@ class ConcurrencyController {
*
* @param rq_latency is the clocked round-trip time for the request.
*/
virtual void recordLatencySample(const std::chrono::nanoseconds& rq_latency) PURE;
virtual void recordLatencySample(std::chrono::nanoseconds rq_latency) PURE;

/**
* Omit sampling an outstanding request and update the internal state of the controller to reflect
* request completion.
*/
virtual void cancelLatencySample() PURE;

/**
* Returns the current concurrency limit.
*/
virtual uint32_t concurrencyLimit() const PURE;
};

} // namespace ConcurrencyController
Expand Down
Loading