Skip to content

Commit

Permalink
CM: Add on-demand cluster discovery functionality (#18723)
Browse files Browse the repository at this point in the history
OdCdsApi interface is introduced as an API used for the on-demand discovery. The implementation of it is provided using the CdsApiHelper class, so OdCdsApiImpl handles the discovered cluster in the same way as CdsApiImpl does. On the ClusterManagerImpl side, the discovery manager (ClusterDiscoveryManager) is added to help in deduplicating the requests for the same cluster from within the same worker thread. Further deduplication of the requests coming from different worker threads is done in ClusterManagerImpl in the main thread. Each unique request for a cluster also receives a timeout to catch a case when a discovery fails, thus allowing to let the worker threads to handle the failure.

This is a continuation of #15857 - I could not reopen it, so I'm opening a new PR. I used the opportunity to rebase my changes on top of main.

Risk Level:
Low. A new feature not wired up anywhere yet.

Signed-off-by: Krzesimir Nowak <knowak@microsoft.com>
  • Loading branch information
krnowak authored Feb 4, 2022
1 parent 0d110de commit a34bd8b
Show file tree
Hide file tree
Showing 27 changed files with 2,026 additions and 7 deletions.
85 changes: 84 additions & 1 deletion envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace Envoy {
namespace Upstream {

/**
* ClusterUpdateCallbacks provide a way to exposes Cluster lifecycle events in the
* ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the
* ClusterManager.
*/
class ClusterUpdateCallbacks {
Expand Down Expand Up @@ -72,6 +72,76 @@ class ClusterUpdateCallbacksHandle {

using ClusterUpdateCallbacksHandlePtr = std::unique_ptr<ClusterUpdateCallbacksHandle>;

/**
* Status enum for the result of an attempted cluster discovery.
*/
enum class ClusterDiscoveryStatus {
/**
* The discovery process timed out. This means that we haven't yet received any reply from
* on-demand CDS about it.
*/
Timeout,
/**
* The discovery process has concluded and on-demand CDS has no such cluster.
*/
Missing,
/**
* Cluster found and currently available through ClusterManager.
*/
Available,
};

/**
* ClusterDiscoveryCallback is a callback called at the end of the on-demand cluster discovery
* process. The status of the discovery is sent as a parameter.
*/
using ClusterDiscoveryCallback = std::function<void(ClusterDiscoveryStatus)>;
using ClusterDiscoveryCallbackPtr = std::unique_ptr<ClusterDiscoveryCallback>;

/**
* ClusterDiscoveryCallbackHandle is a RAII wrapper for a ClusterDiscoveryCallback. Deleting the
* ClusterDiscoveryCallbackHandle will remove the callbacks from ClusterManager.
*/
class ClusterDiscoveryCallbackHandle {
public:
virtual ~ClusterDiscoveryCallbackHandle() = default;
};

using ClusterDiscoveryCallbackHandlePtr = std::unique_ptr<ClusterDiscoveryCallbackHandle>;

/**
* A handle to an on-demand CDS.
*/
class OdCdsApiHandle {
public:
virtual ~OdCdsApiHandle() = default;

/**
* Request an on-demand discovery of a cluster with a passed name. This ODCDS may be used to
* perform the discovery process in the main thread if there is no discovery going on for this
* cluster. When the requested cluster is added and warmed up, the passed callback will be invoked
* in the same thread that invoked this function.
*
* The returned handle can be destroyed to prevent the callback from being invoked. Note that the
* handle can only be destroyed in the same thread that invoked the function. Destroying the
* handle might not stop the discovery process, though. As soon as the callback is invoked,
* destroying the handle does nothing. It is a responsibility of the caller to make sure that the
* objects captured in the callback outlive the callback.
*
* This function is thread-safe.
*
* @param name is the name of the cluster to be discovered.
* @param callback will be called when the discovery is finished.
* @param timeout describes how long the operation may take before failing.
* @return the discovery process handle.
*/
virtual ClusterDiscoveryCallbackHandlePtr
requestOnDemandClusterDiscovery(absl::string_view name, ClusterDiscoveryCallbackPtr callback,
std::chrono::milliseconds timeout) PURE;
};

using OdCdsApiHandlePtr = std::unique_ptr<OdCdsApiHandle>;

class ClusterManagerFactory;

// These are per-cluster per-thread, so not "global" stats.
Expand Down Expand Up @@ -326,6 +396,19 @@ class ClusterManager {
* @param cluster, the cluster to check.
*/
virtual void checkActiveStaticCluster(const std::string& cluster) PURE;

/**
* Allocates an on-demand CDS API provider from configuration proto or locator.
*
* @param odcds_config is a configuration proto. Used when odcds_resources_locator is a nullopt.
* @param odcds_resources_locator is a locator for ODCDS. Used over odcds_config if not a nullopt.
* @param validation_visitor
* @return OdCdsApiHandlePtr the ODCDS handle.
*/
virtual OdCdsApiHandlePtr
allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config,
OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
ProtobufMessage::ValidationVisitor& validation_visitor) PURE;
};

using ClusterManagerPtr = std::unique_ptr<ClusterManager>;
Expand Down
32 changes: 32 additions & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,46 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "od_cds_api_lib",
srcs = ["od_cds_api_impl.cc"],
hdrs = ["od_cds_api_impl.h"],
deps = [
":cds_api_helper_lib",
"//envoy/config:subscription_interface",
"//envoy/protobuf:message_validator_interface",
"//envoy/stats:stats_interface",
"//envoy/upstream:cluster_manager_interface",
"//source/common/common:minimal_logger_lib",
"//source/common/config:subscription_base_interface",
"//source/common/grpc:common_lib",
"//source/common/protobuf",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "cluster_discovery_manager_lib",
srcs = ["cluster_discovery_manager.cc"],
hdrs = ["cluster_discovery_manager.h"],
deps = [
"//envoy/upstream:cluster_manager_interface",
"//source/common/common:enum_to_int",
"//source/common/common:minimal_logger_lib",
],
)

envoy_cc_library(
name = "cluster_manager_lib",
srcs = ["cluster_manager_impl.cc"],
hdrs = ["cluster_manager_impl.h"],
deps = [
":cds_api_lib",
":cluster_discovery_manager_lib",
":load_balancer_lib",
":load_stats_reporter_lib",
":od_cds_api_lib",
":ring_hash_lb_lib",
":subset_lb_lib",
"//envoy/api:api_interface",
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cds_api_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CdsApiHelper : Logger::Loggable<Logger::Id::upstream> {

private:
ClusterManager& cm_;
std::string name_;
const std::string name_;
std::string system_version_info_;
};

Expand Down
171 changes: 171 additions & 0 deletions source/common/upstream/cluster_discovery_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
#include "source/common/upstream/cluster_discovery_manager.h"

#include <functional>

#include "source/common/common/enum_to_int.h"

namespace Envoy {
namespace Upstream {

namespace {

using ClusterAddedCb = std::function<void(ThreadLocalCluster&)>;

class ClusterCallbacks : public ClusterUpdateCallbacks {
public:
ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {}

void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override { cb_(cluster); };

void onClusterRemoval(const std::string&) override {}

private:
ClusterAddedCb cb_;
};

} // namespace

ClusterDiscoveryManager::ClusterDiscoveryManager(
std::string thread_name, ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler)
: thread_name_(std::move(thread_name)) {
callbacks_ = std::make_unique<ClusterCallbacks>([this](ThreadLocalCluster& cluster) {
ENVOY_LOG(trace,
"cm cdm: starting processing cluster name {} (status {}) from cluster lifecycle "
"callback in {}",
cluster.info()->name(), enumToInt(ClusterDiscoveryStatus::Available), thread_name_);
processClusterName(cluster.info()->name(), ClusterDiscoveryStatus::Available);
});
callbacks_handle_ = lifecycle_callbacks_handler.addClusterUpdateCallbacks(*callbacks_);
}

void ClusterDiscoveryManager::processClusterName(absl::string_view name,
ClusterDiscoveryStatus cluster_status) {
auto callback_items = extractCallbackList(name);
if (callback_items.empty()) {
ENVOY_LOG(trace, "cm cdm: no callbacks for the cluster name {} in {}", name, thread_name_);
return;
}
ENVOY_LOG(trace, "cm cdm: invoking {} callbacks for the cluster name {} in {}",
callback_items.size(), name, thread_name_);
for (auto& item : callback_items) {
auto callback = std::move(item->callback_);
// This invalidates the handle and the invoker.
item.reset();
// The callback could be null when handle was destroyed during the
// previous callback.
if (callback != nullptr) {
(*callback)(cluster_status);
}
}
}

ClusterDiscoveryManager::AddedCallbackData
ClusterDiscoveryManager::addCallback(std::string name, ClusterDiscoveryCallbackPtr callback) {
ENVOY_LOG(trace, "cm cdm: adding callback for the cluster name {} in {}", name, thread_name_);
auto& callbacks_list = pending_clusters_[name];
auto item_weak_ptr = addCallbackInternal(callbacks_list, std::move(callback));
auto handle = std::make_unique<ClusterDiscoveryCallbackHandleImpl>(*this, name, item_weak_ptr);
CallbackInvoker invoker(*this, std::move(name), std::move(item_weak_ptr));
auto discovery_in_progress = (callbacks_list.size() > 1);
return {std::move(handle), discovery_in_progress, std::move(invoker)};
}

void ClusterDiscoveryManager::swap(ClusterDiscoveryManager& other) {
thread_name_.swap(other.thread_name_);
pending_clusters_.swap(other.pending_clusters_);
callbacks_.swap(other.callbacks_);
callbacks_handle_.swap(other.callbacks_handle_);
}

void ClusterDiscoveryManager::invokeCallbackFromItem(absl::string_view name,
CallbackListItemWeakPtr item_weak_ptr,
ClusterDiscoveryStatus cluster_status) {
auto item_ptr = item_weak_ptr.lock();
if (item_ptr == nullptr) {
ENVOY_LOG(trace, "cm cdm: not invoking an already stale callback for cluster {} in {}", name,
thread_name_);
return;
}
ENVOY_LOG(trace, "cm cdm: invoking a callback for cluster {} in {}", name, thread_name_);
auto callback = std::move(item_ptr->callback_);
if (item_ptr->self_iterator_.has_value()) {
eraseItem(name, std::move(item_ptr));
} else {
ENVOY_LOG(trace,
"cm cdm: the callback for cluster {} in {} is prepared for invoking during "
"processing, yet some other callback tries to invoke this callback earlier",
name, thread_name_);
}
if (callback != nullptr) {
(*callback)(cluster_status);
} else {
ENVOY_LOG(trace, "cm cdm: the callback for cluster {} in {} is prepared for invoking during "
"processing, yet some other callback destroyed its handle in the meantime");
}
}

ClusterDiscoveryManager::CallbackList
ClusterDiscoveryManager::extractCallbackList(absl::string_view name) {
auto map_node_handle = pending_clusters_.extract(name);
if (map_node_handle.empty()) {
return {};
}
CallbackList extracted;
map_node_handle.mapped().swap(extracted);
for (auto& item : extracted) {
item->self_iterator_.reset();
}
return extracted;
}

ClusterDiscoveryManager::CallbackListItemWeakPtr
ClusterDiscoveryManager::addCallbackInternal(CallbackList& list,
ClusterDiscoveryCallbackPtr callback) {
auto item = std::make_shared<CallbackListItem>(std::move(callback));
auto it = list.emplace(list.end(), item);
item->self_iterator_ = std::move(it);
return item;
}

void ClusterDiscoveryManager::erase(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr) {
auto item_ptr = item_weak_ptr.lock();
if (item_ptr == nullptr) {
ENVOY_LOG(trace, "cm cdm: not dropping a stale callback for the cluster name {} in {}", name,
thread_name_);
return;
}
ENVOY_LOG(trace, "cm cdm: dropping callback for the cluster name {} in {}", name, thread_name_);
if (!item_ptr->self_iterator_.has_value()) {
ENVOY_LOG(trace,
"cm cdm: callback for the cluster name {} in {} is not on the callbacks list "
"anymore, which means it is about to be invoked; preventing it",
name, thread_name_);
item_ptr->callback_.reset();
return;
}
eraseItem(name, std::move(item_ptr));
}

void ClusterDiscoveryManager::eraseItem(absl::string_view name,
CallbackListItemSharedPtr item_ptr) {
ASSERT(item_ptr != nullptr);
ASSERT(item_ptr->self_iterator_.has_value());
const bool drop_list = eraseFromList(name, item_ptr->self_iterator_.value());
item_ptr->self_iterator_.reset();
if (drop_list) {
ENVOY_LOG(trace, "cm cdm: dropped last callback for the cluster name {} in {}", name,
thread_name_);
pending_clusters_.erase(name);
}
}

bool ClusterDiscoveryManager::eraseFromList(absl::string_view name, CallbackListIterator it) {
auto map_it = pending_clusters_.find(name);
ASSERT(map_it != pending_clusters_.end());
auto& list = map_it->second;
list.erase(it);
return list.empty();
}

} // namespace Upstream
} // namespace Envoy
Loading

0 comments on commit a34bd8b

Please sign in to comment.