udp_proxy: added per packet load balancing possibility (#18605)
Signed-off-by: Michal Maka <m.maka@partner.samsung.com>
michalmaka authored Nov 19, 2021
1 parent dc2ac22 commit 23e5fc2
Showing 7 changed files with 377 additions and 62 deletions.
api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto
@@ -20,7 +20,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// [#extension: envoy.filters.udp_listener.udp_proxy]

// Configuration for the UDP proxy filter.
// [#next-free-field: 7]
// [#next-free-field: 8]
message UdpProxyConfig {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.udp.udp_proxy.v2alpha.UdpProxyConfig";
@@ -82,4 +82,9 @@ message UdpProxyConfig {
// :ref:`prefer_gro <envoy_v3_api_field_config.core.v3.UdpSocketConfig.prefer_gro>` is true for upstream
// sockets as the assumption is datagrams will be received from a single source.
config.core.v3.UdpSocketConfig upstream_socket_config = 6;

// Perform per packet load balancing (upstream host selection) on each received data chunk.
// The default if not specified is false, meaning that each data chunk is forwarded
// to the upstream host selected when the first chunk was received for that "session" (identified by source IP/port and local IP/port).
bool use_per_packet_load_balancing = 7;
}
15 changes: 12 additions & 3 deletions docs/root/configuration/listeners/udp_filters/udp_proxy.rst
@@ -20,17 +20,26 @@ Each session is indexed by the 4-tuple consisting of source IP/port and local IP/port that the
datagram is received on. Sessions last until the :ref:`idle timeout
<envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.idle_timeout>` is reached.

The *session stickiness* described above can be disabled by setting :ref:`use_per_packet_load_balancing
<envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.use_per_packet_load_balancing>` to true.
In that case, *per packet load balancing* is enabled: an upstream host is selected for every single data chunk
received by the UDP proxy, using the configured load balancing policy.

The UDP proxy listener filter also can operate as a *transparent* proxy if the
:ref:`use_original_src_ip <envoy_v3_api_msg_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig>`
field is set. But please keep in mind that it does not forward the port to upstreams. It forwards only the IP address to upstreams.
field is set to true. Keep in mind that it forwards only the source IP address to upstreams, not the source port.

Load balancing and unhealthy host handling
------------------------------------------

Envoy will fully utilize the configured load balancer for the configured upstream cluster when
load balancing UDP datagrams. When a new session is created, Envoy will associate the session
load balancing UDP datagrams. By default, when a new session is created, Envoy will associate the session
with an upstream host selected using the configured load balancer. All future datagrams that
belong to the session will be routed to the same upstream host.
belong to the session will be routed to the same upstream host. However, if the :ref:`use_per_packet_load_balancing
<envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.use_per_packet_load_balancing>`
field is set to true, Envoy selects an upstream host for each datagram using the configured load balancer,
and creates a new session for that host if one does not already exist. Thus, when several upstream hosts are
available to the load balancer, consecutive data chunks may be forwarded to different hosts.
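To make the behavioral difference concrete, here is a toy, self-contained C++ sketch (illustrative only, not Envoy code; the host names and the round-robin policy are assumptions) contrasting the two modes:

```cpp
#include <cstdio>
#include <map>
#include <string>
#include <vector>

int main() {
  // Toy model: three upstream hosts behind a round-robin "load balancer".
  const std::vector<std::string> hosts = {"host-a", "host-b", "host-c"};
  size_t rr = 0;
  auto choose_host = [&]() { return hosts[rr++ % hosts.size()]; };

  // One UDP "session", identified by source IP/port and local IP/port.
  const std::string session_key = "10.0.0.1:5000->10.0.0.2:53";
  std::map<std::string, std::string> sticky_sessions;

  for (int datagram = 0; datagram < 3; ++datagram) {
    // Default behavior: the host chosen for the first datagram is reused.
    if (sticky_sessions.find(session_key) == sticky_sessions.end()) {
      sticky_sessions[session_key] = choose_host();
    }
    std::printf("sticky:     datagram %d -> %s\n", datagram,
                sticky_sessions[session_key].c_str());
  }

  rr = 0;
  for (int datagram = 0; datagram < 3; ++datagram) {
    // use_per_packet_load_balancing: the balancer runs for every datagram.
    std::printf("per-packet: datagram %d -> %s\n", datagram,
                choose_host().c_str());
  }
  return 0;
}
```

In the sticky case all three datagrams reach host-a; in the per-packet case they reach host-a, host-b, and host-c in turn.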

When an upstream host becomes unhealthy (due to :ref:`active health checking
<arch_overview_health_checking>`), Envoy will attempt to create a new session to a healthy host
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
@@ -84,6 +84,7 @@ New Features
* tls_inspector filter: added :ref:`enable_ja3_fingerprinting <envoy_v3_api_field_extensions.filters.listener.tls_inspector.v3.TlsInspector.enable_ja3_fingerprinting>` to create JA3 fingerprint hash from Client Hello message.
* transport_socket: added :ref:`envoy.transport_sockets.tcp_stats <envoy_v3_api_msg_extensions.transport_sockets.tcp_stats.v3.Config>` which generates additional statistics gathered from the OS TCP stack.
* udp: add support for multiple listener filters.
* udp_proxy: added :ref:`use_per_packet_load_balancing <envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.use_per_packet_load_balancing>` option to enable per packet load balancing (selecting an upstream host for each data chunk).
* upstream: added the ability to :ref:`configure max connection duration <envoy_v3_api_field_config.core.v3.HttpProtocolOptions.max_connection_duration>` for upstream clusters.
* vcl_socket_interface: added VCL socket interface extension for fd.io VPP integration to :ref:`contrib images <install_contrib>`. This can be enabled via :ref:`VCL <envoy_v3_api_msg_extensions.vcl.v3alpha.VclSocketInterface>` configuration.
* xds: re-introduced unified delta and sotw xDS multiplexers that share most of the implementation. Added a new runtime config ``envoy.reloadable_features.unified_mux`` (disabled by default) that when enabled, switches xDS to use unified multiplexers.
152 changes: 109 additions & 43 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
@@ -27,8 +27,13 @@ void UdpProxyFilter::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster)
}

ENVOY_LOG(debug, "udp proxy: attaching to cluster {}", cluster.info()->name());
ASSERT(cluster_info_ == absl::nullopt || &cluster_info_.value().cluster_ != &cluster);
cluster_info_.emplace(*this, cluster);
ASSERT(cluster_info_ == absl::nullopt || &cluster_info_.value()->cluster_ != &cluster);

if (config_->usingPerPacketLoadBalancing()) {
cluster_info_.emplace(std::make_unique<PerPacketLoadBalancingClusterInfo>(*this, cluster));
} else {
cluster_info_.emplace(std::make_unique<StickySessionClusterInfo>(*this, cluster));
}
}
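Note the design choice here: the load-balancing mode is decided once, when the filter attaches to the cluster, by instantiating the matching ClusterInfo subclass; the per-datagram path then dispatches through the virtual onData() without re-checking the flag.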

void UdpProxyFilter::onClusterRemoval(const std::string& cluster) {
@@ -46,7 +51,7 @@ Network::FilterStatus UdpProxyFilter::onData(Network::UdpRecvData& data) {
return Network::FilterStatus::StopIteration;
}

return cluster_info_.value().onData(data);
return cluster_info_.value()->onData(data);
}

Network::FilterStatus UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode) {
@@ -56,9 +61,10 @@ Network::FilterStatus UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode)
}

UdpProxyFilter::ClusterInfo::ClusterInfo(UdpProxyFilter& filter,
Upstream::ThreadLocalCluster& cluster)
Upstream::ThreadLocalCluster& cluster,
SessionStorageType&& sessions)
: filter_(filter), cluster_(cluster),
cluster_stats_(generateStats(cluster.info()->statsScope())),
cluster_stats_(generateStats(cluster.info()->statsScope())), sessions_(std::move(sessions)),
member_update_cb_handle_(cluster.prioritySet().addMemberUpdateCb(
[this](const Upstream::HostVector&, const Upstream::HostVector& hosts_removed) {
for (const auto& host : hosts_removed) {
@@ -85,36 +91,84 @@ UdpProxyFilter::ClusterInfo::~ClusterInfo() {
ASSERT(host_to_sessions_.empty());
}

Network::FilterStatus UdpProxyFilter::ClusterInfo::onData(Network::UdpRecvData& data) {
void UdpProxyFilter::ClusterInfo::removeSession(const ActiveSession* session) {
// First remove from the host to sessions map.
ASSERT(host_to_sessions_[&session->host()].count(session) == 1);
auto host_sessions_it = host_to_sessions_.find(&session->host());
host_sessions_it->second.erase(session);
if (host_sessions_it->second.empty()) {
host_to_sessions_.erase(host_sessions_it);
}

// Now remove it from the primary map.
ASSERT(sessions_.count(session) == 1);
sessions_.erase(session);
}

UdpProxyFilter::ActiveSession*
UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& optional_host) {
if (!cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.canCreate()) {
ENVOY_LOG(debug, "cannot create new connection.");
cluster_.info()->stats().upstream_cx_overflow_.inc();
return nullptr;
}

if (optional_host) {
return createSessionWithHost(std::move(addresses), optional_host);
}

auto host = chooseHost(addresses.peer_);
if (host == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host.");
cluster_.info()->stats().upstream_cx_none_healthy_.inc();
return nullptr;
}
return createSessionWithHost(std::move(addresses), host);
}

UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSessionWithHost(
Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host) {
ASSERT(host);
auto new_session = std::make_unique<ActiveSession>(*this, std::move(addresses), host);
auto new_session_ptr = new_session.get();
sessions_.emplace(std::move(new_session));
host_to_sessions_[host.get()].emplace(new_session_ptr);
return new_session_ptr;
}

Upstream::HostConstSharedPtr UdpProxyFilter::ClusterInfo::chooseHost(
const Network::Address::InstanceConstSharedPtr& peer_address) const {
UdpLoadBalancerContext context(filter_.config_->hashPolicy(), peer_address);
Upstream::HostConstSharedPtr host = cluster_.loadBalancer().chooseHost(&context);
return host;
}

UdpProxyFilter::StickySessionClusterInfo::StickySessionClusterInfo(
UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster)
: ClusterInfo(filter, cluster,
SessionStorageType(1, HeterogeneousActiveSessionHash(false),
HeterogeneousActiveSessionEqual(false))) {}

Network::FilterStatus UdpProxyFilter::StickySessionClusterInfo::onData(Network::UdpRecvData& data) {
const auto active_session_it = sessions_.find(data.addresses_);
ActiveSession* active_session;
if (active_session_it == sessions_.end()) {
if (!cluster_.info()
->resourceManager(Upstream::ResourcePriority::Default)
.connections()
.canCreate()) {
cluster_.info()->stats().upstream_cx_overflow_.inc();
active_session = createSession(std::move(data.addresses_));
if (active_session == nullptr) {
return Network::FilterStatus::StopIteration;
}

UdpLoadBalancerContext context(filter_.config_->hashPolicy(), data.addresses_.peer_);
Upstream::HostConstSharedPtr host = cluster_.loadBalancer().chooseHost(&context);
if (host == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host. failed to create a session.");
cluster_.info()->stats().upstream_cx_none_healthy_.inc();
return Network::FilterStatus::StopIteration;
}

active_session = createSession(std::move(data.addresses_), host);
} else {
active_session = active_session_it->get();
if (active_session->host().health() == Upstream::Host::Health::Unhealthy) {
// If a host becomes unhealthy, we optimally would like to replace it with a new session
// to a healthy host. We may eventually want to make this behavior configurable, but for now
// this will be the universal behavior.

UdpLoadBalancerContext context(filter_.config_->hashPolicy(), data.addresses_.peer_);
Upstream::HostConstSharedPtr host = cluster_.loadBalancer().chooseHost(&context);
auto host = chooseHost(data.addresses_.peer_);
if (host != nullptr && host->health() != Upstream::Host::Health::Unhealthy &&
host.get() != &active_session->host()) {
ENVOY_LOG(debug, "upstream session unhealthy, recreating the session");
@@ -132,28 +186,40 @@ Network::FilterStatus UdpProxyFilter::ClusterInfo::onData(Network::UdpRecvData&
return Network::FilterStatus::StopIteration;
}

UdpProxyFilter::ActiveSession*
UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host) {
auto new_session = std::make_unique<ActiveSession>(*this, std::move(addresses), host);
auto new_session_ptr = new_session.get();
sessions_.emplace(std::move(new_session));
host_to_sessions_[host.get()].emplace(new_session_ptr);
return new_session_ptr;
}
UdpProxyFilter::PerPacketLoadBalancingClusterInfo::PerPacketLoadBalancingClusterInfo(
UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster)
: ClusterInfo(filter, cluster,
SessionStorageType(1, HeterogeneousActiveSessionHash(true),
HeterogeneousActiveSessionEqual(true))) {}
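For context, the boolean passed to HeterogeneousActiveSessionHash and HeterogeneousActiveSessionEqual controls whether the upstream host participates in the session key: false keys sessions by the 4-tuple alone (sticky sessions), true keys them by the 4-tuple plus the host (per-packet mode). The real functors are defined in udp_proxy_filter.h, which is not shown in this excerpt; the following self-contained sketch, with stand-in key types, illustrates only the pattern:

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <unordered_set>

// Stand-ins for Envoy's ActiveSession, addresses, and host pointer.
struct Session {
  uint64_t four_tuple; // stands in for source IP/port + local IP/port
  const void* host;    // stands in for the selected upstream host
};
using SessionPtr = std::unique_ptr<Session>;

// consider_host == false -> sticky mode: one session per 4-tuple.
// consider_host == true  -> per-packet mode: one session per (4-tuple, host).
struct SessionHash {
  explicit SessionHash(bool consider_host) : consider_host_(consider_host) {}
  size_t operator()(const SessionPtr& s) const {
    size_t h = std::hash<uint64_t>()(s->four_tuple);
    if (consider_host_) {
      // Naive hash combine; Envoy uses its own hashing utilities.
      h ^= std::hash<const void*>()(s->host) + 0x9e3779b9 + (h << 6) + (h >> 2);
    }
    return h;
  }
  const bool consider_host_;
};

struct SessionEqual {
  explicit SessionEqual(bool consider_host) : consider_host_(consider_host) {}
  bool operator()(const SessionPtr& a, const SessionPtr& b) const {
    return a->four_tuple == b->four_tuple &&
           (!consider_host_ || a->host == b->host);
  }
  const bool consider_host_;
};

using SessionStorageType = std::unordered_set<SessionPtr, SessionHash, SessionEqual>;

int main() {
  // Same constructor shape as in the diff: bucket count, hash, equal.
  SessionStorageType sticky(1, SessionHash(false), SessionEqual(false));
  SessionStorageType per_packet(1, SessionHash(true), SessionEqual(true));
  per_packet.insert(std::make_unique<Session>(Session{42, nullptr}));
  return 0;
}
```

The real functors additionally accept LocalPeerAddresses and LocalPeerHostAddresses keys transparently, which is what allows the sessions_.find(data.addresses_) and sessions_.find(key) lookups in the onData() implementations.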

Network::FilterStatus
UdpProxyFilter::PerPacketLoadBalancingClusterInfo::onData(Network::UdpRecvData& data) {
auto host = chooseHost(data.addresses_.peer_);
if (host == nullptr) {
ENVOY_LOG(debug, "cannot find any valid host.");
cluster_.info()->stats().upstream_cx_none_healthy_.inc();
return Network::FilterStatus::StopIteration;
}

void UdpProxyFilter::ClusterInfo::removeSession(const ActiveSession* session) {
// First remove from the host to sessions map.
ASSERT(host_to_sessions_[&session->host()].count(session) == 1);
auto host_sessions_it = host_to_sessions_.find(&session->host());
host_sessions_it->second.erase(session);
if (host_sessions_it->second.empty()) {
host_to_sessions_.erase(host_sessions_it);
ENVOY_LOG(debug, "selected {} host as upstream.", host->address()->asStringView());

LocalPeerHostAddresses key{data.addresses_, *host};
const auto active_session_it = sessions_.find(key);
ActiveSession* active_session;
if (active_session_it == sessions_.end()) {
active_session = createSession(std::move(data.addresses_), host);
if (active_session == nullptr) {
return Network::FilterStatus::StopIteration;
}
} else {
active_session = active_session_it->get();
ENVOY_LOG(trace, "found already existing session on host {}.",
active_session->host().address()->asStringView());
}

// Now remove it from the primary map.
ASSERT(sessions_.count(session) == 1);
sessions_.erase(session);
active_session->write(*data.buffer_);

return Network::FilterStatus::StopIteration;
}
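A consequence of the composite key in per-packet mode is that a single client 4-tuple may own several concurrent sessions, one per upstream host it has been balanced to; each such session counts against the cluster's connection resource limit and times out independently via the idle timeout.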

UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster,