Skip to content

Commit

Permalink
DFP: move CM callbacks to thread local objects (#33303)
Browse files Browse the repository at this point in the history
Signed-off-by: Christoph Pakulski <paker8848@gmail.com>
  • Loading branch information
cpakulski authored Apr 10, 2024
1 parent 5f68428 commit 59da9ee
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,37 +100,30 @@ LoadClusterEntryHandlePtr ProxyFilterConfig::addDynamicCluster(
cluster_name, callbacks);
}

Upstream::ClusterUpdateCallbacksHandlePtr
ProxyFilterConfig::addThreadLocalClusterUpdateCallbacks() {
return cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
}

ProxyFilterConfig::ThreadLocalClusterInfo::~ThreadLocalClusterInfo() {
for (const auto& it : pending_clusters_) {
for (auto cluster : it.second) {
cluster->cancel();
}
}
}

void ProxyFilterConfig::onClusterAddOrUpdate(absl::string_view cluster_name,
Upstream::ThreadLocalClusterCommand&) {
void ProxyFilterConfig::ThreadLocalClusterInfo::onClusterAddOrUpdate(
absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand&) {
ENVOY_LOG(debug, "thread local cluster {} added or updated", cluster_name);
ThreadLocalClusterInfo& tls_cluster_info = *tls_slot_;
auto it = tls_cluster_info.pending_clusters_.find(cluster_name);
if (it != tls_cluster_info.pending_clusters_.end()) {
auto it = pending_clusters_.find(cluster_name);
if (it != pending_clusters_.end()) {
for (auto* cluster : it->second) {
auto& callbacks = cluster->callbacks_;
cluster->cancel();
callbacks.onLoadClusterComplete();
}
tls_cluster_info.pending_clusters_.erase(it);
pending_clusters_.erase(it);
} else {
ENVOY_LOG(debug, "but not pending request waiting on {}", cluster_name);
}
}

void ProxyFilterConfig::onClusterRemoval(const std::string&) {
void ProxyFilterConfig::ThreadLocalClusterInfo::onClusterRemoval(const std::string&) {
// do nothing, should have no pending clusters.
}

Expand Down
35 changes: 21 additions & 14 deletions source/extensions/filters/http/dynamic_forward_proxy/proxy_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class LoadClusterEntryCallbacks {
virtual void onLoadClusterComplete() PURE;
};

class ProxyFilterConfig : public Upstream::ClusterUpdateCallbacks,
Logger::Loggable<Logger::Id::forward_proxy> {
class ProxyFilterConfig : Logger::Loggable<Logger::Id::forward_proxy> {
public:
ProxyFilterConfig(
const envoy::extensions::filters::http::dynamic_forward_proxy::v3::FilterConfig& proto_config,
Expand All @@ -56,13 +55,6 @@ class ProxyFilterConfig : public Upstream::ClusterUpdateCallbacks,
addDynamicCluster(Extensions::Common::DynamicForwardProxy::DfpClusterSharedPtr cluster,
const std::string& cluster_name, const std::string& host, const int port,
LoadClusterEntryCallbacks& callback);
// run in each worker thread.
Upstream::ClusterUpdateCallbacksHandlePtr addThreadLocalClusterUpdateCallbacks();

// Upstream::ClusterUpdateCallbacks
void onClusterAddOrUpdate(absl::string_view cluster_name,
Upstream::ThreadLocalClusterCommand&) override;
void onClusterRemoval(const std::string&) override;

private:
struct LoadClusterEntryHandleImpl
Expand All @@ -77,14 +69,29 @@ class ProxyFilterConfig : public Upstream::ClusterUpdateCallbacks,
LoadClusterEntryCallbacks& callbacks_;
};

// Per-thread cluster info including pending callbacks.
struct ThreadLocalClusterInfo : public ThreadLocal::ThreadLocalObject {
ThreadLocalClusterInfo(ProxyFilterConfig& parent) : parent_{parent} {
handle_ = parent.addThreadLocalClusterUpdateCallbacks();
// Per-thread cluster info including pending clusters.
// The lifetime of ThreadLocalClusterInfo, which is allocated on each working thread
// may exceed lifetime of the parent object (ProxyFilterConfig), which is allocated
// and deleted on the main thread.
// Currently ThreadLocalClusterInfo does not hold any references to the parent object
// and therefore does not need to check if the parent object is still valid.
// IMPORTANT: If a reference to the parent object is added here, the validity of
// that object must be checked before using it. It is best achieved via
// combination of shared and weak pointers.
struct ThreadLocalClusterInfo : public ThreadLocal::ThreadLocalObject,
public Envoy::Upstream::ClusterUpdateCallbacks,
Logger::Loggable<Logger::Id::forward_proxy> {
ThreadLocalClusterInfo(ProxyFilterConfig& parent) {
// run in each worker thread.
handle_ = parent.cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
}
~ThreadLocalClusterInfo() override;

void onClusterAddOrUpdate(absl::string_view cluster_name,
Upstream::ThreadLocalClusterCommand& command) override;
void onClusterRemoval(const std::string& name) override;

absl::flat_hash_map<std::string, std::list<LoadClusterEntryHandleImpl*>> pending_clusters_;
ProxyFilterConfig& parent_;
Upstream::ClusterUpdateCallbacksHandlePtr handle_;
};

Expand Down

0 comments on commit 59da9ee

Please sign in to comment.