Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adaptive concurrency no-op implementation #7819

Merged
merged 17 commits into from
Aug 8, 2019
2 changes: 2 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,7 @@ extensions/filters/common/original_src @snowp @klarose
/*/extensions/filters/http/dynamic_forward_proxy @mattklein123 @alyssawilk
# omit_canary_hosts retry predicate
/*/extensions/retry/host/omit_canary_hosts @sriduth @snowp
# adaptive concurrency limit extension.
/*/extensions/filters/http/adaptive_concurrency @tonya11en @mattklein123
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved
# http inspector
/*/extensions/filters/listener/http_inspector @crazyxy @PiotrSikora @lizan
11 changes: 11 additions & 0 deletions api/envoy/config/filter/http/adaptive_concurrency/v2alpha/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
load("@envoy_api//bazel:api_build_system.bzl", "api_proto_library_internal")

licenses(["notice"]) # Apache 2

api_proto_library_internal(
name = "adaptive_concurrency",
srcs = ["adaptive_concurrency.proto"],
deps = [
"//envoy/api/v2/core:base",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";

package envoy.config.filter.http.adaptive_concurrency.v2alpha;

option java_package = "io.envoyproxy.envoy.config.filter.http.adaptive_concurrency.v2alpha";
option java_outer_classname = "AdaptiveConcurrencyProto";
option java_multiple_files = true;
option go_package = "v2alpha";

message AdaptiveConcurrency {
}
38 changes: 38 additions & 0 deletions source/extensions/filters/http/adaptive_concurrency/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
licenses(["notice"]) # Apache 2

# HTTP L7 filter that dynamically adjusts the number of allowed concurrent
# requests based on sampled latencies.
# Public docs: TODO (tonya11en)

load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
)

envoy_package()

envoy_cc_library(
name = "adaptive_concurrency_filter_lib",
srcs = ["adaptive_concurrency_filter.cc"],
hdrs = ["adaptive_concurrency_filter.h"],
deps = [
"//include/envoy/http:filter_interface",
"//source/extensions/filters/http:well_known_names",
"//source/extensions/filters/http/adaptive_concurrency/concurrency_controller:concurrency_controller_lib",
"@envoy_api//envoy/config/filter/http/adaptive_concurrency/v2alpha:adaptive_concurrency_cc",
],
)

envoy_cc_library(
name = "config",
srcs = ["config.cc"],
hdrs = ["config.h"],
deps = [
"//include/envoy/registry",
"//source/common/config:filter_json_lib",
"//source/extensions/filters/http:well_known_names",
"//source/extensions/filters/http/adaptive_concurrency:adaptive_concurrency_filter_lib",
"//source/extensions/filters/http/common:factory_base_lib",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#include "extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h"

#include <chrono>
#include <cstdint>
#include <string>
#include <vector>

#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h"
#include "extensions/filters/http/well_known_names.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace AdaptiveConcurrency {

AdaptiveConcurrencyFilterConfig::AdaptiveConcurrencyFilterConfig(
const envoy::config::filter::http::adaptive_concurrency::v2alpha::AdaptiveConcurrency&,
Runtime::Loader& runtime, std::string stats_prefix, Stats::Scope& scope,
TimeSource& time_source)
: runtime_(runtime), stats_prefix_(std::move(stats_prefix)), scope_(scope),
time_source_(time_source) {}

AdaptiveConcurrencyFilter::AdaptiveConcurrencyFilter(
AdaptiveConcurrencyFilterConfigSharedPtr config, ConcurrencyControllerSharedPtr controller)
: config_(std::move(config)), controller_(std::move(controller)) {}

AdaptiveConcurrencyFilter::~AdaptiveConcurrencyFilter() = default;
tonya11en marked this conversation as resolved.
Show resolved Hide resolved

Http::FilterHeadersStatus AdaptiveConcurrencyFilter::decodeHeaders(Http::HeaderMap&,
bool end_stream) {
if (!end_stream) {
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
return Http::FilterHeadersStatus::Continue;
}

if (controller_->tryForwardRequest()) {
rq_start_time_ = config_->timeSource().monotonicTime();
return Http::FilterHeadersStatus::Continue;
}

// TODO (tonya11en): Remove filler words.
decoder_callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "filler words", nullptr,
absl::nullopt, "more filler words");
return Http::FilterHeadersStatus::StopIteration;
}

void AdaptiveConcurrencyFilter::onDestroy() {
// TODO (tonya11en).
}

Http::FilterHeadersStatus AdaptiveConcurrencyFilter::encodeHeaders(Http::HeaderMap&,
bool end_stream) {
if (end_stream) {
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
const std::chrono::nanoseconds rq_latency =
config_->timeSource().monotonicTime() - rq_start_time_;
controller_->recordLatencySample(rq_latency);
}

return Http::FilterHeadersStatus::Continue;
}

} // namespace AdaptiveConcurrency
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <unordered_set>
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
#include <vector>

#include "envoy/common/time.h"
#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.h"
#include "envoy/http/filter.h"
#include "envoy/runtime/runtime.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"

#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace AdaptiveConcurrency {

/**
* Configuration for the adaptive concurrency limit filter.
*/
class AdaptiveConcurrencyFilterConfig {
public:
AdaptiveConcurrencyFilterConfig(
const envoy::config::filter::http::adaptive_concurrency::v2alpha::AdaptiveConcurrency&
adaptive_concurrency,
Runtime::Loader& runtime, std::string stats_prefix, Stats::Scope& scope,
TimeSource& time_source);

Runtime::Loader& runtime() { return runtime_; }
const std::string& statsPrefix() const { return stats_prefix_; }
Stats::Scope& scope() { return scope_; }
TimeSource& timeSource() { return time_source_; }

private:
Runtime::Loader& runtime_;
const std::string stats_prefix_;
Stats::Scope& scope_;
TimeSource& time_source_;
};

using AdaptiveConcurrencyFilterConfigSharedPtr = std::shared_ptr<AdaptiveConcurrencyFilterConfig>;
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
using ConcurrencyControllerSharedPtr =
std::shared_ptr<ConcurrencyController::ConcurrencyController>;

/**
* A filter that samples request latencies and dynamically adjusts the request
* concurrency window.
*/
class AdaptiveConcurrencyFilter : public Http::StreamFilter, Logger::Loggable<Logger::Id::filter> {
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
public:
AdaptiveConcurrencyFilter(AdaptiveConcurrencyFilterConfigSharedPtr config,
ConcurrencyControllerSharedPtr controller);
~AdaptiveConcurrencyFilter() override;

// Http::StreamFilterBase
void onDestroy() override;

// Http::StreamDecoderFilter
Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& headers, bool end_stream) override;
Http::FilterDataStatus decodeData(Buffer::Instance&, bool) override {
return Http::FilterDataStatus::Continue;
}
Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap&) override {
return Http::FilterTrailersStatus::Continue;
}
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}

