Skip to content

Commit

Permalink
backport: DFP: move CM callbacks to thread local objects (envoyproxy#…
Browse files Browse the repository at this point in the history
…33303)

Signed-off-by: Christoph Pakulski <paker8848@gmail.com>
  • Loading branch information
cpakulski committed Apr 10, 2024
1 parent a96624b commit 2b33a2d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ LoadClusterEntryHandlePtr ProxyFilterConfig::addDynamicCluster(
cluster_name, callbacks);
}

Upstream::ClusterUpdateCallbacksHandlePtr
ProxyFilterConfig::addThreadLocalClusterUpdateCallbacks() {
return cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
}

ProxyFilterConfig::ThreadLocalClusterInfo::~ThreadLocalClusterInfo() {
for (const auto& it : pending_clusters_) {
for (auto cluster : it.second) {
Expand All @@ -111,24 +106,24 @@ ProxyFilterConfig::ThreadLocalClusterInfo::~ThreadLocalClusterInfo() {
}
}

void ProxyFilterConfig::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) {
void ProxyFilterConfig::ThreadLocalClusterInfo::onClusterAddOrUpdate(
Upstream::ThreadLocalCluster& cluster) {
const std::string& cluster_name = cluster.info()->name();
ENVOY_LOG(debug, "thread local cluster {} added or updated", cluster_name);
ThreadLocalClusterInfo& tls_cluster_info = *tls_slot_;
auto it = tls_cluster_info.pending_clusters_.find(cluster_name);
if (it != tls_cluster_info.pending_clusters_.end()) {
auto it = pending_clusters_.find(cluster_name);
if (it != pending_clusters_.end()) {
for (auto* cluster : it->second) {
auto& callbacks = cluster->callbacks_;
cluster->cancel();
callbacks.onLoadClusterComplete();
}
tls_cluster_info.pending_clusters_.erase(it);
pending_clusters_.erase(it);
} else {
ENVOY_LOG(debug, "but not pending request waiting on {}", cluster_name);
}
}

void ProxyFilterConfig::onClusterRemoval(const std::string&) {
void ProxyFilterConfig::ThreadLocalClusterInfo::onClusterRemoval(const std::string&) {
// do nothing, should have no pending clusters.
}

Expand Down
33 changes: 20 additions & 13 deletions source/extensions/filters/http/dynamic_forward_proxy/proxy_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class LoadClusterEntryCallbacks {
virtual void onLoadClusterComplete() PURE;
};

class ProxyFilterConfig : public Upstream::ClusterUpdateCallbacks,
Logger::Loggable<Logger::Id::forward_proxy> {
class ProxyFilterConfig : Logger::Loggable<Logger::Id::forward_proxy> {
public:
ProxyFilterConfig(
const envoy::extensions::filters::http::dynamic_forward_proxy::v3::FilterConfig& proto_config,
Expand All @@ -55,12 +54,6 @@ class ProxyFilterConfig : public Upstream::ClusterUpdateCallbacks,
addDynamicCluster(Extensions::Common::DynamicForwardProxy::DfpClusterSharedPtr cluster,
const std::string& cluster_name, const std::string& host, const int port,
LoadClusterEntryCallbacks& callback);
// run in each worker thread.
Upstream::ClusterUpdateCallbacksHandlePtr addThreadLocalClusterUpdateCallbacks();

// Upstream::ClusterUpdateCallbacks
void onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) override;
void onClusterRemoval(const std::string&) override;

private:
struct LoadClusterEntryHandleImpl
Expand All @@ -75,14 +68,28 @@ class ProxyFilterConfig : public Upstream::ClusterUpdateCallbacks,
LoadClusterEntryCallbacks& callbacks_;
};

// Per-thread cluster info including pending callbacks.
struct ThreadLocalClusterInfo : public ThreadLocal::ThreadLocalObject {
ThreadLocalClusterInfo(ProxyFilterConfig& parent) : parent_{parent} {
handle_ = parent.addThreadLocalClusterUpdateCallbacks();
// Per-thread cluster info including pending clusters.
// The lifetime of ThreadLocalClusterInfo, which is allocated on each working thread
// may exceed lifetime of the parent object (ProxyFilterConfig), which is allocated
// and deleted on the main thread.
// Currently ThreadLocalClusterInfo does not hold any references to the parent object
// and therefore does not need to check if the parent object is still valid.
// IMPORTANT: If a reference to the parent object is added here, the validity of
// that object must be checked before using it. It is best achieved via
// combination of shared and weak pointers.
struct ThreadLocalClusterInfo : public ThreadLocal::ThreadLocalObject,
public Envoy::Upstream::ClusterUpdateCallbacks,
Logger::Loggable<Logger::Id::forward_proxy> {
ThreadLocalClusterInfo(ProxyFilterConfig& parent) {
// run in each worker thread.
handle_ = parent.cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
}
~ThreadLocalClusterInfo() override;

void onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) override;
void onClusterRemoval(const std::string& name) override;

absl::flat_hash_map<std::string, std::list<LoadClusterEntryHandleImpl*>> pending_clusters_;
ProxyFilterConfig& parent_;
Upstream::ClusterUpdateCallbacksHandlePtr handle_;
};

Expand Down

0 comments on commit 2b33a2d

Please sign in to comment.