Skip to content

Commit

Permalink
Revert "init: order dynamic resource initialization to make RTDS alwa…
Browse files Browse the repository at this point in the history
…ys be first (#10362)" (#10919)

This reverts commit aaba081.

Signed-off-by: Raul Gutierrez Segales <rgs@pinterest.com>
  • Loading branch information
Raúl Gutiérrez Segalés authored Apr 23, 2020
1 parent 62777e8 commit 7f165e8
Show file tree
Hide file tree
Showing 21 changed files with 93 additions and 373 deletions.
8 changes: 0 additions & 8 deletions include/envoy/runtime/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,6 @@ class Loader {
public:
virtual ~Loader() = default;

using ReadyCallback = std::function<void()>;

/**
* Post-construction initialization. Runtime will be generally available after
* the constructor is finished, with the exception of dynamic RTDS layers,
Expand Down Expand Up @@ -288,12 +286,6 @@ class Loader {
* @param values the values to merge
*/
virtual void mergeValues(const std::unordered_map<std::string, std::string>& values) PURE;

/**
* Initiate all RTDS subscriptions. The `on_done` callback is invoked when all RTDS requests
* have either received and applied their responses or timed out.
*/
virtual void startRtdsSubscriptions(ReadyCallback on_done) PURE;
};

using LoaderPtr = std::unique_ptr<Loader>;
Expand Down
15 changes: 0 additions & 15 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@ class ClusterManagerFactory;
/**
* Manages connection pools and load balancing for upstream clusters. The cluster manager is
* persistent and shared among multiple ongoing requests/connections.
* Cluster manager is initialized in two phases. In the first phase which begins at the construction
* all primary clusters (i.e. with endpoint assignments provisioned statically in bootstrap,
* discovered through DNS or file based CDS) are initialized.
* After the first phase has completed the server instance initializes services (i.e. RTDS) needed
* to successfully deploy the rest of dynamic configuration.
* In the second phase all secondary clusters (with endpoint assignments provisioned by xDS servers)
* are initialized and then the rest of the configuration provisioned through xDS.
*/
class ClusterManager {
public:
Expand All @@ -103,14 +96,6 @@ class ClusterManager {
*/
virtual void setInitializedCb(std::function<void()> callback) PURE;

/**
* Start initialization of secondary clusters and then dynamically configured clusters.
* The "initialized callback" set in the method above is invoked when secondary and
* dynamically provisioned clusters have finished initializing.
*/
virtual void
initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) PURE;

using ClusterInfoMap = std::unordered_map<std::string, std::reference_wrapper<const Cluster>>;

/**
Expand Down
2 changes: 0 additions & 2 deletions source/common/runtime/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ envoy_cc_library(
"//source/common/config:subscription_base_interface",
"//source/common/filesystem:directory_lib",
"//source/common/grpc:common_lib",
"//source/common/init:manager_lib",
"//source/common/init:target_lib",
"//source/common/init:watcher_lib",
"//source/common/protobuf:message_validator_lib",
"//source/common/protobuf:utility_lib",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
Expand Down
19 changes: 4 additions & 15 deletions source/common/runtime/runtime_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -465,12 +465,11 @@ void ProtoLayer::walkProtoValue(const ProtobufWkt::Value& v, const std::string&

LoaderImpl::LoaderImpl(Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator& tls,
const envoy::config::bootstrap::v3::LayeredRuntime& config,
const LocalInfo::LocalInfo& local_info, Stats::Store& store,
RandomGenerator& generator,
const LocalInfo::LocalInfo& local_info, Init::Manager& init_manager,
Stats::Store& store, RandomGenerator& generator,
ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api)
: generator_(generator), stats_(generateStats(store)), tls_(tls.allocateSlot()),
config_(config), service_cluster_(local_info.clusterName()), api_(api),
init_watcher_("RDTS", [this]() { onRdtsReady(); }) {
config_(config), service_cluster_(local_info.clusterName()), api_(api) {
std::unordered_set<std::string> layer_names;
for (const auto& layer : config_.layers()) {
auto ret = layer_names.insert(layer.name());
Expand Down Expand Up @@ -498,7 +497,7 @@ LoaderImpl::LoaderImpl(Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::kRtdsLayer:
subscriptions_.emplace_back(
std::make_unique<RtdsSubscription>(*this, layer.rtds_layer(), store, validation_visitor));
init_manager_.add(subscriptions_.back()->init_target_);
init_manager.add(subscriptions_.back()->init_target_);
break;
default:
NOT_REACHED_GCOVR_EXCL_LINE;
Expand All @@ -510,16 +509,6 @@ LoaderImpl::LoaderImpl(Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator

void LoaderImpl::initialize(Upstream::ClusterManager& cm) { cm_ = &cm; }

void LoaderImpl::startRtdsSubscriptions(ReadyCallback on_done) {
on_rtds_initialized_ = on_done;
init_manager_.initialize(init_watcher_);
}

void LoaderImpl::onRdtsReady() {
ENVOY_LOG(info, "RTDS has finished initialization");
on_rtds_initialized_();
}

RtdsSubscription::RtdsSubscription(
LoaderImpl& parent, const envoy::config::bootstrap::v3::RuntimeLayer::RtdsLayer& rtds_layer,
Stats::Store& store, ProtobufMessage::ValidationVisitor& validation_visitor)
Expand Down
12 changes: 3 additions & 9 deletions source/common/runtime/runtime_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "common/common/logger.h"
#include "common/common/thread.h"
#include "common/config/subscription_base.h"
#include "common/init/manager_impl.h"
#include "common/init/target_impl.h"
#include "common/singleton/threadsafe_singleton.h"

Expand Down Expand Up @@ -243,16 +242,15 @@ class LoaderImpl : public Loader, Logger::Loggable<Logger::Id::runtime> {
public:
LoaderImpl(Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator& tls,
const envoy::config::bootstrap::v3::LayeredRuntime& config,
const LocalInfo::LocalInfo& local_info, Stats::Store& store,
RandomGenerator& generator, ProtobufMessage::ValidationVisitor& validation_visitor,
Api::Api& api);
const LocalInfo::LocalInfo& local_info, Init::Manager& init_manager,
Stats::Store& store, RandomGenerator& generator,
ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api);

// Runtime::Loader
void initialize(Upstream::ClusterManager& cm) override;
const Snapshot& snapshot() override;
std::shared_ptr<const Snapshot> threadsafeSnapshot() override;
void mergeValues(const std::unordered_map<std::string, std::string>& values) override;
void startRtdsSubscriptions(ReadyCallback on_done) override;

private:
friend RtdsSubscription;
Expand All @@ -262,7 +260,6 @@ class LoaderImpl : public Loader, Logger::Loggable<Logger::Id::runtime> {
// Load a new Snapshot into TLS
void loadNewSnapshot();
RuntimeStats generateStats(Stats::Store& store);
void onRdtsReady();

RandomGenerator& generator_;
RuntimeStats stats_;
Expand All @@ -272,9 +269,6 @@ class LoaderImpl : public Loader, Logger::Loggable<Logger::Id::runtime> {
const std::string service_cluster_;
Filesystem::WatcherPtr watcher_;
Api::Api& api_;
ReadyCallback on_rtds_initialized_;
Init::WatcherImpl init_watcher_;
Init::ManagerImpl init_manager_{"RTDS"};
std::vector<RtdsSubscriptionPtr> subscriptions_;
Upstream::ClusterManager* cm_{};

Expand Down
32 changes: 9 additions & 23 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ void ClusterManagerInitHelper::maybeFinishInitialize() {
// Do not do anything if we are still doing the initial static load or if we are waiting for
// CDS initialize.
ENVOY_LOG(debug, "maybe finish initialize state: {}", enumToInt(state_));
if (state_ == State::Loading || state_ == State::WaitingToStartCdsInitialization) {
if (state_ == State::Loading || state_ == State::WaitingForCdsInitialize) {
return;
}

// If we are still waiting for primary clusters to initialize, do nothing.
ASSERT(state_ == State::WaitingToStartSecondaryInitialization || state_ == State::CdsInitialized);
ASSERT(state_ == State::WaitingForStaticInitialize || state_ == State::CdsInitialized);
ENVOY_LOG(debug, "maybe finish initialize primary init clusters empty: {}",
primary_init_clusters_.empty());
if (!primary_init_clusters_.empty()) {
Expand Down Expand Up @@ -162,9 +162,9 @@ void ClusterManagerInitHelper::maybeFinishInitialize() {
// directly to initialized.
started_secondary_initialize_ = false;
ENVOY_LOG(debug, "maybe finish initialize cds api ready: {}", cds_ != nullptr);
if (state_ == State::WaitingToStartSecondaryInitialization && cds_) {
if (state_ == State::WaitingForStaticInitialize && cds_) {
ENVOY_LOG(info, "cm init: initializing cds");
state_ = State::WaitingToStartCdsInitialization;
state_ = State::WaitingForCdsInitialize;
cds_->initialize();
} else {
ENVOY_LOG(info, "cm init: all clusters initialized");
Expand All @@ -177,14 +177,7 @@ void ClusterManagerInitHelper::maybeFinishInitialize() {

void ClusterManagerInitHelper::onStaticLoadComplete() {
ASSERT(state_ == State::Loading);
// After initialization of primary clusters has completed, transition to
// waiting for signal to initialize secondary clusters and then CDS.
state_ = State::WaitingToStartSecondaryInitialization;
}

void ClusterManagerInitHelper::startInitializingSecondaryClusters() {
ASSERT(state_ == State::WaitingToStartSecondaryInitialization);
ENVOY_LOG(debug, "continue initializing secondary clusters");
state_ = State::WaitingForStaticInitialize;
maybeFinishInitialize();
}

Expand All @@ -193,7 +186,7 @@ void ClusterManagerInitHelper::setCds(CdsApi* cds) {
cds_ = cds;
if (cds_) {
cds_->setInitializedCb([this]() -> void {
ASSERT(state_ == State::WaitingToStartCdsInitialization);
ASSERT(state_ == State::WaitingForCdsInitialize);
state_ = State::CdsInitialized;
maybeFinishInitialize();
});
Expand Down Expand Up @@ -353,22 +346,15 @@ ClusterManagerImpl::ClusterManagerImpl(
init_helper_.onStaticLoadComplete();

ads_mux_->start();
}

void ClusterManagerImpl::initializeSecondaryClusters(
const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
init_helper_.startInitializingSecondaryClusters();

const auto& cm_config = bootstrap.cluster_manager();
if (cm_config.has_load_stats_config()) {
const auto& load_stats_config = cm_config.load_stats_config();

load_stats_reporter_ = std::make_unique<LoadStatsReporter>(
local_info_, *this, stats_,
local_info, *this, stats,
Config::Utility::factoryForGrpcApiConfigSource(*async_client_manager_, load_stats_config,
stats_, false)
stats, false)
->create(),
load_stats_config.transport_api_version(), dispatcher_);
load_stats_config.transport_api_version(), main_thread_dispatcher);
}
}

Expand Down
22 changes: 7 additions & 15 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,15 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
: cm_(cm), per_cluster_init_callback_(per_cluster_init_callback) {}

enum class State {
// Initial state. During this state all static clusters are loaded. Any primary clusters
// Initial state. During this state all static clusters are loaded. Any phase 1 clusters
// are immediately initialized.
Loading,
// During this state cluster manager waits to start initializing secondary clusters. In this
// state all
// primary clusters have completed initialization. Initialization of the secondary clusters
// is started by the `initializeSecondaryClusters` method.
WaitingToStartSecondaryInitialization,
// In this state cluster manager waits for all secondary clusters (if configured) to finish
// initialization. Then, if CDS is configured, this state tracks waiting for the first CDS
// response to populate dynamically configured clusters.
WaitingToStartCdsInitialization,
// During this state we wait for all static clusters to fully initialize. This requires
// completing phase 1 clusters, initializing phase 2 clusters, and then waiting for them.
WaitingForStaticInitialize,
// If CDS is configured, this state tracks waiting for the first CDS response to populate
// clusters.
WaitingForCdsInitialize,
// During this state, all CDS populated clusters are undergoing either phase 1 or phase 2
// initialization.
CdsInitialized,
Expand All @@ -136,8 +133,6 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
void setInitializedCb(std::function<void()> callback);
State state() const { return state_; }

void startInitializingSecondaryClusters();

private:
// To enable invariant assertions on the cluster lists.
friend ClusterManagerImpl;
Expand Down Expand Up @@ -247,9 +242,6 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

Config::SubscriptionFactory& subscriptionFactory() override { return subscription_factory_; }

void
initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) override;

protected:
virtual void postThreadLocalDrainConnections(const Cluster& cluster,
const HostVector& hosts_removed);
Expand Down
55 changes: 16 additions & 39 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -410,21 +410,6 @@ void InstanceImpl::initialize(const Options& options,
dispatcher_->initializeStats(stats_store_, "server.");
}

// The broad order of initialization from this point on is the following:
// 1. Statically provisioned configuration (bootstrap) are loaded.
// 2. Cluster manager is created and all primary clusters (i.e. with endpoint assignments
// provisioned statically in bootstrap, discovered through DNS or file based CDS) are
// initialized.
// 3. Various services are initialized and configured using the bootstrap config.
// 4. RTDS is initialized using primary clusters. This allows runtime overrides to be fully
// configured before the rest of xDS configuration is provisioned.
// 5. Secondary clusters (with endpoint assignments provisioned by xDS servers) are initialized.
// 6. The rest of the dynamic configuration is provisioned.
//
// Please note: this order requires that RTDS is provisioned using a primary cluster. If RTDS is
// provisioned through ADS then ADS must use primary cluster as well. This invariant is enforced
// during RTDS initialization and invalid configuration will be rejected.

// Runtime gets initialized before the main configuration since during main configuration
// load things may grab a reference to the loader for later use.
runtime_singleton_ = std::make_unique<Runtime::ScopedLoaderSingleton>(
Expand Down Expand Up @@ -459,27 +444,6 @@ void InstanceImpl::initialize(const Options& options,
// instantiated (which in turn relies on runtime...).
Runtime::LoaderSingleton::get().initialize(clusterManager());

// If RTDS was not configured the `onRuntimeReady` callback is immediately invoked.
Runtime::LoaderSingleton::get().startRtdsSubscriptions([this]() { onRuntimeReady(); });

for (Stats::SinkPtr& sink : config_.statsSinks()) {
stats_store_.addSink(*sink);
}

// Some of the stat sinks may need dispatcher support so don't flush until the main loop starts.
// Just setup the timer.
stat_flush_timer_ = dispatcher_->createTimer([this]() -> void { flushStats(); });
stat_flush_timer_->enableTimer(config_.statsFlushInterval());

// GuardDog (deadlock detection) object and thread setup before workers are
// started and before our own run() loop runs.
guard_dog_ = std::make_unique<Server::GuardDogImpl>(stats_store_, config_, *api_);
}

void InstanceImpl::onRuntimeReady() {
// Begin initializing secondary clusters after RTDS configuration has been applied.
clusterManager().initializeSecondaryClusters(bootstrap_);

if (bootstrap_.has_hds_config()) {
const auto& hds_config = bootstrap_.hds_config();
async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(
Expand All @@ -494,6 +458,19 @@ void InstanceImpl::onRuntimeReady() {
*config_.clusterManager(), *local_info_, *admin_, *singleton_manager_, thread_local_,
messageValidationContext().dynamicValidationVisitor(), *api_);
}

for (Stats::SinkPtr& sink : config_.statsSinks()) {
stats_store_.addSink(*sink);
}

// Some of the stat sinks may need dispatcher support so don't flush until the main loop starts.
// Just setup the timer.
stat_flush_timer_ = dispatcher_->createTimer([this]() -> void { flushStats(); });
stat_flush_timer_->enableTimer(config_.statsFlushInterval());

// GuardDog (deadlock detection) object and thread setup before workers are
// started and before our own run() loop runs.
guard_dog_ = std::make_unique<Server::GuardDogImpl>(stats_store_, config_, *api_);
}

void InstanceImpl::startWorkers() {
Expand All @@ -513,8 +490,8 @@ Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
ENVOY_LOG(info, "runtime: {}", MessageUtil::getYamlStringFromMessage(config.runtime()));
return std::make_unique<Runtime::LoaderImpl>(
server.dispatcher(), server.threadLocal(), config.runtime(), server.localInfo(),
server.stats(), server.random(), server.messageValidationContext().dynamicValidationVisitor(),
server.api());
server.initManager(), server.stats(), server.random(),
server.messageValidationContext().dynamicValidationVisitor(), server.api());
}

void InstanceImpl::loadServerFlags(const absl::optional<std::string>& flags_path) {
Expand Down Expand Up @@ -735,4 +712,4 @@ ProtobufTypes::MessagePtr InstanceImpl::dumpBootstrapConfig() {
}

} // namespace Server
} // namespace Envoy
} // namespace Envoy
5 changes: 1 addition & 4 deletions source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,
void terminate();
void notifyCallbacksForStage(
Stage stage, Event::PostCb completion_cb = [] {});
void onRuntimeReady();

using LifecycleNotifierCallbacks = std::list<StageCallback>;
using LifecycleNotifierCompletionCallbacks = std::list<StageCallbackWithCompletion>;
Expand All @@ -306,9 +305,6 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,
const Options& options_;
ProtobufMessage::ProdValidationContextImpl validation_context_;
TimeSource& time_source_;
// Delete local_info_ as late as possible as some members below may reference it during their
// destruction.
LocalInfo::LocalInfoPtr local_info_;
HotRestart& restarter_;
const time_t start_time_;
time_t original_start_time_;
Expand All @@ -332,6 +328,7 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,
Configuration::MainImpl config_;
Network::DnsResolverSharedPtr dns_resolver_;
Event::TimerPtr stat_flush_timer_;
LocalInfo::LocalInfoPtr local_info_;
DrainManagerPtr drain_manager_;
AccessLog::AccessLogManagerImpl access_log_manager_;
std::unique_ptr<Upstream::ClusterManagerFactory> cluster_manager_factory_;
Expand Down
Loading

0 comments on commit 7f165e8

Please sign in to comment.