// Http::StreamEncoderFilter
Http::FilterHeadersStatus encode100ContinueHeaders(Http::HeaderMap&) override {
return Http::FilterHeadersStatus::Continue;
}
Http::FilterHeadersStatus encodeHeaders(Http::HeaderMap&, bool) override;
Http::FilterDataStatus encodeData(Buffer::Instance&, bool) override {
return Http::FilterDataStatus::Continue;
}
Http::FilterTrailersStatus encodeTrailers(Http::HeaderMap&) override {
return Http::FilterTrailersStatus::Continue;
}
Http::FilterMetadataStatus encodeMetadata(Http::MetadataMap&) override {
return Http::FilterMetadataStatus::Continue;
}
void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override {
encoder_callbacks_ = &callbacks;
}

private:
AdaptiveConcurrencyFilterConfigSharedPtr config_;
Http::StreamDecoderFilterCallbacks* decoder_callbacks_{};
Http::StreamEncoderFilterCallbacks* encoder_callbacks_{};
ConcurrencyControllerSharedPtr controller_;
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
MonotonicTime rq_start_time_;
};

} // namespace AdaptiveConcurrency
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
licenses(["notice"]) # Apache 2

# HTTP L7 filter that dynamically adjusts the number of allowed concurrent
# requests based on sampled latencies.
# Public docs: TODO (tonya11en)

