From 23e5fc2939e1782416155509edfca323150e8a59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20M=C4=85ka?= <62388446+michalmaka@users.noreply.github.com> Date: Sat, 20 Nov 2021 00:00:33 +0100 Subject: [PATCH] udp_proxy: added per packet load balancing possibility (#18605) Signed-off-by: Michal Maka --- .../filters/udp/udp_proxy/v3/udp_proxy.proto | 7 +- .../listeners/udp_filters/udp_proxy.rst | 15 +- docs/root/version_history/current.rst | 1 + .../filters/udp/udp_proxy/udp_proxy_filter.cc | 152 +++++++++++----- .../filters/udp/udp_proxy/udp_proxy_filter.h | 92 ++++++++-- .../udp/udp_proxy/udp_proxy_filter_test.cc | 171 ++++++++++++++++++ tools/spelling/spelling_dictionary.txt | 1 + 7 files changed, 377 insertions(+), 62 deletions(-) diff --git a/api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto b/api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto index 9d410e28afe3..81d2bf54d494 100644 --- a/api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto +++ b/api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto @@ -20,7 +20,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // [#extension: envoy.filters.udp_listener.udp_proxy] // Configuration for the UDP proxy filter. -// [#next-free-field: 7] +// [#next-free-field: 8] message UdpProxyConfig { option (udpa.annotations.versioning).previous_message_type = "envoy.config.filter.udp.udp_proxy.v2alpha.UdpProxyConfig"; @@ -82,4 +82,9 @@ message UdpProxyConfig { // :ref:`prefer_gro ` is true for upstream // sockets as the assumption is datagrams will be received from a single source. config.core.v3.UdpSocketConfig upstream_socket_config = 6; + + // Perform per packet load balancing (upstream host selection) on each received data chunk. + // The default if not specified is false, which means each data chunk is forwarded + // to the upstream host selected upon receipt of the first chunk for that "session" (identified by source IP/port and local IP/port). 
+ bool use_per_packet_load_balancing = 7; } diff --git a/docs/root/configuration/listeners/udp_filters/udp_proxy.rst b/docs/root/configuration/listeners/udp_filters/udp_proxy.rst index 0c731c7e297c..44250b0e0bef 100644 --- a/docs/root/configuration/listeners/udp_filters/udp_proxy.rst +++ b/docs/root/configuration/listeners/udp_filters/udp_proxy.rst @@ -20,17 +20,26 @@ Each session is index by the 4-tuple consisting of source IP/port and local IP/p datagram is received on. Sessions last until the :ref:`idle timeout ` is reached. +The above *session stickiness* can be disabled by setting :ref:`use_per_packet_load_balancing +` to true. +In that case, *per packet load balancing* is enabled. It means that an upstream host is selected for every single data chunk +received by the UDP proxy using the currently configured load balancing policy. + The UDP proxy listener filter also can operate as a *transparent* proxy if the :ref:`use_original_src_ip ` -field is set. But please keep in mind that it does not forward the port to upstreams. It forwards only the IP address to upstreams. +field is set to true. But please keep in mind that it does not forward the port to upstreams. It forwards only the IP address to upstreams. Load balancing and unhealthy host handling ------------------------------------------ Envoy will fully utilize the configured load balancer for the configured upstream cluster when -load balancing UDP datagrams. When a new session is created, Envoy will associate the session +load balancing UDP datagrams. By default, when a new session is created, Envoy will associate the session with an upstream host selected using the configured load balancer. All future datagrams that -belong to the session will be routed to the same upstream host. 
However, if :ref:`use_per_packet_load_balancing +` +field is set to true, Envoy selects another upstream host on the next datagram using the configured load balancer +and creates a new session if one does not already exist. So, if several upstream hosts are available to the load balancer, +each data chunk is forwarded to a different host. When an upstream host becomes unhealthy (due to :ref:`active health checking `), Envoy will attempt to create a new session to a healthy host diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 71f0ae1b6068..1ffd6ac11714 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -84,6 +84,7 @@ New Features * tls_inspector filter: added :ref:`enable_ja3_fingerprinting ` to create JA3 fingerprint hash from Client Hello message. * transport_socket: added :ref:`envoy.transport_sockets.tcp_stats ` which generates additional statistics gathered from the OS TCP stack. * udp: add support for multiple listener filters. +* udp_proxy: added :ref:`use_per_packet_load_balancing ` option to enable per packet load balancing (selection of an upstream host on each data chunk). * upstream: added the ability to :ref:`configure max connection duration ` for upstream clusters. * vcl_socket_interface: added VCL socket interface extension for fd.io VPP integration to :ref:`contrib images `. This can be enabled via :ref:`VCL ` configuration. * xds: re-introduced unified delta and sotw xDS multiplexers that share most of the implementation. Added a new runtime config ``envoy.reloadable_features.unified_mux`` (disabled by default) that when enabled, switches xDS to use unified multiplexers. 
diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index 9be3f1003a37..d67e52e6b410 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -27,8 +27,13 @@ void UdpProxyFilter::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) } ENVOY_LOG(debug, "udp proxy: attaching to cluster {}", cluster.info()->name()); - ASSERT(cluster_info_ == absl::nullopt || &cluster_info_.value().cluster_ != &cluster); - cluster_info_.emplace(*this, cluster); + ASSERT(cluster_info_ == absl::nullopt || &cluster_info_.value()->cluster_ != &cluster); + + if (config_->usingPerPacketLoadBalancing()) { + cluster_info_.emplace(std::make_unique(*this, cluster)); + } else { + cluster_info_.emplace(std::make_unique(*this, cluster)); + } } void UdpProxyFilter::onClusterRemoval(const std::string& cluster) { @@ -46,7 +51,7 @@ Network::FilterStatus UdpProxyFilter::onData(Network::UdpRecvData& data) { return Network::FilterStatus::StopIteration; } - return cluster_info_.value().onData(data); + return cluster_info_.value()->onData(data); } Network::FilterStatus UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode) { @@ -56,9 +61,10 @@ Network::FilterStatus UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode) } UdpProxyFilter::ClusterInfo::ClusterInfo(UdpProxyFilter& filter, - Upstream::ThreadLocalCluster& cluster) + Upstream::ThreadLocalCluster& cluster, + SessionStorageType&& sessions) : filter_(filter), cluster_(cluster), - cluster_stats_(generateStats(cluster.info()->statsScope())), + cluster_stats_(generateStats(cluster.info()->statsScope())), sessions_(std::move(sessions)), member_update_cb_handle_(cluster.prioritySet().addMemberUpdateCb( [this](const Upstream::HostVector&, const Upstream::HostVector& hosts_removed) { for (const auto& host : hosts_removed) { @@ -85,36 +91,84 @@ 
UdpProxyFilter::ClusterInfo::~ClusterInfo() { ASSERT(host_to_sessions_.empty()); } -Network::FilterStatus UdpProxyFilter::ClusterInfo::onData(Network::UdpRecvData& data) { +void UdpProxyFilter::ClusterInfo::removeSession(const ActiveSession* session) { + // First remove from the host to sessions map. + ASSERT(host_to_sessions_[&session->host()].count(session) == 1); + auto host_sessions_it = host_to_sessions_.find(&session->host()); + host_sessions_it->second.erase(session); + if (host_sessions_it->second.empty()) { + host_to_sessions_.erase(host_sessions_it); + } + + // Now remove it from the primary map. + ASSERT(sessions_.count(session) == 1); + sessions_.erase(session); +} + +UdpProxyFilter::ActiveSession* +UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses, + const Upstream::HostConstSharedPtr& optional_host) { + if (!cluster_.info() + ->resourceManager(Upstream::ResourcePriority::Default) + .connections() + .canCreate()) { + ENVOY_LOG(debug, "cannot create new connection."); + cluster_.info()->stats().upstream_cx_overflow_.inc(); + return nullptr; + } + + if (optional_host) { + return createSessionWithHost(std::move(addresses), optional_host); + } + + auto host = chooseHost(addresses.peer_); + if (host == nullptr) { + ENVOY_LOG(debug, "cannot find any valid host."); + cluster_.info()->stats().upstream_cx_none_healthy_.inc(); + return nullptr; + } + return createSessionWithHost(std::move(addresses), host); +} + +UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSessionWithHost( + Network::UdpRecvData::LocalPeerAddresses&& addresses, + const Upstream::HostConstSharedPtr& host) { + ASSERT(host); + auto new_session = std::make_unique(*this, std::move(addresses), host); + auto new_session_ptr = new_session.get(); + sessions_.emplace(std::move(new_session)); + host_to_sessions_[host.get()].emplace(new_session_ptr); + return new_session_ptr; +} + +Upstream::HostConstSharedPtr 
UdpProxyFilter::ClusterInfo::chooseHost( + const Network::Address::InstanceConstSharedPtr& peer_address) const { + UdpLoadBalancerContext context(filter_.config_->hashPolicy(), peer_address); + Upstream::HostConstSharedPtr host = cluster_.loadBalancer().chooseHost(&context); + return host; +} + +UdpProxyFilter::StickySessionClusterInfo::StickySessionClusterInfo( + UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster) + : ClusterInfo(filter, cluster, + SessionStorageType(1, HeterogeneousActiveSessionHash(false), + HeterogeneousActiveSessionEqual(false))) {} + +Network::FilterStatus UdpProxyFilter::StickySessionClusterInfo::onData(Network::UdpRecvData& data) { const auto active_session_it = sessions_.find(data.addresses_); ActiveSession* active_session; if (active_session_it == sessions_.end()) { - if (!cluster_.info() - ->resourceManager(Upstream::ResourcePriority::Default) - .connections() - .canCreate()) { - cluster_.info()->stats().upstream_cx_overflow_.inc(); + active_session = createSession(std::move(data.addresses_)); + if (active_session == nullptr) { return Network::FilterStatus::StopIteration; } - - UdpLoadBalancerContext context(filter_.config_->hashPolicy(), data.addresses_.peer_); - Upstream::HostConstSharedPtr host = cluster_.loadBalancer().chooseHost(&context); - if (host == nullptr) { - ENVOY_LOG(debug, "cannot find any valid host. failed to create a session."); - cluster_.info()->stats().upstream_cx_none_healthy_.inc(); - return Network::FilterStatus::StopIteration; - } - - active_session = createSession(std::move(data.addresses_), host); } else { active_session = active_session_it->get(); if (active_session->host().health() == Upstream::Host::Health::Unhealthy) { // If a host becomes unhealthy, we optimally would like to replace it with a new session // to a healthy host. We may eventually want to make this behavior configurable, but for now // this will be the universal behavior. 
- - UdpLoadBalancerContext context(filter_.config_->hashPolicy(), data.addresses_.peer_); - Upstream::HostConstSharedPtr host = cluster_.loadBalancer().chooseHost(&context); + auto host = chooseHost(data.addresses_.peer_); if (host != nullptr && host->health() != Upstream::Host::Health::Unhealthy && host.get() != &active_session->host()) { ENVOY_LOG(debug, "upstream session unhealthy, recreating the session"); @@ -132,28 +186,40 @@ Network::FilterStatus UdpProxyFilter::ClusterInfo::onData(Network::UdpRecvData& return Network::FilterStatus::StopIteration; } -UdpProxyFilter::ActiveSession* -UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses, - const Upstream::HostConstSharedPtr& host) { - auto new_session = std::make_unique(*this, std::move(addresses), host); - auto new_session_ptr = new_session.get(); - sessions_.emplace(std::move(new_session)); - host_to_sessions_[host.get()].emplace(new_session_ptr); - return new_session_ptr; -} +UdpProxyFilter::PerPacketLoadBalancingClusterInfo::PerPacketLoadBalancingClusterInfo( + UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster) + : ClusterInfo(filter, cluster, + SessionStorageType(1, HeterogeneousActiveSessionHash(true), + HeterogeneousActiveSessionEqual(true))) {} + +Network::FilterStatus +UdpProxyFilter::PerPacketLoadBalancingClusterInfo::onData(Network::UdpRecvData& data) { + auto host = chooseHost(data.addresses_.peer_); + if (host == nullptr) { + ENVOY_LOG(debug, "cannot find any valid host."); + cluster_.info()->stats().upstream_cx_none_healthy_.inc(); + return Network::FilterStatus::StopIteration; + } -void UdpProxyFilter::ClusterInfo::removeSession(const ActiveSession* session) { - // First remove from the host to sessions map. 
- ASSERT(host_to_sessions_[&session->host()].count(session) == 1); - auto host_sessions_it = host_to_sessions_.find(&session->host()); - host_sessions_it->second.erase(session); - if (host_sessions_it->second.empty()) { - host_to_sessions_.erase(host_sessions_it); + ENVOY_LOG(debug, "selected {} host as upstream.", host->address()->asStringView()); + + LocalPeerHostAddresses key{data.addresses_, *host}; + const auto active_session_it = sessions_.find(key); + ActiveSession* active_session; + if (active_session_it == sessions_.end()) { + active_session = createSession(std::move(data.addresses_), host); + if (active_session == nullptr) { + return Network::FilterStatus::StopIteration; + } + } else { + active_session = active_session_it->get(); + ENVOY_LOG(trace, "found already existing session on host {}.", + active_session->host().address()->asStringView()); } - // Now remove it from the primary map. - ASSERT(sessions_.count(session) == 1); - sessions_.erase(session); + active_session->write(*data.buffer_); + + return Network::FilterStatus::StopIteration; } UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster, diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h index 0d349fe3fd25..99c9708543e5 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h @@ -70,6 +70,7 @@ class UdpProxyFilterConfig { : cluster_manager_(cluster_manager), time_source_(time_source), cluster_(config.cluster()), session_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, idle_timeout, 60 * 1000)), use_original_src_ip_(config.use_original_src_ip()), + use_per_packet_load_balancing_(config.use_per_packet_load_balancing()), stats_(generateStats(config.stat_prefix(), root_scope)), // Default prefer_gro to true for upstream client traffic. 
upstream_socket_config_(config.upstream_socket_config(), true) { @@ -87,6 +88,7 @@ class UdpProxyFilterConfig { Upstream::ClusterManager& clusterManager() const { return cluster_manager_; } std::chrono::milliseconds sessionTimeout() const { return session_timeout_; } bool usingOriginalSrcIp() const { return use_original_src_ip_; } + bool usingPerPacketLoadBalancing() const { return use_per_packet_load_balancing_; } const Udp::HashPolicy* hashPolicy() const { return hash_policy_.get(); } UdpProxyDownstreamStats& stats() const { return stats_; } TimeSource& timeSource() const { return time_source_; } @@ -107,6 +109,7 @@ class UdpProxyFilterConfig { const std::string cluster_; const std::chrono::milliseconds session_timeout_; const bool use_original_src_ip_; + const bool use_per_packet_load_balancing_; std::unique_ptr hash_policy_; mutable UdpProxyDownstreamStats stats_; const Network::ResolvedUdpSocketConfig upstream_socket_config_; @@ -200,6 +203,11 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, using ActiveSessionPtr = std::unique_ptr; + struct LocalPeerHostAddresses { + const Network::UdpRecvData::LocalPeerAddresses& local_peer_addresses_; + const Upstream::Host& host_; + }; + struct HeterogeneousActiveSessionHash { // Specifying is_transparent indicates to the library infrastructure that // type-conversions should not be applied when calling find(), but instead @@ -210,31 +218,52 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, // using it in the context of absl. 
using is_transparent = void; // NOLINT(readability-identifier-naming) + HeterogeneousActiveSessionHash(const bool consider_host) : consider_host_(consider_host) {} + size_t operator()(const Network::UdpRecvData::LocalPeerAddresses& value) const { return absl::Hash()(value); } - size_t operator()(const ActiveSessionPtr& value) const { - return absl::Hash()(value->addresses()); + size_t operator()(const LocalPeerHostAddresses& value) const { + auto hash = this->operator()(value.local_peer_addresses_); + if (consider_host_) { + hash = absl::HashOf(hash, value.host_.address()->asStringView()); + } + return hash; } size_t operator()(const ActiveSession* value) const { - return absl::Hash()(value->addresses()); + LocalPeerHostAddresses key{value->addresses(), value->host()}; + return this->operator()(key); } + size_t operator()(const ActiveSessionPtr& value) const { return this->operator()(value.get()); } + + private: + const bool consider_host_; }; struct HeterogeneousActiveSessionEqual { // See description for HeterogeneousActiveSessionHash::is_transparent. using is_transparent = void; // NOLINT(readability-identifier-naming) + HeterogeneousActiveSessionEqual(const bool consider_host) : consider_host_(consider_host) {} + bool operator()(const ActiveSessionPtr& lhs, const Network::UdpRecvData::LocalPeerAddresses& rhs) const { return lhs->addresses() == rhs; } - bool operator()(const ActiveSessionPtr& lhs, const ActiveSessionPtr& rhs) const { - return lhs->addresses() == rhs->addresses(); + bool operator()(const ActiveSessionPtr& lhs, const LocalPeerHostAddresses& rhs) const { + return this->operator()(lhs, rhs.local_peer_addresses_) && + (consider_host_ ? 
&lhs->host() == &rhs.host_ : true); } bool operator()(const ActiveSessionPtr& lhs, const ActiveSession* rhs) const { - return lhs->addresses() == rhs->addresses(); + LocalPeerHostAddresses key{rhs->addresses(), rhs->host()}; + return this->operator()(lhs, key); } + bool operator()(const ActiveSessionPtr& lhs, const ActiveSessionPtr& rhs) const { + return this->operator()(lhs, rhs.get()); + } + + private: + const bool consider_host_; }; /** @@ -242,32 +271,65 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, * we will very likely support different types of routing to multiple upstream clusters. */ class ClusterInfo { + protected: + using SessionStorageType = absl::flat_hash_set; + public: - ClusterInfo(UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster); - ~ClusterInfo(); - Network::FilterStatus onData(Network::UdpRecvData& data); + ClusterInfo(UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster, + SessionStorageType&& sessions); + virtual ~ClusterInfo(); + virtual Network::FilterStatus onData(Network::UdpRecvData& data) PURE; void removeSession(const ActiveSession* session); UdpProxyFilter& filter_; Upstream::ThreadLocalCluster& cluster_; UdpProxyUpstreamStats cluster_stats_; - private: + protected: ActiveSession* createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses, - const Upstream::HostConstSharedPtr& host); + const Upstream::HostConstSharedPtr& optional_host = nullptr); + Upstream::HostConstSharedPtr + chooseHost(const Network::Address::InstanceConstSharedPtr& peer_address) const; + + SessionStorageType sessions_; + + private: static UdpProxyUpstreamStats generateStats(Stats::Scope& scope) { const auto final_prefix = "udp"; return {ALL_UDP_PROXY_UPSTREAM_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))}; } + ActiveSession* createSessionWithHost(Network::UdpRecvData::LocalPeerAddresses&& addresses, + const Upstream::HostConstSharedPtr& host); Envoy::Common::CallbackHandlePtr member_update_cb_handle_; - 
absl::flat_hash_set - sessions_; absl::flat_hash_map> host_to_sessions_; }; + using ClusterInfoPtr = std::unique_ptr; + + /** + * Performs forwarding and replying data to one upstream host, selected when the first datagram + * for a session is received. If the upstream host becomes unhealthy, a new one is selected. + */ + class StickySessionClusterInfo : public ClusterInfo { + public: + StickySessionClusterInfo(UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster); + Network::FilterStatus onData(Network::UdpRecvData& data) override; + }; + + /** + * On each data chunk selects another host using underlying load balancing method and communicates + * with that host. + */ + class PerPacketLoadBalancingClusterInfo : public ClusterInfo { + public: + PerPacketLoadBalancingClusterInfo(UdpProxyFilter& filter, + Upstream::ThreadLocalCluster& cluster); + Network::FilterStatus onData(Network::UdpRecvData& data) override; + }; + virtual Network::SocketPtr createSocket(const Upstream::HostConstSharedPtr& host) { // Virtual so this can be overridden in unit tests. return std::make_unique(Network::Socket::Type::Datagram, host->address(), @@ -283,7 +345,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, // Right now we support a single cluster to route to. It is highly likely in the future that // we will support additional routing options either using filter chain matching, weighting, // etc. 
- absl::optional cluster_info_; + absl::optional cluster_info_; }; } // namespace UdpProxy diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc index f59a51f7a833..60a01885b00f 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -21,6 +21,7 @@ using testing::AtLeast; using testing::ByMove; using testing::DoAll; +using testing::DoDefault; using testing::InSequence; using testing::InvokeWithoutArgs; using testing::Return; @@ -639,6 +640,176 @@ TEST_F(UdpProxyFilterTest, SocketOptionForUseOriginalSrcIp) { ensureIpTransparentSocketOptions(upstream_address_, "10.0.0.2:80", 1, 0); } +// Verify that on second data packet sent from the client, another upstream host is selected. +TEST_F(UdpProxyFilterTest, PerPacketLoadBalancingBasicFlow) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster +use_per_packet_load_balancing: true + )EOF"); + + // Allow for two sessions. 
+ cluster_manager_.thread_local_cluster_.cluster_.info_->resetResourceManager(2, 0, 0, 0, 0); + + expectSessionCreate(upstream_address_); + test_sessions_[0].expectWriteToUpstream("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + checkTransferStats(5 /*rx_bytes*/, 1 /*rx_datagrams*/, 0 /*tx_bytes*/, 0 /*tx_datagrams*/); + test_sessions_[0].recvDataFromUpstream("world"); + checkTransferStats(5 /*rx_bytes*/, 1 /*rx_datagrams*/, 5 /*tx_bytes*/, 1 /*tx_datagrams*/); + + auto new_host_address = Network::Utility::parseInternetAddressAndPort("20.0.0.2:443"); + auto new_host = createHost(new_host_address); + EXPECT_CALL(cluster_manager_.thread_local_cluster_.lb_, chooseHost(_)).WillOnce(Return(new_host)); + expectSessionCreate(new_host_address); + test_sessions_[1].expectWriteToUpstream("hello2"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello2"); + EXPECT_EQ(2, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(2, config_->stats().downstream_sess_active_.value()); + checkTransferStats(11 /*rx_bytes*/, 2 /*rx_datagrams*/, 5 /*tx_bytes*/, 1 /*tx_datagrams*/); + + // On next datagram, first session should be used + EXPECT_CALL(cluster_manager_.thread_local_cluster_.lb_, chooseHost(_)) + .WillRepeatedly(DoDefault()); + test_sessions_[0].expectWriteToUpstream("hello3"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello3"); + EXPECT_EQ(2, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(2, config_->stats().downstream_sess_active_.value()); + checkTransferStats(17 /*rx_bytes*/, 3 /*rx_datagrams*/, 5 /*tx_bytes*/, 1 /*tx_datagrams*/); +} + +// Verify that when no host is available, message is dropped. 
+TEST_F(UdpProxyFilterTest, PerPacketLoadBalancingFirstInvalidHost) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster +use_per_packet_load_balancing: true + )EOF"); + + EXPECT_CALL(cluster_manager_.thread_local_cluster_.lb_, chooseHost(_)).WillOnce(Return(nullptr)); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(0, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(0, config_->stats().downstream_sess_active_.value()); + EXPECT_EQ(1, cluster_manager_.thread_local_cluster_.cluster_.info_->stats_ + .upstream_cx_none_healthy_.value()); +} + +// Verify that when on second packet no host is available, message is dropped. +TEST_F(UdpProxyFilterTest, PerPacketLoadBalancingSecondInvalidHost) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster +use_per_packet_load_balancing: true + )EOF"); + + expectSessionCreate(upstream_address_); + test_sessions_[0].expectWriteToUpstream("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + EXPECT_EQ(0, cluster_manager_.thread_local_cluster_.cluster_.info_->stats_ + .upstream_cx_none_healthy_.value()); + + EXPECT_CALL(cluster_manager_.thread_local_cluster_.lb_, chooseHost(_)).WillOnce(Return(nullptr)); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello2"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + EXPECT_EQ(1, cluster_manager_.thread_local_cluster_.cluster_.info_->stats_ + .upstream_cx_none_healthy_.value()); +} + +// Verify that all sessions for a host are removed when a host is removed. 
+TEST_F(UdpProxyFilterTest, PerPacketLoadBalancingRemoveHostSessions) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster +use_per_packet_load_balancing: true + )EOF"); + + expectSessionCreate(upstream_address_); + test_sessions_[0].expectWriteToUpstream("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + + cluster_manager_.thread_local_cluster_.cluster_.priority_set_.runUpdateCallbacks( + 0, {}, {cluster_manager_.thread_local_cluster_.lb_.host_}); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(0, config_->stats().downstream_sess_active_.value()); + + expectSessionCreate(upstream_address_); + test_sessions_[1].expectWriteToUpstream("hello2"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello2"); + EXPECT_EQ(2, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); +} + +// Verify that all sessions for hosts in cluster are removed when a cluster is removed. +TEST_F(UdpProxyFilterTest, PerPacketLoadBalancingRemoveCluster) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster +use_per_packet_load_balancing: true + )EOF"); + + // Allow for two sessions. 
+ cluster_manager_.thread_local_cluster_.cluster_.info_->resetResourceManager(2, 0, 0, 0, 0); + + expectSessionCreate(upstream_address_); + test_sessions_[0].expectWriteToUpstream("hello"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ(1, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(1, config_->stats().downstream_sess_active_.value()); + + auto new_host_address = Network::Utility::parseInternetAddressAndPort("20.0.0.2:443"); + auto new_host = createHost(new_host_address); + EXPECT_CALL(cluster_manager_.thread_local_cluster_.lb_, chooseHost(_)).WillOnce(Return(new_host)); + expectSessionCreate(new_host_address); + test_sessions_[1].expectWriteToUpstream("hello2"); + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello2"); + EXPECT_EQ(2, config_->stats().downstream_sess_total_.value()); + EXPECT_EQ(2, config_->stats().downstream_sess_active_.value()); + + // Remove a cluster we don't care about. + cluster_update_callbacks_->onClusterRemoval("other_cluster"); + EXPECT_EQ(2, config_->stats().downstream_sess_active_.value()); + + // Remove the cluster we do care about. This should purge all sessions. + cluster_update_callbacks_->onClusterRemoval("fake_cluster"); + EXPECT_EQ(0, config_->stats().downstream_sess_active_.value()); +} + +// Verify that specific stat is included when connection limit is hit. +TEST_F(UdpProxyFilterTest, PerPacketLoadBalancingCannotCreateConnection) { + InSequence s; + + setup(R"EOF( +stat_prefix: foo +cluster: fake_cluster +use_per_packet_load_balancing: true + )EOF"); + + // Don't allow for any session. + cluster_manager_.thread_local_cluster_.cluster_.info_->resetResourceManager(0, 0, 0, 0, 0); + + recvDataFromDownstream("10.0.0.1:1000", "10.0.0.2:80", "hello"); + EXPECT_EQ( + 1, + cluster_manager_.thread_local_cluster_.cluster_.info_->stats_.upstream_cx_overflow_.value()); +} + // Make sure socket option is set correctly if use_original_src_ip is set in case of ipv6. 
TEST_F(UdpProxyFilterIpv6Test, SocketOptionForUseOriginalSrcIpInCaseOfIpv6) { if (!isTransparentSocketOptionsSupported()) { diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 20ebff34f2a4..6df79bdbfbf6 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -1011,6 +1011,7 @@ rebalancer rebalancing rebuffer rebuilder +receival reconnection recurse recv