Skip to content

Commit

Permalink
[router] Add SRDS configUpdate impl (#7451)
Browse files Browse the repository at this point in the history
This PR contains changes on the xRDS side for SRDS impl, cribbed from http://go/gh/stevenzzzz/pull/8/files#diff-2071ab0887162eac1fd177e89d83175a

* Add onConfigUpdate impl for SRDS subscription
* Remove scoped_config_manager as it's not used now.
* Move ScopedConfigInfo to scoped_config_impl.h/cc
* Add a hash to scopeKey and scopeKeyFragment, so we can look up scopekey by hash value in constant time when SRDS has many scopes.
* Add a initManager parameter to RDS createRdsRouteConfigProvider API interface, when creating RouteConfigProvider after listener/server warmed up, we need to specify a different initManager than the one from factoryContext to avoid an assertion failure. see related:#7617

This PR only latches a SRDS provider into the connection manager, the "conn manager using SRDS to make route decision" plus integration tests will be covered in a following PR.

Risk Level: LOW [not fully implemented].
Testing: unit tests

Signed-off-by: Xin Zhuang <stevenzzz@google.com>
  • Loading branch information
stevenzzzz authored and htuch committed Aug 22, 2019
1 parent b44a00b commit 69f805c
Show file tree
Hide file tree
Showing 20 changed files with 1,137 additions and 304 deletions.
5 changes: 4 additions & 1 deletion include/envoy/router/route_config_provider_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ class RouteConfigProviderManager {
* @param rds supplies the proto configuration of an RDS-configured RouteConfigProvider.
* @param factory_context is the context to use for the route config provider.
* @param stat_prefix supplies the stat_prefix to use for the provider stats.
* @param init_manager the Init::Manager used to coordinate initialization of a the underlying RDS
* subscription.
*/
virtual RouteConfigProviderPtr createRdsRouteConfigProvider(
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds,
Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix) PURE;
Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix,
Init::Manager& init_manager) PURE;

/**
* Get a RouteConfigSharedPtr for a statically defined route. Ownership is as described for
Expand Down
44 changes: 17 additions & 27 deletions source/common/config/config_provider_impl.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once

#include <memory>

#include "envoy/config/config_provider.h"
#include "envoy/config/config_provider_manager.h"
#include "envoy/init/manager.h"
Expand Down Expand Up @@ -146,9 +144,7 @@ class MutableConfigProviderCommonBase;
* shared ownership of the underlying subscription.
*
*/
class ConfigSubscriptionCommonBase
: protected Logger::Loggable<Logger::Id::config>,
public std::enable_shared_from_this<ConfigSubscriptionCommonBase> {
class ConfigSubscriptionCommonBase : protected Logger::Loggable<Logger::Id::config> {
public:
// Callback for updating a Config implementation held in each worker thread, the callback is
// called in applyConfigUpdate() with the current version Config, and is expected to return the
Expand Down Expand Up @@ -224,21 +220,14 @@ class ConfigSubscriptionCommonBase
*/
void applyConfigUpdate(
const ConfigUpdateCb& update_fn, const Event::PostCb& complete_cb = []() {}) {
// It is safe to call shared_from_this here as this is in main thread, and destruction of a
// ConfigSubscriptionCommonBase owner (i.e., a provider) happens in main thread as well.
auto shared_this = shared_from_this();
tls_->runOnAllThreads(
[this, update_fn]() {
tls_->getTyped<ThreadLocalConfig>().config_ = update_fn(this->getConfig());
// NOTE: there is a known race condition between *this* subscription being teared down in
// main thread and the posted callback being executed before the destruction. See more
// details in https://github.com/envoyproxy/envoy/issues/7902
tls_->getTyped<ThreadLocalConfig>().config_ = update_fn(getConfig());
},
// During the update propagation, a subscription may get teared down in main thread due to
// all owners/providers destructed in a xDS update (e.g. LDS demolishes a
// RouteConfigProvider and its subscription).
// If such a race condition happens, holding a reference to the "*this" subscription
// instance in this cb will ensure the shared "*this" gets posted back to main thread, after
// all the workers finish calling the update_fn, at which point it's safe to destruct
// "*this" instance.
[shared_this, complete_cb]() { complete_cb(); });
complete_cb);
}

void setLastUpdated() { last_updated_ = time_source_.systemTime(); }
Expand Down Expand Up @@ -287,8 +276,8 @@ class ConfigSubscriptionInstance : public ConfigSubscriptionCommonBase {

/**
* Must be called by the derived class' constructor.
* @param initial_config supplies an initial Envoy::Config::ConfigProvider::Config associated with
* the underlying subscription, shared across all providers and workers.
* @param initial_config supplies an initial Envoy::Config::ConfigProvider::Config associated
* with the underlying subscription, shared across all providers and workers.
*/
void initialize(const ConfigProvider::ConfigConstSharedPtr& initial_config) {
tls_->set([initial_config](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
Expand All @@ -302,7 +291,8 @@ class ConfigSubscriptionInstance : public ConfigSubscriptionCommonBase {
* @param config_proto supplies the newly received config proto.
* @param config_name supplies the name associated with the config.
* @param version_info supplies the version associated with the config.
* @return bool false when the config proto has no delta from the previous config, true otherwise.
* @return bool false when the config proto has no delta from the previous config, true
* otherwise.
*/
bool checkAndApplyConfigUpdate(const Protobuf::Message& config_proto,
const std::string& config_name, const std::string& version_info);
Expand Down Expand Up @@ -369,8 +359,8 @@ class MutableConfigProviderCommonBase : public ConfigProvider {
};

/**
* Provides generic functionality required by all config provider managers, such as managing shared
* lifetime of subscriptions and dynamic config providers, along with determining which
* Provides generic functionality required by all config provider managers, such as managing
* shared lifetime of subscriptions and dynamic config providers, along with determining which
* subscriptions should be associated with newly instantiated providers.
*
* The implementation of this class is not thread safe. Note that ImmutableConfigProviderBase
Expand All @@ -379,9 +369,9 @@ class MutableConfigProviderCommonBase : public ConfigProvider {
*
* All config processing is done on the main thread, so instantiation of *ConfigProvider* objects
* via createStaticConfigProvider() and createXdsConfigProvider() is naturally thread safe. Care
* must be taken with regards to destruction of these objects, since it must also happen on the main
* thread _prior_ to destruction of the ConfigProviderManagerImplBase object from which they were
* created.
* must be taken with regards to destruction of these objects, since it must also happen on the
* main thread _prior_ to destruction of the ConfigProviderManagerImplBase object from which they
* were created.
*
* This class can not be instantiated directly; instead, it provides the foundation for
* dynamic config provider implementations which derive from it.
Expand Down Expand Up @@ -415,8 +405,8 @@ class ConfigProviderManagerImplBase : public ConfigProviderManager, public Singl
const ConfigProviderSet& immutableConfigProviders(ConfigProviderInstanceType type) const;

/**
* Returns the subscription associated with the config_source_proto; if none exists, a new one is
* allocated according to the subscription_factory_fn.
* Returns the subscription associated with the config_source_proto; if none exists, a new one
* is allocated according to the subscription_factory_fn.
* @param config_source_proto supplies the proto specifying the config subscription parameters.
* @param init_manager supplies the init manager.
* @param subscription_factory_fn supplies a function to be called when a new subscription needs
Expand Down
20 changes: 10 additions & 10 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ envoy_cc_library(
"//include/envoy/singleton:instance_interface",
"//include/envoy/thread_local:thread_local_interface",
"//source/common/common:assert_lib",
"//source/common/common:callback_impl_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/config:rds_json_lib",
"//source/common/config:subscription_factory_lib",
Expand All @@ -146,22 +147,16 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "scoped_config_manager_lib",
srcs = ["scoped_config_manager.cc"],
hdrs = ["scoped_config_manager.h"],
deps = [
"@envoy_api//envoy/api/v2:srds_cc",
],
)

envoy_cc_library(
name = "scoped_config_lib",
srcs = ["scoped_config_impl.cc"],
hdrs = ["scoped_config_impl.h"],
external_deps = [
"abseil_str_format",
],
deps = [
":config_lib",
":scoped_config_manager_lib",
"//include/envoy/router:rds_interface",
"//include/envoy/router:scopes_interface",
"//include/envoy/thread_local:thread_local_interface",
"@envoy_api//envoy/api/v2:srds_cc",
Expand All @@ -174,13 +169,18 @@ envoy_cc_library(
srcs = ["scoped_rds.cc"],
hdrs = ["scoped_rds.h"],
deps = [
":rds_lib",
":scoped_config_lib",
"//include/envoy/config:config_provider_interface",
"//include/envoy/config:subscription_interface",
"//include/envoy/router:route_config_provider_manager_interface",
"//include/envoy/stats:stats_interface",
"//source/common/common:assert_lib",
"//source/common/common:cleanup_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/config:config_provider_lib",
"//source/common/init:manager_lib",
"//source/common/init:watcher_lib",
"@envoy_api//envoy/admin/v2alpha:config_dump_cc",
"@envoy_api//envoy/api/v2:srds_cc",
],
Expand Down
13 changes: 6 additions & 7 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ RouteConfigProviderPtr RouteConfigProviderUtil::create(
return route_config_provider_manager.createStaticRouteConfigProvider(config.route_config(),
factory_context);
case envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager::kRds:
return route_config_provider_manager.createRdsRouteConfigProvider(config.rds(), factory_context,
stat_prefix);
return route_config_provider_manager.createRdsRouteConfigProvider(
config.rds(), factory_context, stat_prefix, factory_context.initManager());
default:
NOT_REACHED_GCOVR_EXCL_LINE;
}
Expand Down Expand Up @@ -124,6 +124,7 @@ void RdsRouteConfigSubscription::onConfigUpdate(
}
vhds_subscription_.release();
}
update_callback_manager_.runCallbacks();
}

init_target_.ready();
Expand Down Expand Up @@ -218,8 +219,8 @@ RouteConfigProviderManagerImpl::RouteConfigProviderManagerImpl(Server::Admin& ad

Router::RouteConfigProviderPtr RouteConfigProviderManagerImpl::createRdsRouteConfigProvider(
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds,
Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix) {

Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix,
Init::Manager& init_manager) {
// RdsRouteConfigSubscriptions are unique based on their serialized RDS config.
const uint64_t manager_identifier = MessageUtil::hash(rds);

Expand All @@ -232,9 +233,7 @@ Router::RouteConfigProviderPtr RouteConfigProviderManagerImpl::createRdsRouteCon
// of simplicity.
subscription.reset(new RdsRouteConfigSubscription(rds, manager_identifier, factory_context,
stat_prefix, *this));

factory_context.initManager().add(subscription->init_target_);

init_manager.add(subscription->init_target_);
route_config_subscriptions_.insert({manager_identifier, subscription});
} else {
// Because the RouteConfigProviderManager's weak_ptrs only get cleaned up
Expand Down
15 changes: 13 additions & 2 deletions source/common/router/rds_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "envoy/stats/scope.h"
#include "envoy/thread_local/thread_local.h"

#include "common/common/callback_impl.h"
#include "common/common/logger.h"
#include "common/init/target_impl.h"
#include "common/protobuf/utility.h"
Expand All @@ -31,6 +32,9 @@
namespace Envoy {
namespace Router {

// For friend class declaration in RdsRouteConfigSubscription.
class ScopedRdsConfigSubscription;

/**
* Route configuration provider utilities.
*/
Expand Down Expand Up @@ -121,6 +125,10 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks,
return MessageUtil::anyConvert<envoy::api::v2::RouteConfiguration>(resource).name();
}

Common::CallbackHandle* addUpdateCallback(std::function<void()> callback) {
return update_callback_manager_.add(callback);
}

RdsRouteConfigSubscription(
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds,
const uint64_t manager_identifier, Server::Configuration::FactoryContext& factory_context,
Expand All @@ -142,8 +150,11 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks,
VhdsSubscriptionPtr vhds_subscription_;
RouteConfigUpdatePtr config_update_info_;
ProtobufMessage::ValidationVisitor& validation_visitor_;
Common::CallbackManager<> update_callback_manager_;

friend class RouteConfigProviderManagerImpl;
// Access to addUpdateCallback
friend class ScopedRdsConfigSubscription;
};

using RdsRouteConfigSubscriptionSharedPtr = std::shared_ptr<RdsRouteConfigSubscription>;
Expand Down Expand Up @@ -195,8 +206,8 @@ class RouteConfigProviderManagerImpl : public RouteConfigProviderManager,
// RouteConfigProviderManager
RouteConfigProviderPtr createRdsRouteConfigProvider(
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds,
Server::Configuration::FactoryContext& factory_context,
const std::string& stat_prefix) override;
Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix,
Init::Manager& init_manager) override;

RouteConfigProviderPtr
createStaticRouteConfigProvider(const envoy::api::v2::RouteConfiguration& route_config,
Expand Down
57 changes: 46 additions & 11 deletions source/common/router/scoped_config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@ bool ScopeKey::operator==(const ScopeKey& other) const {
// An empty key equals to nothing, "NULL" != "NULL".
return false;
}
return std::equal(fragments_.begin(), fragments_.end(), other.fragments_.begin(),
other.fragments_.end(),
[](const std::unique_ptr<ScopeKeyFragmentBase>& left,
const std::unique_ptr<ScopeKeyFragmentBase>& right) -> bool {
// Both should be non-NULL now.
return *left == *right;
});
return this->hash() == other.hash();
}

HeaderValueExtractorImpl::HeaderValueExtractorImpl(
Expand Down Expand Up @@ -76,6 +70,22 @@ HeaderValueExtractorImpl::computeFragment(const Http::HeaderMap& headers) const
return nullptr;
}

ScopedRouteInfo::ScopedRouteInfo(envoy::api::v2::ScopedRouteConfiguration&& config_proto,
ConfigConstSharedPtr&& route_config)
: config_proto_(std::move(config_proto)), route_config_(std::move(route_config)) {
// TODO(stevenzzzz): Maybe worth a KeyBuilder abstraction when there are more than one type of
// Fragment.
for (const auto& fragment : config_proto_.key().fragments()) {
switch (fragment.type_case()) {
case envoy::api::v2::ScopedRouteConfiguration::Key::Fragment::kStringKey:
scope_key_.addFragment(std::make_unique<StringKeyFragment>(fragment.string_key()));
break;
default:
NOT_REACHED_GCOVR_EXCL_LINE;
}
}
}

ScopeKeyBuilderImpl::ScopeKeyBuilderImpl(ScopedRoutes::ScopeKeyBuilder&& config)
: ScopeKeyBuilderBase(std::move(config)) {
for (const auto& fragment_builder : config_.fragments()) {
Expand Down Expand Up @@ -104,12 +114,37 @@ ScopeKeyBuilderImpl::computeScopeKey(const Http::HeaderMap& headers) const {
return std::make_unique<ScopeKey>(std::move(key));
}

void ScopedConfigImpl::addOrUpdateRoutingScope(const ScopedRouteInfoConstSharedPtr&) {}
void ScopedConfigImpl::addOrUpdateRoutingScope(
const ScopedRouteInfoConstSharedPtr& scoped_route_info) {
const auto iter = scoped_route_info_by_name_.find(scoped_route_info->scopeName());
if (iter != scoped_route_info_by_name_.end()) {
ASSERT(scoped_route_info_by_key_.contains(iter->second->scopeKey().hash()));
scoped_route_info_by_key_.erase(iter->second->scopeKey().hash());
}
scoped_route_info_by_name_[scoped_route_info->scopeName()] = scoped_route_info;
scoped_route_info_by_key_[scoped_route_info->scopeKey().hash()] = scoped_route_info;
}

void ScopedConfigImpl::removeRoutingScope(const std::string&) {}
void ScopedConfigImpl::removeRoutingScope(const std::string& scope_name) {
const auto iter = scoped_route_info_by_name_.find(scope_name);
if (iter != scoped_route_info_by_name_.end()) {
ASSERT(scoped_route_info_by_key_.contains(iter->second->scopeKey().hash()));
scoped_route_info_by_key_.erase(iter->second->scopeKey().hash());
scoped_route_info_by_name_.erase(iter);
}
}

Router::ConfigConstSharedPtr ScopedConfigImpl::getRouteConfig(const Http::HeaderMap&) const {
return std::make_shared<const NullConfigImpl>();
Router::ConfigConstSharedPtr
ScopedConfigImpl::getRouteConfig(const Http::HeaderMap& headers) const {
std::unique_ptr<ScopeKey> scope_key = scope_key_builder_.computeScopeKey(headers);
if (scope_key == nullptr) {
return nullptr;
}
auto iter = scoped_route_info_by_key_.find(scope_key->hash());
if (iter != scoped_route_info_by_key_.end()) {
return iter->second->routeConfig();
}
return nullptr;
}

} // namespace Router
Expand Down
Loading

0 comments on commit 69f805c

Please sign in to comment.