load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
)

envoy_package()

envoy_cc_library(
name = "concurrency_controller_lib",
srcs = [],
hdrs = [
"concurrency_controller.h",
"noop_controller.h",
],
deps = [
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include <chrono>

#include "envoy/common/pure.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace AdaptiveConcurrency {
namespace ConcurrencyController {

/**
* Adaptive concurrency controller interface. All implementations of this
* interface must be thread-safe.
*/
class ConcurrencyController {
public:
virtual ~ConcurrencyController() = default;

/**
* Called during decoding when the adaptive concurrency filter is attempting
* to forward a request. Returns true once the controller's internal state is
* updated and the request can be forwarded.
*/
virtual bool tryForwardRequest() PURE;
tonya11en marked this conversation as resolved.
Show resolved Hide resolved

/**
* Called during encoding when the request latency is known. Records the
* request latency to update the internal state of the controller for
* concurrency limit calculations.
*
* @param rq_latency is the clocked round-trip time for the request.
*/
virtual void recordLatencySample(const std::chrono::nanoseconds& rq_latency) PURE;
tonya11en marked this conversation as resolved.
Show resolved Hide resolved
};

} // namespace ConcurrencyController
} // namespace AdaptiveConcurrency
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <chrono>

#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace AdaptiveConcurrency {
namespace ConcurrencyController {

/**
* Adaptive concurrency controller that does nothing.
*/
class NoopController : public ConcurrencyController {
public:
// ConcurrencyController.
bool tryForwardRequest() override { return true; }
void recordLatencySample(const std::chrono::nanoseconds&) override {}
};

} // namespace ConcurrencyController
} // namespace AdaptiveConcurrency
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
42 changes: 42 additions & 0 deletions source/extensions/filters/http/adaptive_concurrency/config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "extensions/filters/http/adaptive_concurrency/config.h"

#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.validate.h"
#include "envoy/registry/registry.h"

#include "common/config/filter_json.h"

#include "extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h"
#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/noop_controller.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace AdaptiveConcurrency {

Http::FilterFactoryCb AdaptiveConcurrencyFilterFactory::createFilterFactoryFromProtoTyped(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think anything in this file or the NOP controller is actually used in this PR. Can we just remove it for now until we have the integration test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually forced to implement createFilterFactoryFromProtoTyped or the filter factory is an abstract class. I ripped out everything else I could.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I meant I don't think you need config.h/config.cc at all in this PR? See https://github.com/envoyproxy/envoy/blob/master/source/extensions/extensions_build_config.bzl#L79 for an example of getting tests to run w/o a config file that we don't need yet.

/wait

const envoy::config::filter::http::adaptive_concurrency::v2alpha::AdaptiveConcurrency& config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) {

// TODO (tonya11en): Noop controller needs to be replaced with an actual
// implementation in a future patch.
std::shared_ptr<ConcurrencyController::ConcurrencyController> noop_ctl =
std::make_shared<ConcurrencyController::NoopController>();

AdaptiveConcurrencyFilterConfigSharedPtr filter_config(new AdaptiveConcurrencyFilterConfig(
config, context.runtime(), stats_prefix, context.scope(), context.timeSource()));

return [filter_config, noop_ctl](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<AdaptiveConcurrencyFilter>(filter_config, noop_ctl));
};
}

/**
* Static registration for the adaptive_concurrency filter. @see RegisterFactory.
*/
REGISTER_FACTORY(AdaptiveConcurrencyFilterFactory,
Server::Configuration::NamedHttpFilterConfigFactory);

} // namespace AdaptiveConcurrency
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Loading