From efc366f277a5bd330eaf65bf8aec7e9bcc561cf0 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Fri, 6 Jan 2023 12:21:41 +0000 Subject: [PATCH 01/10] initial commit Signed-off-by: wbpcode --- .../extensions/filters/network/generic_proxy/v3/BUILD | 1 + .../network/generic_proxy/v3/generic_proxy.proto | 6 +++++- .../filters/network/source/interface/BUILD | 3 +++ .../filters/network/source/interface/stream.h | 11 ++++++++--- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/api/contrib/envoy/extensions/filters/network/generic_proxy/v3/BUILD b/api/contrib/envoy/extensions/filters/network/generic_proxy/v3/BUILD index 6efc2efe01e8..6b5b06b72563 100644 --- a/api/contrib/envoy/extensions/filters/network/generic_proxy/v3/BUILD +++ b/api/contrib/envoy/extensions/filters/network/generic_proxy/v3/BUILD @@ -7,6 +7,7 @@ licenses(["notice"]) # Apache 2 api_proto_package( deps = [ "//envoy/config/core/v3:pkg", + "//envoy/extensions/filters/network/http_connection_manager/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", "@com_github_cncf_udpa//xds/annotations/v3:pkg", "@com_github_cncf_udpa//xds/type/matcher/v3:pkg", diff --git a/api/contrib/envoy/extensions/filters/network/generic_proxy/v3/generic_proxy.proto b/api/contrib/envoy/extensions/filters/network/generic_proxy/v3/generic_proxy.proto index bd2efca5ab19..0c5cbd7e60ea 100644 --- a/api/contrib/envoy/extensions/filters/network/generic_proxy/v3/generic_proxy.proto +++ b/api/contrib/envoy/extensions/filters/network/generic_proxy/v3/generic_proxy.proto @@ -5,6 +5,7 @@ package envoy.extensions.filters.network.generic_proxy.v3; import "contrib/envoy/extensions/filters/network/generic_proxy/v3/route.proto"; import "envoy/config/core/v3/config_source.proto"; import "envoy/config/core/v3/extension.proto"; +import "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto"; import "xds/annotations/v3/status.proto"; @@ -22,7 +23,7 @@ option (xds.annotations.v3.file_status).work_in_progress = true; // Generic proxy. // [#extension: envoy.filters.network.generic_proxy] -// [#next-free-field: 6] +// [#next-free-field: 7] message GenericProxy { // The human readable prefix to use when emitting statistics. string stat_prefix = 1 [(validate.rules).string = {min_len: 1}]; @@ -47,6 +48,9 @@ message GenericProxy { // happen. // [#extension-category: envoy.generic_proxy.filters] repeated config.core.v3.TypedExtensionConfig filters = 5; + + // Tracing configuration for the generic proxy. + http_connection_manager.v3.HttpConnectionManager.Tracing tracing = 6; } message GenericRds { diff --git a/contrib/generic_proxy/filters/network/source/interface/BUILD b/contrib/generic_proxy/filters/network/source/interface/BUILD index 28279fbbe018..1bb5c08d82cf 100644 --- a/contrib/generic_proxy/filters/network/source/interface/BUILD +++ b/contrib/generic_proxy/filters/network/source/interface/BUILD @@ -13,6 +13,9 @@ envoy_cc_library( hdrs = [ "stream.h", ], + deps = [ + "//envoy/tracing:trace_context_interface", + ], ) envoy_cc_library( diff --git a/contrib/generic_proxy/filters/network/source/interface/stream.h b/contrib/generic_proxy/filters/network/source/interface/stream.h index f0bb8924a67c..11ee23aaf145 100644 --- a/contrib/generic_proxy/filters/network/source/interface/stream.h +++ b/contrib/generic_proxy/filters/network/source/interface/stream.h @@ -5,6 +5,7 @@ #include #include "envoy/common/pure.h" +#include "envoy/tracing/trace_context.h" #include "absl/status/status.h" #include "absl/strings/string_view.h" @@ -74,7 +75,7 @@ class StreamBase { static constexpr absl::string_view name() { return "generic_proxy"; } }; -class Request : public StreamBase { +class Request : public StreamBase, public Tracing::TraceContext { public: /** * Get request host. @@ -92,7 +93,7 @@ class Request : public StreamBase { * different application protocols. It typically should be RPC service name that used to * represents set of method or functionality provided by target service. */ - virtual absl::string_view path() const PURE; + virtual absl::string_view path() const PURE; // NOLINT /** * Get request method. @@ -100,7 +101,11 @@ class Request : public StreamBase { * @return The method of generic request. The meaning of the return value may be different For * different application protocols. */ - virtual absl::string_view method() const PURE; + virtual absl::string_view method() const PURE; // NOLINT + + // TODO(wbpcode): remove this method after we update the authority() in the TraceContext to + // host(). + absl::string_view authority() const final { return host(); } }; using RequestPtr = std::unique_ptr; using RequestSharedPtr = std::shared_ptr; From 046ca4452560fa0d9974f89d174ac422104f9ecb Mon Sep 17 00:00:00 2001 From: wbpcode Date: Mon, 9 Jan 2023 12:50:15 +0000 Subject: [PATCH 02/10] temporary commit that make no sense Signed-off-by: wbpcode --- .../filters/network/source/BUILD | 1 + .../filters/network/source/interface/BUILD | 1 + .../network/source/interface/proxy_config.h | 15 ++++ .../filters/network/source/interface/stream.h | 25 +----- .../filters/network/source/proxy.cc | 76 +++++++++++++++++++ .../filters/network/source/proxy.h | 14 ++-- 6 files changed, 105 insertions(+), 27 deletions(-) diff --git a/contrib/generic_proxy/filters/network/source/BUILD b/contrib/generic_proxy/filters/network/source/BUILD index fabde2bc1890..4c0af717f235 100644 --- a/contrib/generic_proxy/filters/network/source/BUILD +++ b/contrib/generic_proxy/filters/network/source/BUILD @@ -28,6 +28,7 @@ envoy_cc_library( "//source/common/common:linked_object", "//source/common/common:minimal_logger_lib", "//source/common/stream_info:stream_info_lib", + "//source/common/tracing:custom_tag_lib", "//source/extensions/filters/network/common:factory_base_lib", "@envoy_api//contrib/envoy/extensions/filters/network/generic_proxy/v3:pkg_cc_proto", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", diff --git a/contrib/generic_proxy/filters/network/source/interface/BUILD b/contrib/generic_proxy/filters/network/source/interface/BUILD index 1bb5c08d82cf..9f2c8dc1367d 100644 --- a/contrib/generic_proxy/filters/network/source/interface/BUILD +++ b/contrib/generic_proxy/filters/network/source/interface/BUILD @@ -94,5 +94,6 @@ envoy_cc_library( ":codec_interface", ":filter_interface", ":route_interface", + "//source/common/http:conn_manager_config_interface", ], ) diff --git a/contrib/generic_proxy/filters/network/source/interface/proxy_config.h b/contrib/generic_proxy/filters/network/source/interface/proxy_config.h index 85e3e1de08d0..89a83a884ecb 100644 --- a/contrib/generic_proxy/filters/network/source/interface/proxy_config.h +++ b/contrib/generic_proxy/filters/network/source/interface/proxy_config.h @@ -4,11 +4,16 @@ #include "contrib/generic_proxy/filters/network/source/interface/filter.h" #include "contrib/generic_proxy/filters/network/source/interface/route.h" +#include "source/common/http/conn_manager_config.h" + namespace Envoy { namespace Extensions { namespace NetworkFilters { namespace GenericProxy { +using TracingConnectionManagerConfig = Http::TracingConnectionManagerConfig; +using TracingConnectionManagerConfigPtr = std::unique_ptr; + /** * Filter config interface for generic proxy read filter. */ @@ -32,6 +37,16 @@ class FilterConfig : public FilterChainFactory { * determine if they should be doing graceful closes on connections when possible. */ virtual const Network::DrainDecision& drainDecision() const PURE; + + /** + * @return Tracing::Driver tracing driver to use. + */ + virtual OptRef tracingProvider() const PURE; + + /** + * @return tracing config. + */ + virtual OptRef tracingConfig() const PURE; }; } // namespace GenericProxy diff --git a/contrib/generic_proxy/filters/network/source/interface/stream.h b/contrib/generic_proxy/filters/network/source/interface/stream.h index 11ee23aaf145..23e370e93c83 100644 --- a/contrib/generic_proxy/filters/network/source/interface/stream.h +++ b/contrib/generic_proxy/filters/network/source/interface/stream.h @@ -70,12 +70,9 @@ class StreamBase { * @param val The metadata value of string view type. */ virtual void setByReference(absl::string_view key, absl::string_view val) PURE; - - // Used for matcher. - static constexpr absl::string_view name() { return "generic_proxy"; } }; -class Request : public StreamBase, public Tracing::TraceContext { +class Request : public Tracing::TraceContext { public: /** * Get request host. @@ -86,26 +83,12 @@ class Request : public StreamBase, public Tracing::TraceContext { */ virtual absl::string_view host() const PURE; - /** - * Get request path. - * - * @return The path of generic request. The meaning of the return value may be different For - * different application protocols. It typically should be RPC service name that used to - * represents set of method or functionality provided by target service. - */ - virtual absl::string_view path() const PURE; // NOLINT - - /** - * Get request method. - * - * @return The method of generic request. The meaning of the return value may be different For - * different application protocols. - */ - virtual absl::string_view method() const PURE; // NOLINT - // TODO(wbpcode): remove this method after we update the authority() in the TraceContext to // host(). absl::string_view authority() const final { return host(); } + + // Used for matcher. + static constexpr absl::string_view name() { return "generic_proxy"; } }; using RequestPtr = std::unique_ptr; using RequestSharedPtr = std::shared_ptr; diff --git a/contrib/generic_proxy/filters/network/source/proxy.cc b/contrib/generic_proxy/filters/network/source/proxy.cc index 9e2a767eda31..fa360d634363 100644 --- a/contrib/generic_proxy/filters/network/source/proxy.cc +++ b/contrib/generic_proxy/filters/network/source/proxy.cc @@ -11,11 +11,87 @@ #include "contrib/generic_proxy/filters/network/source/interface/filter.h" #include "contrib/generic_proxy/filters/network/source/route.h" +#include "source/common/tracing/custom_tag_impl.h" + namespace Envoy { namespace Extensions { namespace NetworkFilters { namespace GenericProxy { +namespace { +using ProtoTracingConfig = envoy::extensions::filters::network::http_connection_manager::v3:: + HttpConnectionManager::Tracing; + +// TODO(wbpcode): Move +TracingConnectionManagerConfigPtr +getTracingConfig(const ProtoTracingConfig& tracing_config, + envoy::config::core::v3::TrafficDirection direction) { + + Tracing::OperationName tracing_operation_name; + + // Listener level traffic direction overrides the operation name + switch (direction) { + PANIC_ON_PROTO_ENUM_SENTINEL_VALUES; + case envoy::config::core::v3::UNSPECIFIED: + // Continuing legacy behavior; if unspecified, we treat this as ingress. + tracing_operation_name = Tracing::OperationName::Ingress; + break; + case envoy::config::core::v3::INBOUND: + tracing_operation_name = Tracing::OperationName::Ingress; + break; + case envoy::config::core::v3::OUTBOUND: + tracing_operation_name = Tracing::OperationName::Egress; + break; + } + + Tracing::CustomTagMap custom_tags; + for (const auto& tag : tracing_config.custom_tags()) { + custom_tags.emplace(tag.tag(), Tracing::CustomTagUtility::createCustomTag(tag)); + } + + envoy::type::v3::FractionalPercent client_sampling; + client_sampling.set_numerator( + tracing_config.has_client_sampling() ? tracing_config.client_sampling().value() : 100); + envoy::type::v3::FractionalPercent random_sampling; + // TODO: Random sampling historically was an integer and default to out of 10,000. We should + // deprecate that and move to a straight fractional percent config. + uint64_t random_sampling_numerator{PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT( + tracing_config, random_sampling, 10000, 10000)}; + random_sampling.set_numerator(random_sampling_numerator); + random_sampling.set_denominator(envoy::type::v3::FractionalPercent::TEN_THOUSAND); + envoy::type::v3::FractionalPercent overall_sampling; + uint64_t overall_sampling_numerator{PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT( + tracing_config, overall_sampling, 10000, 10000)}; + overall_sampling.set_numerator(overall_sampling_numerator); + overall_sampling.set_denominator(envoy::type::v3::FractionalPercent::TEN_THOUSAND); + + const uint32_t max_path_tag_length = PROTOBUF_GET_WRAPPED_OR_DEFAULT( + tracing_config, max_path_tag_length, Tracing::DefaultMaxPathTagLength); + + return std::make_unique(TracingConnectionManagerConfig{ + tracing_operation_name, custom_tags, client_sampling, random_sampling, overall_sampling, + tracing_config.verbose(), max_path_tag_length}); +} +} // namespace + +FilterConfigImpl::FilterConfigImpl(const ProxyConfig& proto_config, const std::string& stat_prefix, + CodecFactoryPtr codec, + Rds::RouteConfigProviderSharedPtr route_config_provider, + std::vector factories, + Envoy::Server::Configuration::FactoryContext& context) + : stat_prefix_(stat_prefix), codec_factory_(std::move(codec)), + route_config_provider_(std::move(route_config_provider)), factories_(std::move(factories)), + drain_decision_(context.drainDecision()) { + + if (proto_config.has_tracing()) { + if (!proto_config.tracing().has_provider()) { + throw EnvoyException("generic proxy: tracing configuration must have provider"); + } + + tracing_config_ = getTracingConfig(proto_config.tracing(), context.direction()); + } +} + ActiveStream::ActiveStream(Filter& parent, RequestPtr request) : parent_(parent), downstream_request_stream_(std::move(request)) {} diff --git a/contrib/generic_proxy/filters/network/source/proxy.h b/contrib/generic_proxy/filters/network/source/proxy.h index 415ba3e6adcc..9b28f1927f45 100644 --- a/contrib/generic_proxy/filters/network/source/proxy.h +++ b/contrib/generic_proxy/filters/network/source/proxy.h @@ -24,6 +24,8 @@ #include "contrib/generic_proxy/filters/network/source/rds_impl.h" #include "contrib/generic_proxy/filters/network/source/route.h" +#include "envoy/tracing/trace_driver.h" + namespace Envoy { namespace Extensions { namespace NetworkFilters { @@ -43,13 +45,10 @@ struct NamedFilterFactoryCb { class FilterConfigImpl : public FilterConfig { public: - FilterConfigImpl(const std::string& stat_prefix, CodecFactoryPtr codec, - Rds::RouteConfigProviderSharedPtr route_config_provider, + FilterConfigImpl(const ProxyConfig& proto_config, const std::string& stat_prefix, + CodecFactoryPtr codec, Rds::RouteConfigProviderSharedPtr route_config_provider, std::vector factories, - Envoy::Server::Configuration::FactoryContext& context) - : stat_prefix_(stat_prefix), codec_factory_(std::move(codec)), - route_config_provider_(std::move(route_config_provider)), factories_(std::move(factories)), - drain_decision_(context.drainDecision()) {} + Envoy::Server::Configuration::FactoryContext& context); // FilterConfig RouteEntryConstSharedPtr routeEntry(const Request& request) const override { @@ -78,6 +77,9 @@ class FilterConfigImpl : public FilterConfig { std::vector factories_; + Tracing::DriverSharedPtr tracing_driver_; + TracingConnectionManagerConfigPtr tracing_config_; + const Network::DrainDecision& drain_decision_; }; From 91a2c12c1733d8ef2994a8461bf5a1079fe3fa79 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Fri, 3 Feb 2023 09:49:47 +0000 Subject: [PATCH 03/10] Revert "temporary commit that make no sense" This reverts commit 046ca4452560fa0d9974f89d174ac422104f9ecb. --- .../filters/network/source/BUILD | 1 - .../filters/network/source/interface/BUILD | 1 - .../network/source/interface/proxy_config.h | 15 ---- .../filters/network/source/interface/stream.h | 25 +++++- .../filters/network/source/proxy.cc | 76 ------------------- .../filters/network/source/proxy.h | 14 ++-- 6 files changed, 27 insertions(+), 105 deletions(-) diff --git a/contrib/generic_proxy/filters/network/source/BUILD b/contrib/generic_proxy/filters/network/source/BUILD index 4c0af717f235..fabde2bc1890 100644 --- a/contrib/generic_proxy/filters/network/source/BUILD +++ b/contrib/generic_proxy/filters/network/source/BUILD @@ -28,7 +28,6 @@ envoy_cc_library( "//source/common/common:linked_object", "//source/common/common:minimal_logger_lib", "//source/common/stream_info:stream_info_lib", - "//source/common/tracing:custom_tag_lib", "//source/extensions/filters/network/common:factory_base_lib", "@envoy_api//contrib/envoy/extensions/filters/network/generic_proxy/v3:pkg_cc_proto", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", diff --git a/contrib/generic_proxy/filters/network/source/interface/BUILD b/contrib/generic_proxy/filters/network/source/interface/BUILD index 9f2c8dc1367d..1bb5c08d82cf 100644 --- a/contrib/generic_proxy/filters/network/source/interface/BUILD +++ b/contrib/generic_proxy/filters/network/source/interface/BUILD @@ -94,6 +94,5 @@ envoy_cc_library( ":codec_interface", ":filter_interface", ":route_interface", - "//source/common/http:conn_manager_config_interface", ], ) diff --git a/contrib/generic_proxy/filters/network/source/interface/proxy_config.h b/contrib/generic_proxy/filters/network/source/interface/proxy_config.h index 89a83a884ecb..85e3e1de08d0 100644 --- a/contrib/generic_proxy/filters/network/source/interface/proxy_config.h +++ b/contrib/generic_proxy/filters/network/source/interface/proxy_config.h @@ -4,16 +4,11 @@ #include "contrib/generic_proxy/filters/network/source/interface/filter.h" #include "contrib/generic_proxy/filters/network/source/interface/route.h" -#include "source/common/http/conn_manager_config.h" - namespace Envoy { namespace Extensions { namespace NetworkFilters { namespace GenericProxy { -using TracingConnectionManagerConfig = Http::TracingConnectionManagerConfig; -using TracingConnectionManagerConfigPtr = std::unique_ptr; - /** * Filter config interface for generic proxy read filter. */ @@ -37,16 +32,6 @@ class FilterConfig : public FilterChainFactory { * determine if they should be doing graceful closes on connections when possible. */ virtual const Network::DrainDecision& drainDecision() const PURE; - - /** - * @return Tracing::Driver tracing driver to use. - */ - virtual OptRef tracingProvider() const PURE; - - /** - * @return tracing config. - */ - virtual OptRef tracingConfig() const PURE; }; } // namespace GenericProxy diff --git a/contrib/generic_proxy/filters/network/source/interface/stream.h b/contrib/generic_proxy/filters/network/source/interface/stream.h index 23e370e93c83..11ee23aaf145 100644 --- a/contrib/generic_proxy/filters/network/source/interface/stream.h +++ b/contrib/generic_proxy/filters/network/source/interface/stream.h @@ -70,9 +70,12 @@ class StreamBase { * @param val The metadata value of string view type. */ virtual void setByReference(absl::string_view key, absl::string_view val) PURE; + + // Used for matcher. + static constexpr absl::string_view name() { return "generic_proxy"; } }; -class Request : public Tracing::TraceContext { +class Request : public StreamBase, public Tracing::TraceContext { public: /** * Get request host. @@ -83,12 +86,26 @@ class Request : public Tracing::TraceContext { */ virtual absl::string_view host() const PURE; + /** + * Get request path. + * + * @return The path of generic request. The meaning of the return value may be different For + * different application protocols. It typically should be RPC service name that used to + * represents set of method or functionality provided by target service. + */ + virtual absl::string_view path() const PURE; // NOLINT + + /** + * Get request method. + * + * @return The method of generic request. The meaning of the return value may be different For + * different application protocols. + */ + virtual absl::string_view method() const PURE; // NOLINT + // TODO(wbpcode): remove this method after we update the authority() in the TraceContext to // host(). absl::string_view authority() const final { return host(); } - - // Used for matcher. - static constexpr absl::string_view name() { return "generic_proxy"; } }; using RequestPtr = std::unique_ptr; using RequestSharedPtr = std::shared_ptr; diff --git a/contrib/generic_proxy/filters/network/source/proxy.cc b/contrib/generic_proxy/filters/network/source/proxy.cc index fa360d634363..9e2a767eda31 100644 --- a/contrib/generic_proxy/filters/network/source/proxy.cc +++ b/contrib/generic_proxy/filters/network/source/proxy.cc @@ -11,87 +11,11 @@ #include "contrib/generic_proxy/filters/network/source/interface/filter.h" #include "contrib/generic_proxy/filters/network/source/route.h" -#include "source/common/tracing/custom_tag_impl.h" - namespace Envoy { namespace Extensions { namespace NetworkFilters { namespace GenericProxy { -namespace { -using ProtoTracingConfig = envoy::extensions::filters::network::http_connection_manager::v3:: - HttpConnectionManager::Tracing; - -// TODO(wbpcode): Move -TracingConnectionManagerConfigPtr -getTracingConfig(const ProtoTracingConfig& tracing_config, - envoy::config::core::v3::TrafficDirection direction) { - - Tracing::OperationName tracing_operation_name; - - // Listener level traffic direction overrides the operation name - switch (direction) { - PANIC_ON_PROTO_ENUM_SENTINEL_VALUES; - case envoy::config::core::v3::UNSPECIFIED: - // Continuing legacy behavior; if unspecified, we treat this as ingress. - tracing_operation_name = Tracing::OperationName::Ingress; - break; - case envoy::config::core::v3::INBOUND: - tracing_operation_name = Tracing::OperationName::Ingress; - break; - case envoy::config::core::v3::OUTBOUND: - tracing_operation_name = Tracing::OperationName::Egress; - break; - } - - Tracing::CustomTagMap custom_tags; - for (const auto& tag : tracing_config.custom_tags()) { - custom_tags.emplace(tag.tag(), Tracing::CustomTagUtility::createCustomTag(tag)); - } - - envoy::type::v3::FractionalPercent client_sampling; - client_sampling.set_numerator( - tracing_config.has_client_sampling() ? tracing_config.client_sampling().value() : 100); - envoy::type::v3::FractionalPercent random_sampling; - // TODO: Random sampling historically was an integer and default to out of 10,000. We should - // deprecate that and move to a straight fractional percent config. - uint64_t random_sampling_numerator{PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT( - tracing_config, random_sampling, 10000, 10000)}; - random_sampling.set_numerator(random_sampling_numerator); - random_sampling.set_denominator(envoy::type::v3::FractionalPercent::TEN_THOUSAND); - envoy::type::v3::FractionalPercent overall_sampling; - uint64_t overall_sampling_numerator{PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT( - tracing_config, overall_sampling, 10000, 10000)}; - overall_sampling.set_numerator(overall_sampling_numerator); - overall_sampling.set_denominator(envoy::type::v3::FractionalPercent::TEN_THOUSAND); - - const uint32_t max_path_tag_length = PROTOBUF_GET_WRAPPED_OR_DEFAULT( - tracing_config, max_path_tag_length, Tracing::DefaultMaxPathTagLength); - - return std::make_unique(TracingConnectionManagerConfig{ - tracing_operation_name, custom_tags, client_sampling, random_sampling, overall_sampling, - tracing_config.verbose(), max_path_tag_length}); -} -} // namespace - -FilterConfigImpl::FilterConfigImpl(const ProxyConfig& proto_config, const std::string& stat_prefix, - CodecFactoryPtr codec, - Rds::RouteConfigProviderSharedPtr route_config_provider, - std::vector factories, - Envoy::Server::Configuration::FactoryContext& context) - : stat_prefix_(stat_prefix), codec_factory_(std::move(codec)), - route_config_provider_(std::move(route_config_provider)), factories_(std::move(factories)), - drain_decision_(context.drainDecision()) { - - if (proto_config.has_tracing()) { - if (!proto_config.tracing().has_provider()) { - throw EnvoyException("generic proxy: tracing configuration must have provider"); - } - - tracing_config_ = getTracingConfig(proto_config.tracing(), context.direction()); - } -} - ActiveStream::ActiveStream(Filter& parent, RequestPtr request) : parent_(parent), downstream_request_stream_(std::move(request)) {} diff --git a/contrib/generic_proxy/filters/network/source/proxy.h b/contrib/generic_proxy/filters/network/source/proxy.h index 9b28f1927f45..415ba3e6adcc 100644 --- a/contrib/generic_proxy/filters/network/source/proxy.h +++ b/contrib/generic_proxy/filters/network/source/proxy.h @@ -24,8 +24,6 @@ #include "contrib/generic_proxy/filters/network/source/rds_impl.h" #include "contrib/generic_proxy/filters/network/source/route.h" -#include "envoy/tracing/trace_driver.h" - namespace Envoy { namespace Extensions { namespace NetworkFilters { @@ -45,10 +43,13 @@ struct NamedFilterFactoryCb { class FilterConfigImpl : public FilterConfig { public: - FilterConfigImpl(const ProxyConfig& proto_config, const std::string& stat_prefix, - CodecFactoryPtr codec, Rds::RouteConfigProviderSharedPtr route_config_provider, + FilterConfigImpl(const std::string& stat_prefix, CodecFactoryPtr codec, + Rds::RouteConfigProviderSharedPtr route_config_provider, std::vector factories, - Envoy::Server::Configuration::FactoryContext& context); + Envoy::Server::Configuration::FactoryContext& context) + : stat_prefix_(stat_prefix), codec_factory_(std::move(codec)), + route_config_provider_(std::move(route_config_provider)), factories_(std::move(factories)), + drain_decision_(context.drainDecision()) {} // FilterConfig RouteEntryConstSharedPtr routeEntry(const Request& request) const override { @@ -77,9 +78,6 @@ class FilterConfigImpl : public FilterConfig { std::vector factories_; - Tracing::DriverSharedPtr tracing_driver_; - TracingConnectionManagerConfigPtr tracing_config_; - const Network::DrainDecision& drain_decision_; }; From a036a827b2a47bc0343eeb3b80718d743b3d5e72 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Mon, 6 Feb 2023 11:35:41 +0000 Subject: [PATCH 04/10] minor update: move TracerManager registration to common/tracing Signed-off-by: wbpcode --- source/common/tracing/BUILD | 3 ++- source/common/tracing/tracer_impl.cc | 3 +++ source/common/tracing/tracer_manager_impl.cc | 12 ++++++++++++ source/common/tracing/tracer_manager_impl.h | 7 +++++++ .../network/http_connection_manager/config.cc | 8 +------- 5 files changed, 25 insertions(+), 8 deletions(-) diff --git a/source/common/tracing/BUILD b/source/common/tracing/BUILD index 9be533ad829f..d532c410bfc5 100644 --- a/source/common/tracing/BUILD +++ b/source/common/tracing/BUILD @@ -90,12 +90,13 @@ envoy_cc_library( "tracer_manager_impl.h", ], deps = [ + ":tracer_config_lib", "//envoy/server:tracer_config_interface", "//envoy/singleton:instance_interface", "//envoy/tracing:tracer_manager_interface", "//source/common/common:minimal_logger_lib", "//source/common/config:utility_lib", - "//source/common/tracing:http_tracer_lib", + "//source/common/tracing:tracer_lib", ], ) diff --git a/source/common/tracing/tracer_impl.cc b/source/common/tracing/tracer_impl.cc index e273ac42b247..b4d2edcaf9f3 100644 --- a/source/common/tracing/tracer_impl.cc +++ b/source/common/tracing/tracer_impl.cc @@ -128,6 +128,9 @@ void TracerUtility::finalizeSpan(Span& span, const TraceContext& trace_context, it.second->applySpan(span, ctx); } } + + // Finish the span. + span.finishSpan(); } TracerImpl::TracerImpl(DriverSharedPtr driver, const LocalInfo::LocalInfo& local_info) diff --git a/source/common/tracing/tracer_manager_impl.cc b/source/common/tracing/tracer_manager_impl.cc index 61877d83c035..be96c358f430 100644 --- a/source/common/tracing/tracer_manager_impl.cc +++ b/source/common/tracing/tracer_manager_impl.cc @@ -5,6 +5,8 @@ namespace Envoy { namespace Tracing { +SINGLETON_MANAGER_REGISTRATION(tracer_manager); + TracerManagerImpl::TracerManagerImpl(Server::Configuration::TracerFactoryContextPtr factory_context) : factory_context_(std::move(factory_context)) {} @@ -59,5 +61,15 @@ void TracerManagerImpl::removeExpiredCacheEntries() { }); } +std::shared_ptr +TracerManagerImpl::singleton(Server::Configuration::FactoryContext& context) { + return context.singletonManager().getTyped( + SINGLETON_MANAGER_REGISTERED_NAME(tracer_manager), [&context] { + return std::make_shared( + std::make_unique( + context.getServerFactoryContext(), context.messageValidationVisitor())); + }); +} + } // namespace Tracing } // namespace Envoy diff --git a/source/common/tracing/tracer_manager_impl.h b/source/common/tracing/tracer_manager_impl.h index 354b66f99ab8..657406bc837d 100644 --- a/source/common/tracing/tracer_manager_impl.h +++ b/source/common/tracing/tracer_manager_impl.h @@ -5,11 +5,16 @@ #include "envoy/tracing/tracer_manager.h" #include "source/common/common/logger.h" +#include "source/common/tracing/tracer_config_impl.h" #include "source/common/tracing/tracer_impl.h" namespace Envoy { namespace Tracing { +/** + * TracerManager implementation that manages the tracers. This should be used as a singleton except + * in tests. + */ class TracerManagerImpl : public TracerManager, public Singleton::Instance, Logger::Loggable { @@ -24,6 +29,8 @@ class TracerManagerImpl : public TracerManager, return tracers_; } + static std::shared_ptr singleton(Server::Configuration::FactoryContext& context); + private: void removeExpiredCacheEntries(); diff --git a/source/extensions/filters/network/http_connection_manager/config.cc b/source/extensions/filters/network/http_connection_manager/config.cc index 86409ae183cc..8a7d1e899b48 100644 --- a/source/extensions/filters/network/http_connection_manager/config.cc +++ b/source/extensions/filters/network/http_connection_manager/config.cc @@ -191,7 +191,6 @@ Http::HeaderValidatorFactoryPtr createHeaderValidatorFactory( SINGLETON_MANAGER_REGISTRATION(date_provider); SINGLETON_MANAGER_REGISTRATION(route_config_provider_manager); SINGLETON_MANAGER_REGISTRATION(scoped_routes_config_provider_manager); -SINGLETON_MANAGER_REGISTRATION(tracer_manager); Utility::Singletons Utility::createSingletons(Server::Configuration::FactoryContext& context) { std::shared_ptr date_provider = @@ -215,12 +214,7 @@ Utility::Singletons Utility::createSingletons(Server::Configuration::FactoryCont context.admin(), *route_config_provider_manager); }); - auto tracer_manager = context.singletonManager().getTyped( - SINGLETON_MANAGER_REGISTERED_NAME(tracer_manager), [&context] { - return std::make_shared( - std::make_unique( - context.getServerFactoryContext(), context.messageValidationVisitor())); - }); + auto tracer_manager = Tracing::TracerManagerImpl::singleton(context); std::shared_ptr filter_config_provider_manager = Http::FilterChainUtility::createSingletonDownstreamFilterConfigProviderManager( From 88bbaf46389dc5244968f00f1c66dde4eaba6090 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Mon, 6 Feb 2023 11:36:07 +0000 Subject: [PATCH 05/10] initial commit of tracing for generic proxy Signed-off-by: wbpcode --- .../filters/network/source/BUILD | 2 + .../filters/network/source/config.cc | 13 ++- .../filters/network/source/interface/BUILD | 2 + .../filters/network/source/interface/filter.h | 16 ++++ .../network/source/interface/proxy_config.h | 13 +++ .../filters/network/source/interface/stream.h | 37 ++------- .../filters/network/source/proxy.cc | 79 ++++++++++++++++--- .../filters/network/source/proxy.h | 67 ++++++++++++++-- .../filters/network/source/router/router.cc | 65 +++++++++++---- .../filters/network/source/router/router.h | 20 +++-- 10 files changed, 244 insertions(+), 70 deletions(-) diff --git a/contrib/generic_proxy/filters/network/source/BUILD b/contrib/generic_proxy/filters/network/source/BUILD index fabde2bc1890..1bbf708d6efa 100644 --- a/contrib/generic_proxy/filters/network/source/BUILD +++ b/contrib/generic_proxy/filters/network/source/BUILD @@ -28,6 +28,8 @@ envoy_cc_library( "//source/common/common:linked_object", "//source/common/common:minimal_logger_lib", "//source/common/stream_info:stream_info_lib", + "//source/common/tracing:tracer_config_lib", + "//source/common/tracing:tracer_lib", "//source/extensions/filters/network/common:factory_base_lib", "@envoy_api//contrib/envoy/extensions/filters/network/generic_proxy/v3:pkg_cc_proto", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", diff --git a/contrib/generic_proxy/filters/network/source/config.cc b/contrib/generic_proxy/filters/network/source/config.cc index 07f1ef86e6c5..96fe5fe8fd9f 100644 --- a/contrib/generic_proxy/filters/network/source/config.cc +++ b/contrib/generic_proxy/filters/network/source/config.cc @@ -1,5 +1,7 @@ #include "contrib/generic_proxy/filters/network/source/config.h" +#include "source/common/tracing/tracer_manager_impl.h" + #include "contrib/generic_proxy/filters/network/source/rds.h" #include "contrib/generic_proxy/filters/network/source/rds_impl.h" @@ -89,16 +91,18 @@ Factory::createFilterFactoryFromProtoTyped(const ProxyConfig& proto_config, SINGLETON_MANAGER_REGISTERED_NAME(generic_route_config_provider_manager), [&context] { return std::make_shared(context.admin()); }); + auto tracer_manager = Tracing::TracerManagerImpl::singleton(context); + auto factories = factoriesFromProto(proto_config.codec_config(), context); std::shared_ptr custom_proxy_factory = std::move(factories.second); const FilterConfigSharedPtr config = std::make_shared( - proto_config.stat_prefix(), std::move(factories.first), + proto_config, proto_config.stat_prefix(), std::move(factories.first), routeConfigProviderFromProto(proto_config, context, *route_config_provider_manager), filtersFactoryFromProto(proto_config.filters(), proto_config.stat_prefix(), context), - context); + *tracer_manager, context); - return [route_config_provider_manager, config, + return [route_config_provider_manager, tracer_manager, config, &context, custom_proxy_factory](Envoy::Network::FilterManager& filter_manager) -> void { // Create filter by the custom filter factory if the custom filter factory is not null. if (custom_proxy_factory != nullptr) { @@ -106,7 +110,8 @@ Factory::createFilterFactoryFromProtoTyped(const ProxyConfig& proto_config, return; } - filter_manager.addReadFilter(std::make_shared(config)); + filter_manager.addReadFilter( + std::make_shared(config, context.mainThreadDispatcher().timeSource())); }; } diff --git a/contrib/generic_proxy/filters/network/source/interface/BUILD b/contrib/generic_proxy/filters/network/source/interface/BUILD index 1bb5c08d82cf..a7144b4fe329 100644 --- a/contrib/generic_proxy/filters/network/source/interface/BUILD +++ b/contrib/generic_proxy/filters/network/source/interface/BUILD @@ -94,5 +94,7 @@ envoy_cc_library( ":codec_interface", ":filter_interface", ":route_interface", + "//envoy/tracing:trace_config_interface", + "//envoy/tracing:tracer_interface", ], ) diff --git a/contrib/generic_proxy/filters/network/source/interface/filter.h b/contrib/generic_proxy/filters/network/source/interface/filter.h index e6a78716dd40..6069db45423b 100644 --- a/contrib/generic_proxy/filters/network/source/interface/filter.h +++ b/contrib/generic_proxy/filters/network/source/interface/filter.h @@ -49,6 +49,22 @@ class StreamFilterCallbacks { * name will be used to get the config. */ virtual const RouteSpecificFilterConfig* perFilterConfig() const PURE; + + /** + * @return StreamInfo::StreamInfo& the stream info object associated with the stream. + */ + virtual const StreamInfo::StreamInfo& streamInfo() const PURE; + virtual StreamInfo::StreamInfo& streamInfo() PURE; + + /** + * @return Tracing::Span& the active span associated with the stream. + */ + virtual Tracing::Span& activeSpan() PURE; + + /** + * @return const Tracing::Config& the tracing configuration. + */ + virtual OptRef tracingConfig() const PURE; }; class DecoderFilterCallback : public virtual StreamFilterCallbacks { diff --git a/contrib/generic_proxy/filters/network/source/interface/proxy_config.h b/contrib/generic_proxy/filters/network/source/interface/proxy_config.h index 85e3e1de08d0..6fd666e7f0e8 100644 --- a/contrib/generic_proxy/filters/network/source/interface/proxy_config.h +++ b/contrib/generic_proxy/filters/network/source/interface/proxy_config.h @@ -1,5 +1,8 @@ #pragma once +#include "envoy/tracing/trace_config.h" +#include "envoy/tracing/tracer.h" + #include "contrib/generic_proxy/filters/network/source/interface/codec.h" #include "contrib/generic_proxy/filters/network/source/interface/filter.h" #include "contrib/generic_proxy/filters/network/source/interface/route.h" @@ -32,6 +35,16 @@ class FilterConfig : public FilterChainFactory { * determine if they should be doing graceful closes on connections when possible. */ virtual const Network::DrainDecision& drainDecision() const PURE; + + /** + * @return Tracing::Tracer tracing provider to use. + */ + virtual OptRef tracingProvider() const PURE; + + /** + * @return connection manager tracing config. + */ + virtual OptRef tracingConfig() const PURE; }; } // namespace GenericProxy diff --git a/contrib/generic_proxy/filters/network/source/interface/stream.h b/contrib/generic_proxy/filters/network/source/interface/stream.h index 11ee23aaf145..c8a628fcd918 100644 --- a/contrib/generic_proxy/filters/network/source/interface/stream.h +++ b/contrib/generic_proxy/filters/network/source/interface/stream.h @@ -75,38 +75,15 @@ class StreamBase { static constexpr absl::string_view name() { return "generic_proxy"; } }; -class Request : public StreamBase, public Tracing::TraceContext { +/** + * Using interface that provided by the TraceContext as the interface of generic request. + */ +class Request : public Tracing::TraceContext { public: - /** - * Get request host. - * - * @return The host of generic request. The meaning of the return value may be different For - * different application protocols. It typically should be domain, VIP, or service name that - * used to represents target service instances. - */ - virtual absl::string_view host() const PURE; - - /** - * Get request path. - * - * @return The path of generic request. The meaning of the return value may be different For - * different application protocols. It typically should be RPC service name that used to - * represents set of method or functionality provided by target service. - */ - virtual absl::string_view path() const PURE; // NOLINT - - /** - * Get request method. - * - * @return The method of generic request. The meaning of the return value may be different For - * different application protocols. - */ - virtual absl::string_view method() const PURE; // NOLINT - - // TODO(wbpcode): remove this method after we update the authority() in the TraceContext to - // host(). - absl::string_view authority() const final { return host(); } + // Used for matcher. + static constexpr absl::string_view name() { return "generic_proxy"; } }; + using RequestPtr = std::unique_ptr; using RequestSharedPtr = std::shared_ptr; diff --git a/contrib/generic_proxy/filters/network/source/proxy.cc b/contrib/generic_proxy/filters/network/source/proxy.cc index 9e2a767eda31..bdb3da6dea61 100644 --- a/contrib/generic_proxy/filters/network/source/proxy.cc +++ b/contrib/generic_proxy/filters/network/source/proxy.cc @@ -16,19 +16,59 @@ namespace Extensions { namespace NetworkFilters { namespace GenericProxy { +namespace { + +Tracing::Decision tracingDecision(const Tracing::ConnectionManagerTracingConfig& tracing_config, + Runtime::Loader& runtime) { + bool traced = runtime.snapshot().featureEnabled("tracing.random_sampling", + tracing_config.getRandomSampling()); + + if (traced) { + return {Tracing::Reason::Sampling, true}; + } + return {Tracing::Reason::NotTraceable, false}; +} + +} // namespace + ActiveStream::ActiveStream(Filter& parent, RequestPtr request) - : parent_(parent), downstream_request_stream_(std::move(request)) {} + : parent_(parent), downstream_request_stream_(std::move(request)), + stream_info_(parent_.time_source_, + parent_.callbacks_->connection().connectionInfoProviderSharedPtr()) { -ActiveStream::~ActiveStream() { - for (auto& filter : decoder_filters_) { - filter->filter_->onDestroy(); + connection_manager_tracing_config_ = parent_.config_->tracingConfig(); + + auto tracer = parent_.config_->tracingProvider(); + + if (!connection_manager_tracing_config_.has_value() || !tracer.has_value()) { + return; } - for (auto& filter : encoder_filters_) { - if (filter->isDualFilter()) { - continue; - } - filter->filter_->onDestroy(); + + auto decision = tracingDecision(connection_manager_tracing_config_.value(), parent_.runtime_); + if (decision.traced) { + stream_info_.setTraceReason(decision.reason); } + active_span_ = tracer->startSpan(*this, *request, stream_info_, decision); +} + +Tracing::OperationName ActiveStream::operationName() const { + ASSERT(connection_manager_tracing_config_.has_value()); + return connection_manager_tracing_config_->operationName(); +} + +const Tracing::CustomTagMap* ActiveStream::customTags() const { + ASSERT(connection_manager_tracing_config_.has_value()); + return &connection_manager_tracing_config_->getCustomTags(); +} + +bool ActiveStream::verbose() const { + ASSERT(connection_manager_tracing_config_.has_value()); + return connection_manager_tracing_config_->verbose(); +} + +uint32_t ActiveStream::maxPathTagLength() const { + ASSERT(connection_manager_tracing_config_.has_value()); + return connection_manager_tracing_config_->maxPathTagLength(); } Envoy::Event::Dispatcher& ActiveStream::dispatcher() { return parent_.connection().dispatcher(); } @@ -119,6 +159,25 @@ void ActiveStream::initializeFilterChain(FilterChainFactory& factory) { std::reverse(encoder_filters_.begin(), encoder_filters_.end()); } +void ActiveStream::completeRequest() { + stream_info_.onRequestComplete(); + + if (active_span_) { + Tracing::TracerUtility::finalizeSpan(*active_span_, *downstream_request_stream_, stream_info_, + *this, false); + } + + for (auto& filter : decoder_filters_) { + filter->filter_->onDestroy(); + } + for (auto& filter : encoder_filters_) { + if (filter->isDualFilter()) { + continue; + } + filter->filter_->onDestroy(); + } +} + Envoy::Network::FilterStatus Filter::onData(Envoy::Buffer::Instance& data, bool) { if (downstream_connection_closed_) { return Envoy::Network::FilterStatus::StopIteration; @@ -151,6 +210,8 @@ void Filter::newDownstreamRequest(RequestPtr request) { } void Filter::deferredStream(ActiveStream& stream) { + stream.completeRequest(); + if (!stream.inserted()) { return; } diff --git a/contrib/generic_proxy/filters/network/source/proxy.h b/contrib/generic_proxy/filters/network/source/proxy.h index 415ba3e6adcc..4642b46a7835 100644 --- a/contrib/generic_proxy/filters/network/source/proxy.h +++ b/contrib/generic_proxy/filters/network/source/proxy.h @@ -7,11 +7,14 @@ #include "envoy/network/connection.h" #include "envoy/network/filter.h" #include "envoy/server/factory_context.h" +#include "envoy/tracing/tracer_manager.h" #include "source/common/buffer/buffer_impl.h" #include "source/common/common/linked_object.h" #include "source/common/common/logger.h" #include "source/common/stream_info/stream_info_impl.h" +#include "source/common/tracing/tracer_config_impl.h" +#include "source/common/tracing/tracer_impl.h" #include "contrib/envoy/extensions/filters/network/generic_proxy/v3/generic_proxy.pb.h" #include "contrib/envoy/extensions/filters/network/generic_proxy/v3/generic_proxy.pb.validate.h" @@ -43,13 +46,21 @@ struct NamedFilterFactoryCb { class FilterConfigImpl : public FilterConfig { public: - FilterConfigImpl(const std::string& stat_prefix, CodecFactoryPtr codec, + FilterConfigImpl(const ProxyConfig& config, const std::string& stat_prefix, CodecFactoryPtr codec, Rds::RouteConfigProviderSharedPtr route_config_provider, std::vector factories, + Tracing::TracerManager& tracer_manager, Envoy::Server::Configuration::FactoryContext& context) : stat_prefix_(stat_prefix), codec_factory_(std::move(codec)), route_config_provider_(std::move(route_config_provider)), factories_(std::move(factories)), - drain_decision_(context.drainDecision()) {} + drain_decision_(context.drainDecision()) { + if (config.has_tracing()) { + tracer_ = tracer_manager.getOrCreateTracer( + config.tracing().has_provider() ? &config.tracing().provider() : nullptr); + tracing_config_ = std::make_unique( + context.direction(), config.tracing()); + } + } // FilterConfig RouteEntryConstSharedPtr routeEntry(const Request& request) const override { @@ -58,6 +69,12 @@ class FilterConfigImpl : public FilterConfig { } const CodecFactory& codecFactory() const override { return *codec_factory_; } const Network::DrainDecision& drainDecision() const override { return drain_decision_; } + OptRef tracingProvider() const override { + return makeOptRefFromPtr(tracer_.get()); + } + OptRef tracingConfig() const override { + return makeOptRefFromPtr(tracing_config_.get()); + } // FilterChainFactory void createFilterChain(FilterChainManager& manager) override { @@ -79,12 +96,16 @@ class FilterConfigImpl : public FilterConfig { std::vector factories_; const Network::DrainDecision& drain_decision_; + + Tracing::TracerSharedPtr tracer_{std::make_shared()}; + Tracing::ConnectionManagerTracingConfigPtr tracing_config_; }; class ActiveStream : public FilterChainManager, public LinkedObject, public Envoy::Event::DeferredDeletable, public ResponseEncoderCallback, + public Tracing::Config, Logger::Loggable { public: class ActiveFilterBase : public virtual StreamFilterCallbacks { @@ -103,6 +124,10 @@ class ActiveStream : public FilterChainManager, } return nullptr; } + const StreamInfo::StreamInfo& streamInfo() const override { return parent_.stream_info_; } + StreamInfo::StreamInfo& streamInfo() override { return parent_.stream_info_; } + Tracing::Span& activeSpan() override { return parent_.activeSpan(); } + OptRef tracingConfig() const override { return parent_.tracingConfig(); } bool isDualFilter() const { return is_dual_; } @@ -175,7 +200,6 @@ class ActiveStream : public FilterChainManager, }; ActiveStream(Filter& parent, RequestPtr request); - ~ActiveStream() override; void addDecoderFilter(ActiveDecoderFilterPtr filter) { decoder_filters_.emplace_back(std::move(filter)); @@ -212,7 +236,32 @@ class ActiveStream : public FilterChainManager, size_t nextDecoderFilterIndexForTest() { return next_decoder_filter_index_; } size_t nextEncoderFilterIndexForTest() { return next_encoder_filter_index_; } + Tracing::Span& activeSpan() { + if (active_span_) { + return *active_span_; + } else { + return Tracing::NullSpan::instance(); + } + } + + OptRef tracingConfig() const { + if (connection_manager_tracing_config_.has_value()) { + return {*this}; + } + return {}; + } + + void completeRequest(); + private: + // Keep these methods private to ensure that these methods are only called by the reference + // returned by the public tracingConfig() method. + // Tracing::TracingConfig + Tracing::OperationName operationName() const override; + const Tracing::CustomTagMap* customTags() const override; + bool verbose() const override; + uint32_t maxPathTagLength() const override; + bool active_stream_reset_{false}; Filter& parent_; @@ -227,6 +276,11 @@ class ActiveStream : public FilterChainManager, std::vector encoder_filters_; size_t next_encoder_filter_index_{0}; + + StreamInfo::StreamInfoImpl stream_info_; + + OptRef connection_manager_tracing_config_; + Tracing::SpanPtr active_span_; }; using ActiveStreamPtr = std::unique_ptr; @@ -235,8 +289,9 @@ class Filter : public Envoy::Network::ReadFilter, public Envoy::Logger::Loggable, public RequestDecoderCallback { public: - Filter(FilterConfigSharedPtr config) - : config_(std::move(config)), drain_decision_(config_->drainDecision()) { + Filter(FilterConfigSharedPtr config, TimeSource& time_source, Runtime::Loader& runtime) + : config_(std::move(config)), drain_decision_(config_->drainDecision()), + time_source_(time_source), runtime_(runtime) { decoder_ = config_->codecFactory().requestDecoder(); decoder_->setDecoderCallback(*this); response_encoder_ = config_->codecFactory().responseEncoder(); @@ -302,6 +357,8 @@ class Filter : public Envoy::Network::ReadFilter, FilterConfigSharedPtr config_{}; const Network::DrainDecision& drain_decision_; + TimeSource& time_source_; + Runtime::Loader& runtime_; RequestDecoderPtr decoder_; ResponseEncoderPtr response_encoder_; diff --git a/contrib/generic_proxy/filters/network/source/router/router.cc b/contrib/generic_proxy/filters/network/source/router/router.cc index 1d81d418d026..60a193c5b6b5 100644 --- a/contrib/generic_proxy/filters/network/source/router/router.cc +++ b/contrib/generic_proxy/filters/network/source/router/router.cc @@ -4,6 +4,7 @@ #include "envoy/network/connection.h" #include "source/common/common/assert.h" +#include "source/common/tracing/tracer_impl.h" #include "contrib/generic_proxy/filters/network/source/interface/filter.h" @@ -22,7 +23,26 @@ absl::string_view resetReasonToStringView(StreamResetReason reason) { } // namespace UpstreamRequest::UpstreamRequest(RouterFilter& parent, Upstream::TcpPoolData tcp_data) - : parent_(parent), tcp_data_(std::move(tcp_data)) {} + : parent_(parent), tcp_data_(std::move(tcp_data)), + stream_info_(parent.context_.mainThreadDispatcher().timeSource(), nullptr) { + + stream_info_.setUpstreamInfo(std::make_shared()); + parent_.callbacks_->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo()); + + stream_info_.healthCheck(parent_.callbacks_->streamInfo().healthCheck()); + absl::optional cluster_info = + parent_.callbacks_->streamInfo().upstreamClusterInfo(); + if (cluster_info.has_value()) { + stream_info_.setUpstreamClusterInfo(*cluster_info); + } + + if (auto tracing_config = parent_.callbacks_->tracingConfig(); tracing_config.has_value()) { + span_ = parent_.callbacks_->activeSpan().spawnChild( + tracing_config.value().get(), + absl::StrCat("router ", parent_.cluster_->observabilityName(), " egress"), + parent.context_.mainThreadDispatcher().timeSource().systemTime()); + } +} void UpstreamRequest::startStream() { Tcp::ConnectionPool::Cancellable* handle = tcp_data_.newConnection(*this); @@ -48,10 +68,22 @@ void UpstreamRequest::resetStream(StreamResetReason reason) { conn_data_.reset(); } + if (span_ != nullptr) { + span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True); + span_->setTag(Tracing::Tags::get().ErrorReason, resetReasonToStringView(reason)); + Tracing::TracerUtility::finalizeSpan(*span_, *parent_.request_, stream_info_, + parent_.callbacks_->tracingConfig().value(), true); + } + parent_.onUpstreamRequestReset(*this, reason); } void UpstreamRequest::completeUpstreamRequest() { + if (span_ != nullptr) { + Tracing::TracerUtility::finalizeSpan(*span_, *parent_.request_, stream_info_, + parent_.callbacks_->tracingConfig().value(), true); + } + response_complete_ = true; ASSERT(conn_pool_handle_ == nullptr); ASSERT(conn_data_ != nullptr); @@ -73,6 +105,15 @@ void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason, ab resetStream(StreamResetReason::ConnectionFailure); } +void UpstreamRequest::onEncodingSuccess(Buffer::Instance& buffer, bool expect_response) { + ENVOY_LOG(debug, "upstream request encoding success"); + encodeBufferToUpstream(buffer); + if (!expect_response) { + completeUpstreamRequest(); + parent_.completeDirectly(); + } +} + void UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn, Upstream::HostDescriptionConstSharedPtr host) { ENVOY_LOG(debug, "upstream request: tcp connection has ready"); @@ -82,12 +123,11 @@ void UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn, conn_data_->addUpstreamCallbacks(*this); conn_pool_handle_ = nullptr; - encodeBufferToUpstream(parent_.upstream_request_buffer_); - - if (parent_.expect_response_ == false) { - completeUpstreamRequest(); - parent_.completeDirectly(); + if (span_ != nullptr) { + span_->injectContext(*parent_.request_, upstream_host_); } + + parent_.request_encoder_->encode(*parent_.request_, *this); } void UpstreamRequest::onUpstreamData(Buffer::Instance& data, bool end_stream) { @@ -208,13 +248,6 @@ void RouterFilter::resetStream(StreamResetReason reason) { filter_complete_ = true; } -void RouterFilter::onEncodingSuccess(Buffer::Instance& buffer, bool expect_response) { - ENVOY_LOG(debug, "upstream request encoding success"); - upstream_request_buffer_.move(buffer); - kickOffNewUpstreamRequest(); - expect_response_ = expect_response; -} - void RouterFilter::kickOffNewUpstreamRequest() { const auto& cluster_name = route_entry_->clusterName(); @@ -225,6 +258,8 @@ void RouterFilter::kickOffNewUpstreamRequest() { return; } + cluster_ = thread_local_cluster->info(); + auto cluster_info = thread_local_cluster->info(); if (cluster_info->maintenanceMode()) { callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, "cluster_maintain_mode")); @@ -250,6 +285,7 @@ FilterStatus RouterFilter::onStreamDecoded(Request& request) { ENVOY_LOG(debug, "Try route request to the upstream based on the route entry"); setRouteEntry(callbacks_->routeEntry()); + request_ = &request; if (route_entry_ == nullptr) { ENVOY_LOG(debug, "No route for current request and send local reply"); @@ -258,8 +294,7 @@ FilterStatus RouterFilter::onStreamDecoded(Request& request) { } request_encoder_ = callbacks_->downstreamCodec().requestEncoder(); - request_encoder_->encode(request, *this); - + kickOffNewUpstreamRequest(); return FilterStatus::StopIteration; } diff --git a/contrib/generic_proxy/filters/network/source/router/router.h b/contrib/generic_proxy/filters/network/source/router/router.h index 1fe06319fc95..27d5932182aa 100644 --- a/contrib/generic_proxy/filters/network/source/router/router.h +++ b/contrib/generic_proxy/filters/network/source/router/router.h @@ -5,6 +5,7 @@ #include "source/common/buffer/buffer_impl.h" #include "source/common/common/linked_object.h" +#include "source/common/stream_info/stream_info_impl.h" #include "source/common/upstream/load_balancer_impl.h" #include "contrib/generic_proxy/filters/network/source/interface/codec.h" @@ -38,6 +39,7 @@ class UpstreamRequest : public Tcp::ConnectionPool::Callbacks, public Tcp::ConnectionPool::UpstreamCallbacks, public LinkedObject, public Envoy::Event::DeferredDeletable, + public RequestEncoderCallback, public ResponseDecoderCallback, Logger::Loggable { public: @@ -63,6 +65,9 @@ class UpstreamRequest : public Tcp::ConnectionPool::Callbacks, void onDecodingSuccess(ResponsePtr response) override; void onDecodingFailure() override; + // RequestEncoderCallback + void onEncodingSuccess(Buffer::Instance& buffer, bool expect_response) override; + void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host); void encodeBufferToUpstream(Buffer::Instance& buffer); @@ -81,12 +86,17 @@ class UpstreamRequest : public Tcp::ConnectionPool::Callbacks, bool response_started_{}; bool response_complete_{}; ResponseDecoderPtr response_decoder_; + + Buffer::OwnedImpl upstream_request_buffer_; + bool expect_response_{}; + + StreamInfo::StreamInfoImpl stream_info_; + Tracing::SpanPtr span_; }; using UpstreamRequestPtr = std::unique_ptr; class RouterFilter : public DecoderFilter, public Upstream::LoadBalancerContextBase, - public RequestEncoderCallback, Logger::Loggable { public: RouterFilter(Server::Configuration::FactoryContext& context) : context_(context) {} @@ -99,9 +109,6 @@ class RouterFilter : public DecoderFilter, } FilterStatus onStreamDecoded(Request& request) override; - // RequestEncoderCallback - void onEncodingSuccess(Buffer::Instance& buffer, bool expect_response) override; - void onUpstreamResponse(ResponsePtr response); void completeDirectly(); @@ -118,12 +125,11 @@ class RouterFilter : public DecoderFilter, void kickOffNewUpstreamRequest(); void resetStream(StreamResetReason reason); - bool expect_response_{}; bool filter_complete_{}; const RouteEntry* route_entry_{}; - - Buffer::OwnedImpl upstream_request_buffer_; + Upstream::ClusterInfoConstSharedPtr cluster_; + Request* request_{}; RequestEncoderPtr request_encoder_; From 5cdfbbe9b97c79e7f2e69b056bfdbc293173fa31 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Mon, 6 Feb 2023 13:47:42 +0000 Subject: [PATCH 06/10] some test Signed-off-by: wbpcode --- .../filters/network/source/BUILD | 1 + .../filters/network/source/config.cc | 14 +++- .../filters/network/source/proxy.h | 18 ++--- .../generic_proxy/filters/network/test/BUILD | 1 + .../filters/network/test/config_test.cc | 48 ++++++++++++ .../filters/network/test/mocks/filter.h | 2 + .../filters/network/test/proxy_test.cc | 74 +++++++++++++++++-- 7 files changed, 138 insertions(+), 20 deletions(-) diff --git a/contrib/generic_proxy/filters/network/source/BUILD b/contrib/generic_proxy/filters/network/source/BUILD index 1bbf708d6efa..6bc7dc4ce76f 100644 --- a/contrib/generic_proxy/filters/network/source/BUILD +++ b/contrib/generic_proxy/filters/network/source/BUILD @@ -30,6 +30,7 @@ envoy_cc_library( "//source/common/stream_info:stream_info_lib", "//source/common/tracing:tracer_config_lib", "//source/common/tracing:tracer_lib", + "//source/common/tracing:tracer_manager_lib", "//source/extensions/filters/network/common:factory_base_lib", "@envoy_api//contrib/envoy/extensions/filters/network/generic_proxy/v3:pkg_cc_proto", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", diff --git a/contrib/generic_proxy/filters/network/source/config.cc b/contrib/generic_proxy/filters/network/source/config.cc index 96fe5fe8fd9f..f365633cf78d 100644 --- a/contrib/generic_proxy/filters/network/source/config.cc +++ b/contrib/generic_proxy/filters/network/source/config.cc @@ -96,11 +96,21 @@ Factory::createFilterFactoryFromProtoTyped(const ProxyConfig& proto_config, auto factories = factoriesFromProto(proto_config.codec_config(), context); std::shared_ptr custom_proxy_factory = std::move(factories.second); + Tracing::TracerSharedPtr tracer; + Tracing::ConnectionManagerTracingConfigPtr tracing_config; + if (proto_config.has_tracing()) { + if (proto_config.tracing().has_provider()) { + tracer = tracer_manager->getOrCreateTracer(&proto_config.tracing().provider()); + } + tracing_config = std::make_unique( + context.direction(), proto_config.tracing()); + } + const FilterConfigSharedPtr config = std::make_shared( - proto_config, proto_config.stat_prefix(), std::move(factories.first), + proto_config.stat_prefix(), std::move(factories.first), routeConfigProviderFromProto(proto_config, context, *route_config_provider_manager), filtersFactoryFromProto(proto_config.filters(), proto_config.stat_prefix(), context), - *tracer_manager, context); + std::move(tracer), std::move(tracing_config), context); return [route_config_provider_manager, tracer_manager, config, &context, custom_proxy_factory](Envoy::Network::FilterManager& filter_manager) -> void { diff --git a/contrib/generic_proxy/filters/network/source/proxy.h b/contrib/generic_proxy/filters/network/source/proxy.h index 4642b46a7835..96ca02b46bae 100644 --- a/contrib/generic_proxy/filters/network/source/proxy.h +++ b/contrib/generic_proxy/filters/network/source/proxy.h @@ -46,21 +46,15 @@ struct NamedFilterFactoryCb { class FilterConfigImpl : public FilterConfig { public: - FilterConfigImpl(const ProxyConfig& config, const std::string& stat_prefix, CodecFactoryPtr codec, + FilterConfigImpl(const std::string& stat_prefix, CodecFactoryPtr codec, Rds::RouteConfigProviderSharedPtr route_config_provider, - std::vector factories, - Tracing::TracerManager& tracer_manager, + std::vector factories, Tracing::TracerSharedPtr tracer, + Tracing::ConnectionManagerTracingConfigPtr tracing_config, Envoy::Server::Configuration::FactoryContext& context) : stat_prefix_(stat_prefix), codec_factory_(std::move(codec)), route_config_provider_(std::move(route_config_provider)), factories_(std::move(factories)), - drain_decision_(context.drainDecision()) { - if (config.has_tracing()) { - tracer_ = tracer_manager.getOrCreateTracer( - config.tracing().has_provider() ? &config.tracing().provider() : nullptr); - tracing_config_ = std::make_unique( - context.direction(), config.tracing()); - } - } + drain_decision_(context.drainDecision()), tracer_(std::move(tracer)), + tracing_config_(std::move(tracing_config)) {} // FilterConfig RouteEntryConstSharedPtr routeEntry(const Request& request) const override { @@ -97,7 +91,7 @@ class FilterConfigImpl : public FilterConfig { const Network::DrainDecision& drain_decision_; - Tracing::TracerSharedPtr tracer_{std::make_shared()}; + Tracing::TracerSharedPtr tracer_; Tracing::ConnectionManagerTracingConfigPtr tracing_config_; }; diff --git a/contrib/generic_proxy/filters/network/test/BUILD b/contrib/generic_proxy/filters/network/test/BUILD index 2376a9f15d7c..08a4ed870bd2 100644 --- a/contrib/generic_proxy/filters/network/test/BUILD +++ b/contrib/generic_proxy/filters/network/test/BUILD @@ -68,6 +68,7 @@ envoy_cc_test( "//contrib/generic_proxy/filters/network/test/mocks:filter_mocks", "//contrib/generic_proxy/filters/network/test/mocks:route_mocks", "//source/common/buffer:buffer_lib", + "//source/extensions/tracers/zipkin:config", "//test/mocks/network:network_mocks", "//test/mocks/server:factory_context_mocks", "//test/test_common:registry_lib", diff --git a/contrib/generic_proxy/filters/network/test/config_test.cc b/contrib/generic_proxy/filters/network/test/config_test.cc index e865a2c8c9fa..9550bd8f2860 100644 --- a/contrib/generic_proxy/filters/network/test/config_test.cc +++ b/contrib/generic_proxy/filters/network/test/config_test.cc @@ -330,6 +330,54 @@ TEST(BasicFilterConfigTest, CreatingFilterFactories) { } } +TEST(BasicFilterConfigTest, TestConfigurationWithTracing) { + const std::string config_yaml = R"EOF( + stat_prefix: ingress + filters: + - name: envoy.filters.generic.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.generic_proxy.router.v3.Router + codec_config: + name: mock + typed_config: + "@type": type.googleapis.com/xds.type.v3.TypedStruct + type_url: envoy.generic_proxy.codecs.mock.type + value: {} + generic_rds: + config_source: { resource_api_version: V3, ads: {} } + route_config_name: test_route + tracing: + max_path_tag_length: 128 + provider: + name: zipkin + typed_config: + "@type": type.googleapis.com/envoy.config.trace.v3.ZipkinConfig + collector_cluster: zipkin + collector_endpoint: "/api/v2/spans" + collector_endpoint_version: HTTP_JSON + )EOF"; + + MockStreamCodecFactoryConfig codec_factory_config; + Registry::InjectFactory registration(codec_factory_config); + + NiceMock factory_context; + + Factory factory; + + envoy::extensions::filters::network::generic_proxy::v3::GenericProxy config; + TestUtility::loadFromYaml(config_yaml, config); + + auto mock_codec_factory = std::make_unique(); + + EXPECT_CALL(codec_factory_config, createCodecFactory(_, _)) + .WillOnce(Return(testing::ByMove(std::move(mock_codec_factory)))); + + Network::FilterFactoryCb cb = factory.createFilterFactoryFromProto(config, factory_context); + EXPECT_NE(nullptr, cb); + Network::MockFilterManager filter_manager; + cb(filter_manager); +} + } // namespace } // namespace GenericProxy } // namespace NetworkFilters diff --git a/contrib/generic_proxy/filters/network/test/mocks/filter.h b/contrib/generic_proxy/filters/network/test/mocks/filter.h index 735eb0d36439..8c684aed75c1 100644 --- a/contrib/generic_proxy/filters/network/test/mocks/filter.h +++ b/contrib/generic_proxy/filters/network/test/mocks/filter.h @@ -85,6 +85,8 @@ template class MockStreamFilterCallbacks : public Base { MOCK_METHOD(void, resetStream, ()); MOCK_METHOD(const RouteEntry*, routeEntry, (), (const)); MOCK_METHOD(const RouteSpecificFilterConfig*, perFilterConfig, (), (const)); + MOCK_METHOD(Tracing::Span&, activeSpan, ()); + MOCK_METHOD(OptRef, tracingConfig, (), (const)); }; class MockDecoderFilterCallback : public MockStreamFilterCallbacks { diff --git a/contrib/generic_proxy/filters/network/test/proxy_test.cc b/contrib/generic_proxy/filters/network/test/proxy_test.cc index cb93bf955510..e50deaaa4fec 100644 --- a/contrib/generic_proxy/filters/network/test/proxy_test.cc +++ b/contrib/generic_proxy/filters/network/test/proxy_test.cc @@ -2,6 +2,8 @@ #include #include +#include "source/common/tracing/tracer_manager_impl.h" + #include "test/mocks/server/factory_context.h" #include "test/test_common/registry.h" #include "test/test_common/utility.h" @@ -38,7 +40,22 @@ class MockRouteConfigProvider : public Rds::RouteConfigProvider { class FilterConfigTest : public testing::Test { public: - void initializeFilterConfig() { + void initializeFilterConfig(bool with_tracing = false) { + if (with_tracing) { + tracer_ = std::make_shared>(); + + const std::string tracing_config_yaml = R"EOF( + max_path_tag_length: 128 + )EOF"; + + Tracing::ConnectionManagerTracingConfigProto tracing_config; + + TestUtility::loadFromYaml(tracing_config_yaml, tracing_config); + + tracing_config_ = std::make_unique( + envoy::config::core::v3::TrafficDirection::OUTBOUND, tracing_config); + } + std::vector factories; for (const auto& filter : mock_stream_filters_) { @@ -63,11 +80,14 @@ class FilterConfigTest : public testing::Test { mock_route_entry_ = std::make_shared>(); - filter_config_ = - std::make_shared("test_prefix", std::move(codec_factory), - route_config_provider_, factories, factory_context_); + filter_config_ = std::make_shared( + "test_prefix", std::move(codec_factory), route_config_provider_, factories, tracer_, + std::move(tracing_config_), factory_context_); } + std::shared_ptr> tracer_; + Tracing::ConnectionManagerTracingConfigPtr tracing_config_; + std::shared_ptr filter_config_; NiceMock factory_context_; @@ -134,8 +154,8 @@ TEST_F(FilterConfigTest, CodecFactory) { class FilterTest : public FilterConfigTest { public: - void initializeFilter() { - FilterConfigTest::initializeFilterConfig(); + void initializeFilter(bool with_tracing = false) { + FilterConfigTest::initializeFilterConfig(with_tracing); auto encoder = std::make_unique>(); encoder_ = encoder.get(); @@ -683,6 +703,48 @@ TEST_F(FilterTest, NewStreamAndReplyNormallyWithDrainClose) { active_stream->upstreamResponse(std::move(response)); } +TEST_F(FilterTest, NewStreamAndReplyNormallyWithTracing) { + auto mock_decoder_filter_0 = std::make_shared>(); + mock_decoder_filters_ = {{"mock_0", mock_decoder_filter_0}}; + + initializeFilter(true); + + auto request = std::make_unique(); + + auto* span = new NiceMock(); + EXPECT_CALL(*tracer_, startSpan_(_, _, _, _)) + .WillOnce( + Invoke([&](const Tracing::Config& config, Tracing::TraceContext&, + const StreamInfo::StreamInfo&, const Tracing::Decision) -> Tracing::Span* { + EXPECT_EQ(Tracing::OperationName::Ingress, config.operationName()); + + return span; + })); + + filter_->newDownstreamRequest(std::move(request)); + EXPECT_EQ(1, filter_->activeStreamsForTest().size()); + + auto active_stream = filter_->activeStreamsForTest().begin()->get(); + + EXPECT_CALL(*span, setTag(_, _)).Times(testing::AnyNumber()); + EXPECT_CALL(*span, finishSpan()); + + EXPECT_CALL(filter_callbacks_.connection_, write(BufferStringEqual("test"), true)); + + EXPECT_CALL(*encoder_, encode(_, _)) + .WillOnce(Invoke([&](const Response&, ResponseEncoderCallback& callback) { + Buffer::OwnedImpl buffer; + buffer.add("test"); + callback.onEncodingSuccess(buffer, true); + })); + + EXPECT_CALL(factory_context_.drain_manager_, drainClose()).WillOnce(Return(false)); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)); + + auto response = std::make_unique(); + active_stream->upstreamResponse(std::move(response)); +} + } // namespace } // namespace GenericProxy } // namespace NetworkFilters From 1c280b4d7304b6100aa5b638513ec6e2b61071dc Mon Sep 17 00:00:00 2001 From: wbpcode Date: Tue, 7 Feb 2023 07:46:00 +0000 Subject: [PATCH 07/10] fix all unit tests Signed-off-by: wbpcode --- .../filters/network/source/config.cc | 4 +- .../filters/network/source/router/BUILD | 2 + .../filters/network/source/router/router.cc | 19 +- .../filters/network/source/router/router.h | 2 + .../filters/network/test/config_test.cc | 9 +- .../filters/network/test/mocks/filter.h | 2 + .../filters/network/test/proxy_test.cc | 6 +- .../network/test/router/router_test.cc | 339 ++++++++++++++---- 8 files changed, 296 insertions(+), 87 deletions(-) diff --git a/contrib/generic_proxy/filters/network/source/config.cc b/contrib/generic_proxy/filters/network/source/config.cc index f365633cf78d..9bbd71d133d8 100644 --- a/contrib/generic_proxy/filters/network/source/config.cc +++ b/contrib/generic_proxy/filters/network/source/config.cc @@ -120,8 +120,8 @@ Factory::createFilterFactoryFromProtoTyped(const ProxyConfig& proto_config, return; } - filter_manager.addReadFilter( - std::make_shared(config, context.mainThreadDispatcher().timeSource())); + filter_manager.addReadFilter(std::make_shared( + config, context.mainThreadDispatcher().timeSource(), context.runtime())); }; } diff --git a/contrib/generic_proxy/filters/network/source/router/BUILD b/contrib/generic_proxy/filters/network/source/router/BUILD index ab96fe79c269..9201fd5887c9 100644 --- a/contrib/generic_proxy/filters/network/source/router/BUILD +++ b/contrib/generic_proxy/filters/network/source/router/BUILD @@ -24,6 +24,8 @@ envoy_cc_library( "//source/common/buffer:buffer_lib", "//source/common/common:linked_object", "//source/common/common:minimal_logger_lib", + "//source/common/stream_info:stream_info_lib", + "//source/common/tracing:tracer_lib", "//source/common/upstream:load_balancer_lib", ], ) diff --git a/contrib/generic_proxy/filters/network/source/router/router.cc b/contrib/generic_proxy/filters/network/source/router/router.cc index 60a193c5b6b5..3480e0be6c7d 100644 --- a/contrib/generic_proxy/filters/network/source/router/router.cc +++ b/contrib/generic_proxy/filters/network/source/router/router.cc @@ -30,15 +30,12 @@ UpstreamRequest::UpstreamRequest(RouterFilter& parent, Upstream::TcpPoolData tcp parent_.callbacks_->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo()); stream_info_.healthCheck(parent_.callbacks_->streamInfo().healthCheck()); - absl::optional cluster_info = - parent_.callbacks_->streamInfo().upstreamClusterInfo(); - if (cluster_info.has_value()) { - stream_info_.setUpstreamClusterInfo(*cluster_info); - } + stream_info_.setUpstreamClusterInfo(parent_.cluster_); - if (auto tracing_config = parent_.callbacks_->tracingConfig(); tracing_config.has_value()) { + tracing_config_ = parent_.callbacks_->tracingConfig(); + if (tracing_config_.has_value()) { span_ = parent_.callbacks_->activeSpan().spawnChild( - tracing_config.value().get(), + tracing_config_.value().get(), absl::StrCat("router ", parent_.cluster_->observabilityName(), " egress"), parent.context_.mainThreadDispatcher().timeSource().systemTime()); } @@ -72,7 +69,7 @@ void UpstreamRequest::resetStream(StreamResetReason reason) { span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True); span_->setTag(Tracing::Tags::get().ErrorReason, resetReasonToStringView(reason)); Tracing::TracerUtility::finalizeSpan(*span_, *parent_.request_, stream_info_, - parent_.callbacks_->tracingConfig().value(), true); + tracing_config_.value().get(), true); } parent_.onUpstreamRequestReset(*this, reason); @@ -81,7 +78,7 @@ void UpstreamRequest::resetStream(StreamResetReason reason) { void UpstreamRequest::completeUpstreamRequest() { if (span_ != nullptr) { Tracing::TracerUtility::finalizeSpan(*span_, *parent_.request_, stream_info_, - parent_.callbacks_->tracingConfig().value(), true); + tracing_config_.value().get(), true); } response_complete_ = true; @@ -259,9 +256,9 @@ void RouterFilter::kickOffNewUpstreamRequest() { } cluster_ = thread_local_cluster->info(); + callbacks_->streamInfo().setUpstreamClusterInfo(cluster_); - auto cluster_info = thread_local_cluster->info(); - if (cluster_info->maintenanceMode()) { + if (cluster_->maintenanceMode()) { callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, "cluster_maintain_mode")); filter_complete_ = true; return; diff --git a/contrib/generic_proxy/filters/network/source/router/router.h b/contrib/generic_proxy/filters/network/source/router/router.h index 27d5932182aa..e16f38823ed9 100644 --- a/contrib/generic_proxy/filters/network/source/router/router.h +++ b/contrib/generic_proxy/filters/network/source/router/router.h @@ -91,6 +91,8 @@ class UpstreamRequest : public Tcp::ConnectionPool::Callbacks, bool expect_response_{}; StreamInfo::StreamInfoImpl stream_info_; + + OptRef tracing_config_; Tracing::SpanPtr span_; }; using UpstreamRequestPtr = std::unique_ptr; diff --git a/contrib/generic_proxy/filters/network/test/config_test.cc b/contrib/generic_proxy/filters/network/test/config_test.cc index 9550bd8f2860..0ec95762c944 100644 --- a/contrib/generic_proxy/filters/network/test/config_test.cc +++ b/contrib/generic_proxy/filters/network/test/config_test.cc @@ -357,24 +357,27 @@ TEST(BasicFilterConfigTest, TestConfigurationWithTracing) { collector_endpoint_version: HTTP_JSON )EOF"; - MockStreamCodecFactoryConfig codec_factory_config; + NiceMock codec_factory_config; Registry::InjectFactory registration(codec_factory_config); NiceMock factory_context; + factory_context.server_factory_context_.cluster_manager_.initializeClusters({"zipkin"}, {}); + factory_context.server_factory_context_.cluster_manager_.initializeThreadLocalClusters( + {"zipkin"}); Factory factory; envoy::extensions::filters::network::generic_proxy::v3::GenericProxy config; TestUtility::loadFromYaml(config_yaml, config); - auto mock_codec_factory = std::make_unique(); + auto mock_codec_factory = std::make_unique>(); EXPECT_CALL(codec_factory_config, createCodecFactory(_, _)) .WillOnce(Return(testing::ByMove(std::move(mock_codec_factory)))); Network::FilterFactoryCb cb = factory.createFilterFactoryFromProto(config, factory_context); EXPECT_NE(nullptr, cb); - Network::MockFilterManager filter_manager; + NiceMock filter_manager; cb(filter_manager); } diff --git a/contrib/generic_proxy/filters/network/test/mocks/filter.h b/contrib/generic_proxy/filters/network/test/mocks/filter.h index 8c684aed75c1..e15377878216 100644 --- a/contrib/generic_proxy/filters/network/test/mocks/filter.h +++ b/contrib/generic_proxy/filters/network/test/mocks/filter.h @@ -85,6 +85,8 @@ template class MockStreamFilterCallbacks : public Base { MOCK_METHOD(void, resetStream, ()); MOCK_METHOD(const RouteEntry*, routeEntry, (), (const)); MOCK_METHOD(const RouteSpecificFilterConfig*, perFilterConfig, (), (const)); + MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const)); + MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, ()); MOCK_METHOD(Tracing::Span&, activeSpan, ()); MOCK_METHOD(OptRef, tracingConfig, (), (const)); }; diff --git a/contrib/generic_proxy/filters/network/test/proxy_test.cc b/contrib/generic_proxy/filters/network/test/proxy_test.cc index e50deaaa4fec..9daf8b79c219 100644 --- a/contrib/generic_proxy/filters/network/test/proxy_test.cc +++ b/contrib/generic_proxy/filters/network/test/proxy_test.cc @@ -173,7 +173,8 @@ class FilterTest : public FilterConfigTest { .WillOnce( Invoke([this](RequestDecoderCallback& callback) { decoder_callback_ = &callback; })); - filter_ = std::make_shared(filter_config_); + filter_ = std::make_shared(filter_config_, factory_context_.time_system_, + factory_context_.runtime_loader_); EXPECT_EQ(filter_.get(), decoder_callback_); @@ -716,8 +717,7 @@ TEST_F(FilterTest, NewStreamAndReplyNormallyWithTracing) { .WillOnce( Invoke([&](const Tracing::Config& config, Tracing::TraceContext&, const StreamInfo::StreamInfo&, const Tracing::Decision) -> Tracing::Span* { - EXPECT_EQ(Tracing::OperationName::Ingress, config.operationName()); - + EXPECT_EQ(Tracing::OperationName::Egress, config.operationName()); return span; })); diff --git a/contrib/generic_proxy/filters/network/test/router/router_test.cc b/contrib/generic_proxy/filters/network/test/router/router_test.cc index 6c69b6b64509..ebeb45da37cd 100644 --- a/contrib/generic_proxy/filters/network/test/router/router_test.cc +++ b/contrib/generic_proxy/filters/network/test/router/router_test.cc @@ -1,3 +1,5 @@ +#include "source/common/tracing/common_values.h" + #include "test/mocks/server/factory_context.h" #include "test/test_common/registry.h" #include "test/test_common/utility.h" @@ -26,38 +28,66 @@ class RouterFilterTest : public testing::Test { RouterFilterTest() { filter_ = std::make_shared(factory_context_); filter_->setDecoderFilterCallbacks(mock_filter_callback_); + request_ = std::make_unique(); + + // Common mock calls. ON_CALL(mock_filter_callback_, dispatcher()).WillByDefault(ReturnRef(dispatcher_)); + ON_CALL(mock_filter_callback_, activeSpan()).WillByDefault(ReturnRef(active_span_)); + ON_CALL(mock_filter_callback_, downstreamCodec()).WillByDefault(ReturnRef(mock_codec_factory_)); + ON_CALL(mock_filter_callback_, streamInfo()).WillByDefault(ReturnRef(mock_stream_info_)); } - void kickOffNewUpstreamRequest(bool expect_response) { - NiceMock mock_route_entry; - - filter_->setRouteEntry(&mock_route_entry); + void kickOffNewUpstreamRequest(bool with_tracing) { + EXPECT_CALL(mock_filter_callback_, routeEntry()).WillOnce(Return(&mock_route_entry_)); const std::string cluster_name = "cluster_0"; - EXPECT_CALL(mock_route_entry, clusterName()).WillRepeatedly(ReturnRef(cluster_name)); - - Buffer::OwnedImpl test_buffer; - test_buffer.add("test"); - + EXPECT_CALL(mock_route_entry_, clusterName()).WillRepeatedly(ReturnRef(cluster_name)); factory_context_.cluster_manager_.initializeThreadLocalClusters({cluster_name}); EXPECT_CALL(factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_, newConnection(_)); - filter_->onEncodingSuccess(test_buffer, expect_response); + if (with_tracing) { + EXPECT_CALL(mock_filter_callback_, tracingConfig()) + .WillOnce(Return(OptRef{tracing_config_})); + EXPECT_CALL(active_span_, spawnChild_(_, "router observability_name egress", _)) + .WillOnce( + Invoke([&](const Tracing::Config&, const std::string&, SystemTime) -> Tracing::Span* { + child_span_ = new NiceMock(); + return child_span_; + })); + } else { + EXPECT_CALL(mock_filter_callback_, tracingConfig()) + .WillOnce(Return(OptRef{})); + } + + auto request_encoder = std::make_unique>(); + mock_request_encoder_ = request_encoder.get(); + EXPECT_CALL(mock_codec_factory_, requestEncoder()) + .WillOnce(Return(testing::ByMove(std::move(request_encoder)))); + + EXPECT_EQ(filter_->onStreamDecoded(*request_), FilterStatus::StopIteration); EXPECT_EQ(1, filter_->upstreamRequestsForTest().size()); } NiceMock factory_context_; + NiceMock dispatcher_; + NiceMock mock_filter_callback_; + NiceMock mock_stream_info_; + NiceMock mock_codec_factory_; - NiceMock dispatcher_; + NiceMock* mock_request_encoder_{}; NiceMock mock_route_entry_; std::shared_ptr filter_; + std::unique_ptr request_; + + NiceMock active_span_; + NiceMock* child_span_{}; + NiceMock tracing_config_; }; TEST_F(RouterFilterTest, OnStreamDecodedAndNoRouteEntry) { @@ -67,45 +97,17 @@ TEST_F(RouterFilterTest, OnStreamDecodedAndNoRouteEntry) { EXPECT_EQ(status.message(), "route_not_found"); })); - auto request = std::make_unique(); - - EXPECT_EQ(filter_->onStreamDecoded(*request), FilterStatus::StopIteration); -} - -TEST_F(RouterFilterTest, OnStreamDecodedWithRouteEntry) { - NiceMock mock_route_entry; - - EXPECT_CALL(mock_filter_callback_, routeEntry()).WillOnce(Return(&mock_route_entry)); - EXPECT_CALL(mock_filter_callback_, downstreamCodec()).WillOnce(ReturnRef(mock_codec_factory_)); - - auto mock_request_encoder = std::make_unique>(); - auto raw_mock_request_encoder = mock_request_encoder.get(); - - EXPECT_CALL(mock_codec_factory_, requestEncoder()) - .WillOnce(Return(ByMove(std::move(mock_request_encoder)))); - - auto request = std::make_unique(); - - EXPECT_CALL(*raw_mock_request_encoder, encode(_, _)) - .WillOnce(Invoke([&](const Request& rq, RequestEncoderCallback& cb) { - EXPECT_EQ(&rq, request.get()); - EXPECT_EQ(&cb, filter_.get()); - })); - - EXPECT_EQ(filter_->onStreamDecoded(*request), FilterStatus::StopIteration); + EXPECT_EQ(filter_->onStreamDecoded(*request_), FilterStatus::StopIteration); } -TEST_F(RouterFilterTest, OnUpstreamCluster) { - NiceMock mock_route_entry; +TEST_F(RouterFilterTest, OnStreamDecodedWithRouteEntry) { kickOffNewUpstreamRequest(false); } - filter_->setRouteEntry(&mock_route_entry); +TEST_F(RouterFilterTest, NoUpstreamCluster) { + EXPECT_CALL(mock_filter_callback_, routeEntry()).WillOnce(Return(&mock_route_entry_)); const std::string cluster_name = "cluster_0"; - EXPECT_CALL(mock_route_entry, clusterName()).WillRepeatedly(ReturnRef(cluster_name)); - - Buffer::OwnedImpl test_buffer; - test_buffer.add("test"); + EXPECT_CALL(mock_route_entry_, clusterName()).WillRepeatedly(ReturnRef(cluster_name)); // No upstream cluster. EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _)) @@ -113,20 +115,15 @@ TEST_F(RouterFilterTest, OnUpstreamCluster) { EXPECT_EQ(status.message(), "cluster_not_found"); })); - filter_->onEncodingSuccess(test_buffer, false); + filter_->onStreamDecoded(*request_); } TEST_F(RouterFilterTest, UpstreamClusterMaintainMode) { - NiceMock mock_route_entry; - - filter_->setRouteEntry(&mock_route_entry); + EXPECT_CALL(mock_filter_callback_, routeEntry()).WillOnce(Return(&mock_route_entry_)); const std::string cluster_name = "cluster_0"; - EXPECT_CALL(mock_route_entry, clusterName()).WillRepeatedly(ReturnRef(cluster_name)); - - Buffer::OwnedImpl test_buffer; - test_buffer.add("test"); + EXPECT_CALL(mock_route_entry_, clusterName()).WillRepeatedly(ReturnRef(cluster_name)); factory_context_.cluster_manager_.initializeThreadLocalClusters({cluster_name}); @@ -139,20 +136,15 @@ TEST_F(RouterFilterTest, UpstreamClusterMaintainMode) { EXPECT_EQ(status.message(), "cluster_maintain_mode"); })); - filter_->onEncodingSuccess(test_buffer, false); + filter_->onStreamDecoded(*request_); } TEST_F(RouterFilterTest, UpstreamClusterNoHealthyUpstream) { - NiceMock mock_route_entry; - - filter_->setRouteEntry(&mock_route_entry); + EXPECT_CALL(mock_filter_callback_, routeEntry()).WillOnce(Return(&mock_route_entry_)); const std::string cluster_name = "cluster_0"; - EXPECT_CALL(mock_route_entry, clusterName()).WillRepeatedly(ReturnRef(cluster_name)); - - Buffer::OwnedImpl test_buffer; - test_buffer.add("test"); + EXPECT_CALL(mock_route_entry_, clusterName()).WillRepeatedly(ReturnRef(cluster_name)); factory_context_.cluster_manager_.initializeThreadLocalClusters({cluster_name}); @@ -165,10 +157,12 @@ TEST_F(RouterFilterTest, UpstreamClusterNoHealthyUpstream) { EXPECT_EQ(status.message(), "no_healthy_upstream"); })); - filter_->onEncodingSuccess(test_buffer, false); + filter_->onStreamDecoded(*request_); } -TEST_F(RouterFilterTest, KickOffNormalUpstreamRequest) { kickOffNewUpstreamRequest(false); } +TEST_F(RouterFilterTest, KickOffNormalUpstreamRequestAndWithTracing) { + kickOffNewUpstreamRequest(true); +} TEST_F(RouterFilterTest, UpstreamStreamRequestWatermarkCheck) { kickOffNewUpstreamRequest(false); @@ -195,6 +189,30 @@ TEST_F(RouterFilterTest, UpstreamRequestResetBeforePoolCallback) { EXPECT_EQ(nullptr, upstream_request->conn_pool_handle_); } +TEST_F(RouterFilterTest, UpstreamRequestResetBeforePoolCallbackWithTracing) { + kickOffNewUpstreamRequest(true); + + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().Error, "true")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().ErrorReason, "local_reset")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().Component, "proxy")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().ResponseFlags, "-")); + EXPECT_CALL(*child_span_, finishSpan()); + + EXPECT_CALL( + factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.handles_.back(), + cancel(_)); + EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _)) + .WillOnce(Invoke([](Status status, ResponseUpdateFunction&&) { + EXPECT_EQ(status.message(), "local_reset"); + })); + + auto upstream_request = filter_->upstreamRequestsForTest().begin()->get(); + + filter_->upstreamRequestsForTest().begin()->get()->resetStream(StreamResetReason::LocalReset); + + EXPECT_EQ(nullptr, upstream_request->conn_pool_handle_); +} + TEST_F(RouterFilterTest, UpstreamRequestPoolFailureConnctionOverflow) { kickOffNewUpstreamRequest(false); @@ -207,6 +225,24 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolFailureConnctionOverflow) { Tcp::ConnectionPool::PoolFailureReason::Overflow); } +TEST_F(RouterFilterTest, UpstreamRequestPoolFailureConnctionOverflowWithTracing) { + kickOffNewUpstreamRequest(true); + + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().Error, "true")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().ErrorReason, "overflow")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().Component, "proxy")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().ResponseFlags, "-")); + EXPECT_CALL(*child_span_, finishSpan()); + + EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _)) + .WillOnce(Invoke([](Status status, ResponseUpdateFunction&&) { + EXPECT_EQ(status.message(), "overflow"); + })); + + factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolFailure( + Tcp::ConnectionPool::PoolFailureReason::Overflow); +} + TEST_F(RouterFilterTest, UpstreamRequestPoolFailureConnctionTimeout) { kickOffNewUpstreamRequest(false); @@ -219,6 +255,24 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolFailureConnctionTimeout) { Tcp::ConnectionPool::PoolFailureReason::Timeout); } +TEST_F(RouterFilterTest, UpstreamRequestPoolFailureConnctionTimeoutWithTracing) { + kickOffNewUpstreamRequest(true); + + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().Error, "true")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().ErrorReason, "connection_failure")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().Component, "proxy")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().ResponseFlags, "-")); + EXPECT_CALL(*child_span_, finishSpan()); + + EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _)) + .WillOnce(Invoke([](Status status, ResponseUpdateFunction&&) { + EXPECT_EQ(status.message(), "connection_failure"); + })); + + factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolFailure( + Tcp::ConnectionPool::PoolFailureReason::Timeout); +} + TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndExpectNoResponse) { kickOffNewUpstreamRequest(false); @@ -229,13 +283,21 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndExpectNoResponse) { EXPECT_CALL(mock_conn, write(_, _)); EXPECT_CALL(mock_filter_callback_, completeDirectly()); + EXPECT_CALL(*mock_request_encoder_, encode(_, _)) + .WillOnce(Invoke([&](const Request&, RequestEncoderCallback& callback) -> void { + Buffer::OwnedImpl buffer; + buffer.add("hello"); + // Expect no response. + callback.onEncodingSuccess(buffer, false); + })); + factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolReady(mock_conn); EXPECT_EQ(upstream_request->conn_data_, nullptr); EXPECT_EQ(upstream_request->conn_pool_handle_, nullptr); } -TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButConnectionErrorBeforeResponse) { +TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndExpectNoResponseWithTracing) { kickOffNewUpstreamRequest(true); auto upstream_request = filter_->upstreamRequestsForTest().begin()->get(); @@ -243,6 +305,43 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButConnectionErrorBeforeRespons NiceMock mock_conn; EXPECT_CALL(mock_conn, write(_, _)); + EXPECT_CALL(mock_filter_callback_, completeDirectly()); + + EXPECT_CALL(*mock_request_encoder_, encode(_, _)) + .WillOnce(Invoke([&](const Request&, RequestEncoderCallback& callback) -> void { + Buffer::OwnedImpl buffer; + buffer.add("hello"); + // Expect no response. + callback.onEncodingSuccess(buffer, false); + })); + + // Request complete directly. + EXPECT_CALL(*child_span_, injectContext(_, _)); + EXPECT_CALL(*child_span_, setTag(_, _)).Times(testing::AnyNumber()); + EXPECT_CALL(*child_span_, finishSpan()); + + factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolReady(mock_conn); + + EXPECT_EQ(upstream_request->conn_data_, nullptr); + EXPECT_EQ(upstream_request->conn_pool_handle_, nullptr); +} + +TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButConnectionErrorBeforeResponse) { + kickOffNewUpstreamRequest(false); + + auto upstream_request = filter_->upstreamRequestsForTest().begin()->get(); + + NiceMock mock_conn; + + EXPECT_CALL(mock_conn, write(_, _)); + + EXPECT_CALL(*mock_request_encoder_, encode(_, _)) + .WillOnce(Invoke([&](const Request&, RequestEncoderCallback& callback) -> void { + Buffer::OwnedImpl buffer; + buffer.add("hello"); + // Expect response. + callback.onEncodingSuccess(buffer, true); + })); factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolReady(mock_conn); @@ -262,7 +361,7 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButConnectionErrorBeforeRespons } TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButConnectionTerminationBeforeResponse) { - kickOffNewUpstreamRequest(true); + kickOffNewUpstreamRequest(false); auto upstream_request = filter_->upstreamRequestsForTest().begin()->get(); @@ -270,6 +369,14 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButConnectionTerminationBeforeR EXPECT_CALL(mock_conn, write(_, _)); + EXPECT_CALL(*mock_request_encoder_, encode(_, _)) + .WillOnce(Invoke([&](const Request&, RequestEncoderCallback& callback) -> void { + Buffer::OwnedImpl buffer; + buffer.add("hello"); + // Expect response. + callback.onEncodingSuccess(buffer, true); + })); + factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolReady(mock_conn); EXPECT_NE(upstream_request->conn_data_, nullptr); @@ -288,7 +395,7 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButConnectionTerminationBeforeR } TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButStreamDestroyBeforeResponse) { - kickOffNewUpstreamRequest(true); + kickOffNewUpstreamRequest(false); auto upstream_request = filter_->upstreamRequestsForTest().begin()->get(); @@ -296,6 +403,14 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButStreamDestroyBeforeResponse) EXPECT_CALL(mock_conn, write(_, _)); + EXPECT_CALL(*mock_request_encoder_, encode(_, _)) + .WillOnce(Invoke([&](const Request&, RequestEncoderCallback& callback) -> void { + Buffer::OwnedImpl buffer; + buffer.add("hello"); + // Expect response. + callback.onEncodingSuccess(buffer, true); + })); + factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolReady(mock_conn); EXPECT_NE(upstream_request->conn_data_, nullptr); @@ -310,7 +425,7 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButStreamDestroyBeforeResponse) } TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndResponse) { - kickOffNewUpstreamRequest(true); + kickOffNewUpstreamRequest(false); auto upstream_request = filter_->upstreamRequestsForTest().begin()->get(); @@ -318,6 +433,14 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndResponse) { EXPECT_CALL(mock_conn, write(_, _)); + EXPECT_CALL(*mock_request_encoder_, encode(_, _)) + .WillOnce(Invoke([&](const Request&, RequestEncoderCallback& callback) -> void { + Buffer::OwnedImpl buffer; + buffer.add("hello"); + // Expect response. + callback.onEncodingSuccess(buffer, true); + })); + factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolReady(mock_conn); EXPECT_NE(upstream_request->conn_data_, nullptr); @@ -356,7 +479,63 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndResponse) { upstream_request->onDecodingSuccess(std::move(response)); } -TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndEndStreamBeforeResponse) { +TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndResponseWithTracing) { + kickOffNewUpstreamRequest(true); + + auto upstream_request = filter_->upstreamRequestsForTest().begin()->get(); + + NiceMock mock_conn; + + EXPECT_CALL(mock_conn, write(_, _)); + + EXPECT_CALL(*mock_request_encoder_, encode(_, _)) + .WillOnce(Invoke([&](const Request&, RequestEncoderCallback& callback) -> void { + Buffer::OwnedImpl buffer; + buffer.add("hello"); + // Expect response. + callback.onEncodingSuccess(buffer, true); + })); + // Inject tracing context. + EXPECT_CALL(*child_span_, injectContext(_, _)); + + factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolReady(mock_conn); + + EXPECT_NE(upstream_request->conn_data_, nullptr); + EXPECT_EQ(upstream_request->conn_pool_handle_, nullptr); + + auto mock_response_decoder = std::make_unique>(); + auto raw_mock_response_decoder = mock_response_decoder.get(); + + EXPECT_CALL(mock_filter_callback_, downstreamCodec()).WillOnce(ReturnRef(mock_codec_factory_)); + EXPECT_CALL(mock_codec_factory_, responseDecoder()) + .WillOnce(Return(ByMove(std::move(mock_response_decoder)))); + EXPECT_CALL(*raw_mock_response_decoder, setDecoderCallback(_)) + .WillOnce(Invoke([&](ResponseDecoderCallback& cb) { EXPECT_EQ(upstream_request, &cb); })); + + Buffer::OwnedImpl test_buffer; + test_buffer.add("test_1"); + + EXPECT_CALL(*raw_mock_response_decoder, decode(BufferStringEqual("test_1"))) + .WillOnce(Invoke([&](Buffer::Instance& buffer) { buffer.drain(buffer.length()); })); + upstream_request->onUpstreamData(test_buffer, false); + + EXPECT_EQ(0, test_buffer.length()); + test_buffer.add("test_2"); + + EXPECT_CALL(*raw_mock_response_decoder, decode(BufferStringEqual("test_2"))) + .WillOnce(Invoke([&](Buffer::Instance& buffer) { buffer.drain(buffer.length()); })); + upstream_request->onUpstreamData(test_buffer, false); + + EXPECT_CALL(*child_span_, setTag(_, _)).Times(testing::AnyNumber()); + EXPECT_CALL(*child_span_, finishSpan()); + + EXPECT_CALL(mock_filter_callback_, upstreamResponse(_)); + + auto response = std::make_unique(); + upstream_request->onDecodingSuccess(std::move(response)); +} + +TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndEndStreamBeforeResponseWithTracing) { kickOffNewUpstreamRequest(true); auto upstream_request = filter_->upstreamRequestsForTest().begin()->get(); @@ -365,6 +544,16 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndEndStreamBeforeResponse) { EXPECT_CALL(mock_conn, write(_, _)); + EXPECT_CALL(*mock_request_encoder_, encode(_, _)) + .WillOnce(Invoke([&](const Request&, RequestEncoderCallback& callback) -> void { + Buffer::OwnedImpl buffer; + buffer.add("hello"); + // Expect response. + callback.onEncodingSuccess(buffer, true); + })); + // Inject tracing context. + EXPECT_CALL(*child_span_, injectContext(_, _)); + factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolReady(mock_conn); EXPECT_NE(upstream_request->conn_data_, nullptr); @@ -399,11 +588,17 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndEndStreamBeforeResponse) { EXPECT_EQ(status.message(), "protocol_error"); })); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().Error, "true")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().ErrorReason, "protocol_error")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().Component, "proxy")); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().ResponseFlags, "-")); + EXPECT_CALL(*child_span_, finishSpan()); + upstream_request->onUpstreamData(test_buffer, true); } TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndResponseDecodingFailure) { - kickOffNewUpstreamRequest(true); + kickOffNewUpstreamRequest(false); auto upstream_request = filter_->upstreamRequestsForTest().begin()->get(); @@ -411,6 +606,14 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndResponseDecodingFailure) { EXPECT_CALL(mock_conn, write(_, _)); + EXPECT_CALL(*mock_request_encoder_, encode(_, _)) + .WillOnce(Invoke([&](const Request&, RequestEncoderCallback& callback) -> void { + Buffer::OwnedImpl buffer; + buffer.add("hello"); + // Expect response. + callback.onEncodingSuccess(buffer, true); + })); + factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolReady(mock_conn); EXPECT_NE(upstream_request->conn_data_, nullptr); From f49499472670e8ae67157ec57cad9d919b043ee7 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Tue, 7 Feb 2023 11:58:00 +0000 Subject: [PATCH 08/10] fix test and possible crash Signed-off-by: wbpcode --- .../filters/network/source/proxy.cc | 2 +- .../filters/network/source/router/router.cc | 23 +++++++++++-------- .../filters/network/source/router/router.h | 4 ++++ 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/contrib/generic_proxy/filters/network/source/proxy.cc b/contrib/generic_proxy/filters/network/source/proxy.cc index bdb3da6dea61..f9e5b76aebb0 100644 --- a/contrib/generic_proxy/filters/network/source/proxy.cc +++ b/contrib/generic_proxy/filters/network/source/proxy.cc @@ -48,7 +48,7 @@ ActiveStream::ActiveStream(Filter& parent, RequestPtr request) if (decision.traced) { stream_info_.setTraceReason(decision.reason); } - active_span_ = tracer->startSpan(*this, *request, stream_info_, decision); + active_span_ = tracer->startSpan(*this, *downstream_request_stream_, stream_info_, decision); } Tracing::OperationName ActiveStream::operationName() const { diff --git a/contrib/generic_proxy/filters/network/source/router/router.cc b/contrib/generic_proxy/filters/network/source/router/router.cc index 3480e0be6c7d..a6c0bec0ac57 100644 --- a/contrib/generic_proxy/filters/network/source/router/router.cc +++ b/contrib/generic_proxy/filters/network/source/router/router.cc @@ -46,11 +46,14 @@ void UpstreamRequest::startStream() { conn_pool_handle_ = handle; } -// TODO(wbpcode): To support stream reset reason. void UpstreamRequest::resetStream(StreamResetReason reason) { - ENVOY_LOG(debug, "generic proxy upstream request: reset upstream request"); + if (stream_reset_) { + return; + } stream_reset_ = true; + ENVOY_LOG(debug, "generic proxy upstream request: reset upstream request"); + if (conn_pool_handle_) { ASSERT(!conn_data_); ENVOY_LOG(debug, "generic proxy upstream request: cacel upstream request"); @@ -182,14 +185,13 @@ void UpstreamRequest::encodeBufferToUpstream(Buffer::Instance& buffer) { } void RouterFilter::onUpstreamResponse(ResponsePtr response) { - // TODO(wbpcode): To support retry policy. - callbacks_->upstreamResponse(std::move(response)); filter_complete_ = true; + callbacks_->upstreamResponse(std::move(response)); } void RouterFilter::completeDirectly() { - callbacks_->completeDirectly(); filter_complete_ = true; + callbacks_->completeDirectly(); } void RouterFilter::onUpstreamRequestReset(UpstreamRequest& upstream_request, @@ -223,6 +225,11 @@ void RouterFilter::onDestroy() { } void RouterFilter::resetStream(StreamResetReason reason) { + if (filter_complete_) { + return; + } + filter_complete_ = true; + ASSERT(upstream_requests_.empty()); switch (reason) { case StreamResetReason::LocalReset: @@ -241,8 +248,6 @@ void RouterFilter::resetStream(StreamResetReason reason) { callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, resetReasonToStringView(reason))); break; } - - filter_complete_ = true; } void RouterFilter::kickOffNewUpstreamRequest() { @@ -250,8 +255,8 @@ void RouterFilter::kickOffNewUpstreamRequest() { auto thread_local_cluster = context_.clusterManager().getThreadLocalCluster(cluster_name); if (thread_local_cluster == nullptr) { - callbacks_->sendLocalReply(Status(StatusCode::kNotFound, "cluster_not_found")); filter_complete_ = true; + callbacks_->sendLocalReply(Status(StatusCode::kNotFound, "cluster_not_found")); return; } @@ -259,8 +264,8 @@ void RouterFilter::kickOffNewUpstreamRequest() { callbacks_->streamInfo().setUpstreamClusterInfo(cluster_); if (cluster_->maintenanceMode()) { - callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, "cluster_maintain_mode")); filter_complete_ = true; + callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, "cluster_maintain_mode")); return; } diff --git a/contrib/generic_proxy/filters/network/source/router/router.h b/contrib/generic_proxy/filters/network/source/router/router.h index e16f38823ed9..f8dadeec0541 100644 --- a/contrib/generic_proxy/filters/network/source/router/router.h +++ b/contrib/generic_proxy/filters/network/source/router/router.h @@ -127,6 +127,10 @@ class RouterFilter : public DecoderFilter, void kickOffNewUpstreamRequest(); void resetStream(StreamResetReason reason); + // Set filter_complete_ to true before any local or upstream response. Because the + // response processing may complete and destroy the L7 filter chain directly and cause the + // onDestory() of RouterFilter to be called. The filter_complete_ will be used to block + // unnecessary clearUpstreamRequests() in the onDestory() of RouterFilter. bool filter_complete_{}; const RouteEntry* route_entry_{}; From 13eabb4b546c9929f28df6aa338ef3f566dc128b Mon Sep 17 00:00:00 2001 From: wbpcode Date: Wed, 8 Feb 2023 05:40:00 +0000 Subject: [PATCH 09/10] minor update Signed-off-by: wbpcode --- .../filters/network/source/router/router.cc | 21 +++++++++--- .../filters/network/source/router/router.h | 6 ++-- .../network/test/codecs/dubbo/config_test.cc | 11 ++++--- .../network/test/router/router_test.cc | 32 +++++++++++++------ 4 files changed, 50 insertions(+), 20 deletions(-) diff --git a/contrib/generic_proxy/filters/network/source/router/router.cc b/contrib/generic_proxy/filters/network/source/router/router.cc index a6c0bec0ac57..4d464276e948 100644 --- a/contrib/generic_proxy/filters/network/source/router/router.cc +++ b/contrib/generic_proxy/filters/network/source/router/router.cc @@ -75,6 +75,10 @@ void UpstreamRequest::resetStream(StreamResetReason reason) { tracing_config_.value().get(), true); } + // Remove this stream form the parent's list because this upstream request is reset. + deferredDelete(); + + // Notify the parent filter that the upstream request has been reset. parent_.onUpstreamRequestReset(*this, reason); } @@ -88,6 +92,17 @@ void UpstreamRequest::completeUpstreamRequest() { ASSERT(conn_pool_handle_ == nullptr); ASSERT(conn_data_ != nullptr); conn_data_.reset(); + + // Remove this stream form the parent's list because this upstream request is complete. + deferredDelete(); +} + +void UpstreamRequest::deferredDelete() { + if (inserted()) { + // Remove this stream from the parent's list of upstream requests and delete it at + // next event loop iteration. + parent_.callbacks_->dispatcher().deferredDelete(removeFromList(parent_.upstream_requests_)); + } } void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl::string_view, @@ -194,11 +209,7 @@ void RouterFilter::completeDirectly() { callbacks_->completeDirectly(); } -void RouterFilter::onUpstreamRequestReset(UpstreamRequest& upstream_request, - StreamResetReason reason) { - // Remove upstream request from router filter and move it to the deferred-delete list. - callbacks_->dispatcher().deferredDelete(upstream_request.removeFromList(upstream_requests_)); - +void RouterFilter::onUpstreamRequestReset(UpstreamRequest&, StreamResetReason reason) { if (filter_complete_) { return; } diff --git a/contrib/generic_proxy/filters/network/source/router/router.h b/contrib/generic_proxy/filters/network/source/router/router.h index f8dadeec0541..48a720a88251 100644 --- a/contrib/generic_proxy/filters/network/source/router/router.h +++ b/contrib/generic_proxy/filters/network/source/router/router.h @@ -47,6 +47,10 @@ class UpstreamRequest : public Tcp::ConnectionPool::Callbacks, void startStream(); void resetStream(StreamResetReason reason); + void completeUpstreamRequest(); + + // Called when the stream has been reset or completed. + void deferredDelete(); // Tcp::ConnectionPool::Callbacks void onPoolFailure(ConnectionPool::PoolFailureReason reason, @@ -71,8 +75,6 @@ class UpstreamRequest : public Tcp::ConnectionPool::Callbacks, void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host); void encodeBufferToUpstream(Buffer::Instance& buffer); - void completeUpstreamRequest(); - bool stream_reset_{}; RouterFilter& parent_; diff --git a/contrib/generic_proxy/filters/network/test/codecs/dubbo/config_test.cc b/contrib/generic_proxy/filters/network/test/codecs/dubbo/config_test.cc index 1dada6111694..f684c6b4c93c 100644 --- a/contrib/generic_proxy/filters/network/test/codecs/dubbo/config_test.cc +++ b/contrib/generic_proxy/filters/network/test/codecs/dubbo/config_test.cc @@ -269,8 +269,8 @@ TEST(ResponseDecoderTest, ResponseDecoderTest) { auto codec = std::make_unique(); codec->initilize(std::make_unique()); - MockRequestDecoderCallback callback; - DubboRequestDecoder decoder(std::move(codec)); + MockResponseDecoderCallback callback; + DubboResponseDecoder decoder(std::move(codec)); decoder.setDecoderCallback(callback); auto raw_serializer = const_cast( @@ -306,7 +306,7 @@ TEST(ResponseDecoderTest, ResponseDecoderTest) { decoder.decode(buffer); } - // Decode request. + // Decode response. { Buffer::OwnedImpl buffer; buffer.add(std::string({'\xda', '\xbb', '\x02', 20})); @@ -314,8 +314,11 @@ TEST(ResponseDecoderTest, ResponseDecoderTest) { buffer.writeBEInt(8); buffer.add("anything"); + auto response = std::make_unique(); + response->setResponseType(RpcResponseType::ResponseWithValue); + EXPECT_CALL(*raw_serializer, deserializeRpcResponse(_, _)) - .WillOnce(Return(ByMove(std::make_unique()))); + .WillOnce(Return(ByMove(std::move(response)))); EXPECT_CALL(callback, onDecodingSuccess(_)); decoder.decode(buffer); diff --git a/contrib/generic_proxy/filters/network/test/router/router_test.cc b/contrib/generic_proxy/filters/network/test/router/router_test.cc index ebeb45da37cd..bc501b1f5425 100644 --- a/contrib/generic_proxy/filters/network/test/router/router_test.cc +++ b/contrib/generic_proxy/filters/network/test/router/router_test.cc @@ -178,7 +178,8 @@ TEST_F(RouterFilterTest, UpstreamRequestResetBeforePoolCallback) { factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.handles_.back(), cancel(_)); EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _)) - .WillOnce(Invoke([](Status status, ResponseUpdateFunction&&) { + .WillOnce(Invoke([this](Status status, ResponseUpdateFunction&&) { + EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); EXPECT_EQ(status.message(), "local_reset"); })); @@ -202,7 +203,8 @@ TEST_F(RouterFilterTest, UpstreamRequestResetBeforePoolCallbackWithTracing) { factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.handles_.back(), cancel(_)); EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _)) - .WillOnce(Invoke([](Status status, ResponseUpdateFunction&&) { + .WillOnce(Invoke([this](Status status, ResponseUpdateFunction&&) { + EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); EXPECT_EQ(status.message(), "local_reset"); })); @@ -305,7 +307,9 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndExpectNoResponseWithTracing) NiceMock mock_conn; EXPECT_CALL(mock_conn, write(_, _)); - EXPECT_CALL(mock_filter_callback_, completeDirectly()); + EXPECT_CALL(mock_filter_callback_, completeDirectly()).WillOnce(Invoke([this]() -> void { + EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); + })); EXPECT_CALL(*mock_request_encoder_, encode(_, _)) .WillOnce(Invoke([&](const Request&, RequestEncoderCallback& callback) -> void { @@ -349,7 +353,8 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButConnectionErrorBeforeRespons EXPECT_EQ(upstream_request->conn_pool_handle_, nullptr); EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _)) - .WillOnce(Invoke([](Status status, ResponseUpdateFunction&&) { + .WillOnce(Invoke([this](Status status, ResponseUpdateFunction&&) { + EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); EXPECT_EQ(status.message(), "local_reset"); })); @@ -383,7 +388,8 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyButConnectionTerminationBeforeR EXPECT_EQ(upstream_request->conn_pool_handle_, nullptr); EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _)) - .WillOnce(Invoke([](Status status, ResponseUpdateFunction&&) { + .WillOnce(Invoke([this](Status status, ResponseUpdateFunction&&) { + EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); EXPECT_EQ(status.message(), "connection_termination"); })); @@ -474,7 +480,10 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndResponse) { auto response = std::make_unique(); - EXPECT_CALL(mock_filter_callback_, upstreamResponse(_)); + EXPECT_CALL(mock_filter_callback_, upstreamResponse(_)).WillOnce(Invoke([this](ResponsePtr) { + // When the response is sent to callback, the upstream request should be removed. + EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); + })); upstream_request->onDecodingSuccess(std::move(response)); } @@ -529,7 +538,10 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndResponseWithTracing) { EXPECT_CALL(*child_span_, setTag(_, _)).Times(testing::AnyNumber()); EXPECT_CALL(*child_span_, finishSpan()); - EXPECT_CALL(mock_filter_callback_, upstreamResponse(_)); + EXPECT_CALL(mock_filter_callback_, upstreamResponse(_)).WillOnce(Invoke([this](ResponsePtr) { + // When the response is sent to callback, the upstream request should be removed. + EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); + })); auto response = std::make_unique(); upstream_request->onDecodingSuccess(std::move(response)); @@ -584,7 +596,8 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndEndStreamBeforeResponseWithT .WillOnce(Invoke([&](Buffer::Instance& buffer) { buffer.drain(buffer.length()); })); EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _)) - .WillOnce(Invoke([](Status status, ResponseUpdateFunction&&) { + .WillOnce(Invoke([this](Status status, ResponseUpdateFunction&&) { + EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); EXPECT_EQ(status.message(), "protocol_error"); })); @@ -646,7 +659,8 @@ TEST_F(RouterFilterTest, UpstreamRequestPoolReadyAndResponseDecodingFailure) { upstream_request->onUpstreamData(test_buffer, false); EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _)) - .WillOnce(Invoke([](Status status, ResponseUpdateFunction&&) { + .WillOnce(Invoke([this](Status status, ResponseUpdateFunction&&) { + EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); EXPECT_EQ(status.message(), "protocol_error"); })); From 6af68ccafdaf8956bbbd6d03a4fe8a5a81ee19a1 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Tue, 14 Feb 2023 02:55:29 +0000 Subject: [PATCH 10/10] add release note Signed-off-by: wbpcode --- changelogs/current.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 00f3102ddb82..3defb0bb79a1 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -102,5 +102,8 @@ new_features: - area: sni_dynamic_forward_proxy change: | added an option to dynamically set the host used by the SNI dynamic forward proxy filter, by setting a filter state object under the key ``envoy.upstream.dynamic_host``. +- area: generic_proxy + change: | + added :ref:`tracing support ` for the generic proxy. deprecated: