Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adaptive concurrency: Gradient algorithm implementation #7908

Merged
merged 33 commits into from
Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions api/envoy/config/filter/http/adaptive_concurrency/v2alpha/BUILD

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ load("@envoy_api//bazel:api_build_system.bzl", "api_proto_library_internal", "ap
licenses(["notice"]) # Apache 2

api_proto_package(
deps = ["//envoy/api/v3alpha/core"],
deps = [
"//envoy/api/v3alpha/core",
"//envoy/type",
],
)

api_proto_library_internal(
name = "adaptive_concurrency",
srcs = ["adaptive_concurrency.proto"],
deps = [
"//envoy/api/v3alpha/core:base",
"//envoy/type:percent",
],
)
Original file line number Diff line number Diff line change
@@ -1,10 +1,64 @@
syntax = "proto3";

package envoy.config.filter.http.adaptive_concurrency.v3alpha;
package envoy.config.filter.http.adaptive_concurrency.v2alpha;

option java_package = "io.envoyproxy.envoy.config.filter.http.adaptive_concurrency.v3alpha";
option java_package = "io.envoyproxy.envoy.config.filter.http.adaptive_concurrency.v2alpha";
option java_outer_classname = "AdaptiveConcurrencyProto";
option java_multiple_files = true;

import "envoy/type/percent.proto";

import "google/protobuf/duration.proto";
import "google/api/annotations.proto";
import "google/protobuf/wrappers.proto";

import "validate/validate.proto";

// Configuration parameters for the gradient controller.
// Both nested parameter messages (concurrency_limit_params and min_rtt_calc_params) are required.
message GradientControllerConfig {
// The percentile to use when summarizing aggregated samples. Defaults to p50.
envoy.type.Percent sample_aggregate_percentile = 1;

// Parameters controlling the periodic recalculation of the concurrency limit from sampled request
// latencies.
message ConcurrencyLimitCalculationParams {
// The maximum value the gradient is allowed to take. This influences how aggressively the
// concurrency limit can increase. Defaults to 2.0.
// When set, validation requires a value strictly greater than 1.0.
google.protobuf.DoubleValue max_gradient = 1 [(validate.rules).double.gt = 1.0];

// The allowed upper-bound on the calculated concurrency limit. Defaults to 1000.
// When set, validation requires a value greater than zero.
google.protobuf.UInt32Value max_concurrency_limit = 2 [(validate.rules).uint32.gt = 0];

// The period of time samples are taken to recalculate the concurrency limit.
// Required; validation rejects durations that are not greater than zero seconds.
google.protobuf.Duration concurrency_update_interval = 3 [(validate.rules).duration = {
required: true,
gt: {seconds: 0}
}];
}
// Required. Settings for the concurrency limit recalculation described above.
ConcurrencyLimitCalculationParams concurrency_limit_params = 2
[(validate.rules).message.required = true];

// Parameters controlling the periodic minRTT recalculation.
message MinimumRTTCalculationParams {
// The time interval between recalculating the minimum request round-trip time.
// Required; validation rejects durations that are not greater than zero seconds.
google.protobuf.Duration interval = 1 [(validate.rules).duration = {
required: true,
gt: {seconds: 0}
}];

// The number of requests to aggregate/sample during the minRTT recalculation window before
// updating. Defaults to 50.
// When set, validation requires a value greater than zero.
google.protobuf.UInt32Value request_count = 2 [(validate.rules).uint32.gt = 0];
};
// Required. Settings for the minRTT recalculation described above.
MinimumRTTCalculationParams min_rtt_calc_params = 3 [(validate.rules).message.required = true];
}

// Top-level configuration message of the adaptive_concurrency package. It selects which
// concurrency controller is used and carries that controller's settings.
message AdaptiveConcurrency {
// Exactly one controller configuration must be set; validation rejects a message where the oneof
// is left empty.
oneof concurrency_controller_config {
option (validate.required) = true;

// Gradient concurrency control will be used.
GradientControllerConfig gradient_controller_config = 1
[(validate.rules).message.required = true];
}
}
10 changes: 9 additions & 1 deletion source/common/common/cleanup.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@ namespace Envoy {
// RAII cleanup via functor: the supplied callback runs when the object is destroyed, unless
// cancel() was called first. Covered by test/common/common/cleanup_test.cc.
class Cleanup {
public:
  // @param f supplies the callback to invoke on destruction.
  Cleanup(std::function<void()> f) : f_(std::move(f)), cancelled_(false) {}

  // Runs the callback. An empty functor (e.g. one constructed from nullptr) is skipped, because
  // invoking it would throw std::bad_function_call out of a destructor.
  ~Cleanup() {
    if (f_) {
      f_();
    }
  }

  // Prevents the callback from running on destruction by replacing it with a no-op.
  void cancel() {
    cancelled_ = true;
    f_ = []() {};
  }

  // @return true if cancel() has been called on this object.
  bool cancelled() const { return cancelled_; }

private:
  std::function<void()> f_;
  bool cancelled_;
};

// RAII helper class to add an element to an std::list on construction and erase
Expand Down
9 changes: 9 additions & 0 deletions source/common/protobuf/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ uint64_t fractionalPercentDenominatorToInt(
} // namespace ProtobufPercentHelper
} // namespace Envoy

// Convert an envoy::type::Percent to a double or a default.
// @param message supplies the proto message containing the field.
// @param field_name supplies the field name in the message.
// @param default_value supplies the default if the field is not present.
// @throw EnvoyException if the field's value is NaN. The NaN check is evaluated before the
//        presence check.
// Note: `message` is expanded more than once, so it should be free of side effects.
#define PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(message, field_name, default_value)                  \
  (!std::isnan((message).field_name().value())                                                     \
       ? ((message).has_##field_name() ? (message).field_name().value() : (default_value))         \
       : throw EnvoyException("Value not in the range of 0..100 range."))
tonya11en marked this conversation as resolved.
Show resolved Hide resolved

// Convert an envoy::api::v2::core::Percent to a rounded integer or a default.
// @param message supplies the proto message containing the field.
// @param field_name supplies the field name in the message.
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/adaptive_concurrency/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ envoy_cc_library(
"//source/extensions/filters/http:well_known_names",
"//source/extensions/filters/http/adaptive_concurrency/concurrency_controller:concurrency_controller_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"@envoy_api//envoy/config/filter/http/adaptive_concurrency/v2alpha:adaptive_concurrency_cc",
"@envoy_api//envoy/config/filter/http/adaptive_concurrency/v3alpha:adaptive_concurrency_cc",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,28 @@ Http::FilterHeadersStatus AdaptiveConcurrencyFilter::decodeHeaders(Http::HeaderM
return Http::FilterHeadersStatus::StopIteration;
}

rq_start_time_ = config_->timeSource().monotonicTime();
// When the deferred_sample_task_ object is destroyed, the time difference between its destruction
// and the request start time is measured as the request latency. This value is sampled by the
// concurrency controller either when encoding is complete or during destruction of this filter
// object.
deferred_sample_task_ =
std::make_unique<Cleanup>([this, rq_start_time = config_->timeSource().monotonicTime()]() {
const auto now = config_->timeSource().monotonicTime();
const std::chrono::nanoseconds rq_latency = now - rq_start_time;
controller_->recordLatencySample(rq_latency);
});

return Http::FilterHeadersStatus::Continue;
}

// Invoked once the response has been completely encoded. The latency sample is recorded exactly
// once, by the deferred sample task created in decodeHeaders(): destroying that task runs its
// callback, which measures the elapsed time since the request started and hands it to the
// concurrency controller. No separate sample is recorded here, as that would count the request
// twice.
void AdaptiveConcurrencyFilter::encodeComplete() {
  ASSERT(deferred_sample_task_);
  // Resetting the pointer destroys the Cleanup object, which triggers the latency sample.
  deferred_sample_task_.reset();
}

void AdaptiveConcurrencyFilter::onDestroy() {
deferred_sample_task_->cancel();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this is still not right. onDestroy() is called in all cases, reset or otherwise, so you need to handle the case in which you have finished the encode. Please make sure you have tests that cover all these cases.

Also, I still think this pattern is a bit error prone and you would be better off just returning an ActiveLatencySample interface or something like that from the controller, which has a method to store the latency, and on destruction will decrement the count. I can discuss this in person tomorrow if what I'm suggesting is confusing.

controller_->cancelLatencySample();
}

} // namespace AdaptiveConcurrency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
#include <string>

#include "envoy/common/time.h"
#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.h"
#include "envoy/config/filter/http/adaptive_concurrency/v3alpha/adaptive_concurrency.pb.h"
#include "envoy/http/filter.h"
#include "envoy/runtime/runtime.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"

#include "common/common/cleanup.h"

#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h"
#include "extensions/filters/http/common/pass_through_filter.h"

Expand Down Expand Up @@ -57,12 +59,12 @@ class AdaptiveConcurrencyFilter : public Http::PassThroughFilter,

// Http::StreamEncoderFilter
void encodeComplete() override;
void onDestroy() override;

private:
AdaptiveConcurrencyFilterConfigSharedPtr config_;
const ConcurrencyControllerSharedPtr controller_;
MonotonicTime rq_start_time_;
std::unique_ptr<ConcurrencyController::RequestForwardingAction> forwarding_action_;
std::unique_ptr<Cleanup> deferred_sample_task_;
};

} // namespace AdaptiveConcurrency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,20 @@ envoy_package()

