Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport to rel1.28: DFP - move CM callbacks to thread local objects (#33303) #33445

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,37 +98,30 @@ LoadClusterEntryHandlePtr ProxyFilterConfig::addDynamicCluster(
cluster_name, callbacks);
}

Upstream::ClusterUpdateCallbacksHandlePtr
ProxyFilterConfig::addThreadLocalClusterUpdateCallbacks() {
return cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
}

ProxyFilterConfig::ThreadLocalClusterInfo::~ThreadLocalClusterInfo() {
for (const auto& it : pending_clusters_) {
for (auto cluster : it.second) {
cluster->cancel();
}
}
}

void ProxyFilterConfig::onClusterAddOrUpdate(absl::string_view cluster_name,
Upstream::ThreadLocalClusterCommand&) {
void ProxyFilterConfig::ThreadLocalClusterInfo::onClusterAddOrUpdate(
absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand&) {
ENVOY_LOG(debug, "thread local cluster {} added or updated", cluster_name);
ThreadLocalClusterInfo& tls_cluster_info = *tls_slot_;
auto it = tls_cluster_info.pending_clusters_.find(cluster_name);
if (it != tls_cluster_info.pending_clusters_.end()) {
auto it = pending_clusters_.find(cluster_name);
if (it != pending_clusters_.end()) {
for (auto* cluster : it->second) {
auto& callbacks = cluster->callbacks_;
cluster->cancel();
callbacks.onLoadClusterComplete();
}
tls_cluster_info.pending_clusters_.erase(it);
pending_clusters_.erase(it);
} else {
ENVOY_LOG(debug, "but not pending request waiting on {}", cluster_name);
}
}

void ProxyFilterConfig::onClusterRemoval(const std::string&) {
void ProxyFilterConfig::ThreadLocalClusterInfo::onClusterRemoval(const std::string&) {
// do nothing, should have no pending clusters.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class LoadClusterEntryCallbacks {
virtual void onLoadClusterComplete() PURE;
};

class ProxyFilterConfig : public Upstream::ClusterUpdateCallbacks,
Logger::Loggable<Logger::Id::forward_proxy> {
class ProxyFilterConfig : Logger::Loggable<Logger::Id::forward_proxy> {
public:
ProxyFilterConfig(
const envoy::extensions::filters::http::dynamic_forward_proxy::v3::FilterConfig& proto_config,
Expand All @@ -55,13 +54,6 @@ class ProxyFilterConfig : public Upstream::ClusterUpdateCallbacks,
addDynamicCluster(Extensions::Common::DynamicForwardProxy::DfpClusterSharedPtr cluster,
const std::string& cluster_name, const std::string& host, const int port,
LoadClusterEntryCallbacks& callback);
// run in each worker thread.
Upstream::ClusterUpdateCallbacksHandlePtr addThreadLocalClusterUpdateCallbacks();

// Upstream::ClusterUpdateCallbacks
void onClusterAddOrUpdate(absl::string_view cluster_name,
Upstream::ThreadLocalClusterCommand&) override;
void onClusterRemoval(const std::string&) override;

private:
struct LoadClusterEntryHandleImpl
Expand All @@ -76,14 +68,29 @@ class ProxyFilterConfig : public Upstream::ClusterUpdateCallbacks,
LoadClusterEntryCallbacks& callbacks_;
};

// Per-thread cluster info including pending callbacks.
struct ThreadLocalClusterInfo : public ThreadLocal::ThreadLocalObject {
ThreadLocalClusterInfo(ProxyFilterConfig& parent) : parent_{parent} {
handle_ = parent.addThreadLocalClusterUpdateCallbacks();
// Per-thread cluster info including pending clusters.
// The lifetime of ThreadLocalClusterInfo, which is allocated on each working thread
// may exceed lifetime of the parent object (ProxyFilterConfig), which is allocated
// and deleted on the main thread.
// Currently ThreadLocalClusterInfo does not hold any references to the parent object
// and therefore does not need to check if the parent object is still valid.
// IMPORTANT: If a reference to the parent object is added here, the validity of
// that object must be checked before using it. It is best achieved via
// combination of shared and weak pointers.
struct ThreadLocalClusterInfo : public ThreadLocal::ThreadLocalObject,
public Envoy::Upstream::ClusterUpdateCallbacks,
Logger::Loggable<Logger::Id::forward_proxy> {
ThreadLocalClusterInfo(ProxyFilterConfig& parent) {
// run in each worker thread.
handle_ = parent.cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
}
~ThreadLocalClusterInfo() override;

void onClusterAddOrUpdate(absl::string_view cluster_name,
Upstream::ThreadLocalClusterCommand& command) override;
void onClusterRemoval(const std::string& name) override;

absl::flat_hash_map<std::string, std::list<LoadClusterEntryHandleImpl*>> pending_clusters_;
ProxyFilterConfig& parent_;
Upstream::ClusterUpdateCallbacksHandlePtr handle_;
};

Expand Down
Loading