envoy_cc_library(
name = "concurrency_controller_lib",
srcs = [],
srcs = ["gradient_controller.cc"],
hdrs = [
"concurrency_controller.h",
"gradient_controller.h",
],
external_deps = [
"libcircllhist",
],
deps = [
"//source/common/event:dispatcher_lib",
"//source/common/protobuf",
"//source/common/runtime:runtime_lib",
"//source/common/stats:isolated_store_lib",
"//source/common/stats:stats_lib",
"@envoy_api//envoy/config/filter/http/adaptive_concurrency/v3alpha:adaptive_concurrency_cc",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,18 @@ class ConcurrencyController {
*
* @param rq_latency is the clocked round-trip time for the request.
*/
virtual void recordLatencySample(const std::chrono::nanoseconds& rq_latency) PURE;
virtual void recordLatencySample(std::chrono::nanoseconds rq_latency) PURE;

/**
 * Omit sampling an outstanding request and update the internal state of the controller to reflect
 * request completion.
 *
 * Takes no latency value, unlike recordLatencySample().
 */
virtual void cancelLatencySample() PURE;

/**
 * Returns the current concurrency limit.
 *
 * @return uint32_t the concurrency limit currently in effect.
 */
virtual uint32_t concurrencyLimit() const PURE;
};

} // namespace ConcurrencyController
Expand Down
Loading