Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "init: order dynamic resource initialization to make RTDS alwa… #10919

Merged
merged 1 commit into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions include/envoy/runtime/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,6 @@ class Loader {
public:
virtual ~Loader() = default;

using ReadyCallback = std::function<void()>;

/**
* Post-construction initialization. Runtime will be generally available after
* the constructor is finished, with the exception of dynamic RTDS layers,
Expand Down Expand Up @@ -288,12 +286,6 @@ class Loader {
* @param values the values to merge
*/
virtual void mergeValues(const std::unordered_map<std::string, std::string>& values) PURE;

/**
* Initiate all RTDS subscriptions. The `on_done` callback is invoked when all RTDS requests
* have either received and applied their responses or timed out.
*/
virtual void startRtdsSubscriptions(ReadyCallback on_done) PURE;
};

using LoaderPtr = std::unique_ptr<Loader>;
Expand Down
15 changes: 0 additions & 15 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@ class ClusterManagerFactory;
/**
* Manages connection pools and load balancing for upstream clusters. The cluster manager is
* persistent and shared among multiple ongoing requests/connections.
* Cluster manager is initialized in two phases. In the first phase which begins at the construction
* all primary clusters (i.e. with endpoint assignments provisioned statically in bootstrap,
* discovered through DNS or file based CDS) are initialized.
* After the first phase has completed the server instance initializes services (i.e. RTDS) needed
* to successfully deploy the rest of dynamic configuration.
* In the second phase all secondary clusters (with endpoint assignments provisioned by xDS servers)
* are initialized and then the rest of the configuration provisioned through xDS.
*/
class ClusterManager {
public:
Expand All @@ -103,14 +96,6 @@ class ClusterManager {
*/
virtual void setInitializedCb(std::function<void()> callback) PURE;

/**
* Start initialization of secondary clusters and then dynamically configured clusters.
* The "initialized callback" set in the method above is invoked when secondary and
* dynamically provisioned clusters have finished initializing.
*/
virtual void
initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) PURE;

using ClusterInfoMap = std::unordered_map<std::string, std::reference_wrapper<const Cluster>>;

/**
Expand Down
2 changes: 0 additions & 2 deletions source/common/runtime/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ envoy_cc_library(
"//source/common/config:subscription_base_interface",
"//source/common/filesystem:directory_lib",
"//source/common/grpc:common_lib",
"//source/common/init:manager_lib",
"//source/common/init:target_lib",
"//source/common/init:watcher_lib",
"//source/common/protobuf:message_validator_lib",
"//source/common/protobuf:utility_lib",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
Expand Down
19 changes: 4 additions & 15 deletions source/common/runtime/runtime_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -465,12 +465,11 @@ void ProtoLayer::walkProtoValue(const ProtobufWkt::Value& v, const std::string&

LoaderImpl::LoaderImpl(Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator& tls,
const envoy::config::bootstrap::v3::LayeredRuntime& config,
const LocalInfo::LocalInfo& local_info, Stats::Store& store,
RandomGenerator& generator,
const LocalInfo::LocalInfo& local_info, Init::Manager& init_manager,
Stats::Store& store, RandomGenerator& generator,
ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api)
: generator_(generator), stats_(generateStats(store)), tls_(tls.allocateSlot()),
config_(config), service_cluster_(local_info.clusterName()), api_(api),
init_watcher_("RDTS", [this]() { onRdtsReady(); }) {
config_(config), service_cluster_(local_info.clusterName()), api_(api) {
std::unordered_set<std::string> layer_names;
for (const auto& layer : config_.layers()) {
auto ret = layer_names.insert(layer.name());
Expand Down Expand Up @@ -498,7 +497,7 @@ LoaderImpl::LoaderImpl(Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::kRtdsLayer:
subscriptions_.emplace_back(
std::make_unique<RtdsSubscription>(*this, layer.rtds_layer(), store, validation_visitor));
init_manager_.add(subscriptions_.back()->init_target_);
init_manager.add(subscriptions_.back()->init_target_);
break;
default:
NOT_REACHED_GCOVR_EXCL_LINE;
Expand All @@ -510,16 +509,6 @@ LoaderImpl::LoaderImpl(Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator

void LoaderImpl::initialize(Upstream::ClusterManager& cm) { cm_ = &cm; }

void LoaderImpl::startRtdsSubscriptions(ReadyCallback on_done) {
on_rtds_initialized_ = on_done;
init_manager_.initialize(init_watcher_);
}

void LoaderImpl::onRdtsReady() {
ENVOY_LOG(info, "RTDS has finished initialization");
on_rtds_initialized_();
}

RtdsSubscription::RtdsSubscription(
LoaderImpl& parent, const envoy::config::bootstrap::v3::RuntimeLayer::RtdsLayer& rtds_layer,
Stats::Store& store, ProtobufMessage::ValidationVisitor& validation_visitor)
Expand Down
12 changes: 3 additions & 9 deletions source/common/runtime/runtime_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "common/common/logger.h"
#include "common/common/thread.h"
#include "common/config/subscription_base.h"
#include "common/init/manager_impl.h"
#include "common/init/target_impl.h"
#include "common/singleton/threadsafe_singleton.h"

Expand Down Expand Up @@ -243,16 +242,15 @@ class LoaderImpl : public Loader, Logger::Loggable<Logger::Id::runtime> {
public:
LoaderImpl(Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator& tls,
const envoy::config::bootstrap::v3::LayeredRuntime& config,
const LocalInfo::LocalInfo& local_info, Stats::Store& store,
RandomGenerator& generator, ProtobufMessage::ValidationVisitor& validation_visitor,
Api::Api& api);
const LocalInfo::LocalInfo& local_info, Init::Manager& init_manager,
Stats::Store& store, RandomGenerator& generator,
ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api);

// Runtime::Loader
void initialize(Upstream::ClusterManager& cm) override;
const Snapshot& snapshot() override;
std::shared_ptr<const Snapshot> threadsafeSnapshot() override;
void mergeValues(const std::unordered_map<std::string, std::string>& values) override;
void startRtdsSubscriptions(ReadyCallback on_done) override;

private:
friend RtdsSubscription;
Expand All @@ -262,7 +260,6 @@ class LoaderImpl : public Loader, Logger::Loggable<Logger::Id::runtime> {
// Load a new Snapshot into TLS
void loadNewSnapshot();
RuntimeStats generateStats(Stats::Store& store);
void onRdtsReady();

RandomGenerator& generator_;
RuntimeStats stats_;
Expand All @@ -272,9 +269,6 @@ class LoaderImpl : public Loader, Logger::Loggable<Logger::Id::runtime> {
const std::string service_cluster_;
Filesystem::WatcherPtr watcher_;
Api::Api& api_;
ReadyCallback on_rtds_initialized_;
Init::WatcherImpl init_watcher_;
Init::ManagerImpl init_manager_{"RTDS"};
std::vector<RtdsSubscriptionPtr> subscriptions_;
Upstream::ClusterManager* cm_{};

Expand Down
32 changes: 9 additions & 23 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ void ClusterManagerInitHelper::maybeFinishInitialize() {
// Do not do anything if we are still doing the initial static load or if we are waiting for
// CDS initialize.
ENVOY_LOG(debug, "maybe finish initialize state: {}", enumToInt(state_));
if (state_ == State::Loading || state_ == State::WaitingToStartCdsInitialization) {
if (state_ == State::Loading || state_ == State::WaitingForCdsInitialize) {
return;
}

// If we are still waiting for primary clusters to initialize, do nothing.
ASSERT(state_ == State::WaitingToStartSecondaryInitialization || state_ == State::CdsInitialized);
ASSERT(state_ == State::WaitingForStaticInitialize || state_ == State::CdsInitialized);
ENVOY_LOG(debug, "maybe finish initialize primary init clusters empty: {}",
primary_init_clusters_.empty());
if (!primary_init_clusters_.empty()) {
Expand Down Expand Up @@ -162,9 +162,9 @@ void ClusterManagerInitHelper::maybeFinishInitialize() {
// directly to initialized.
started_secondary_initialize_ = false;
ENVOY_LOG(debug, "maybe finish initialize cds api ready: {}", cds_ != nullptr);
if (state_ == State::WaitingToStartSecondaryInitialization && cds_) {
if (state_ == State::WaitingForStaticInitialize && cds_) {
ENVOY_LOG(info, "cm init: initializing cds");
state_ = State::WaitingToStartCdsInitialization;
state_ = State::WaitingForCdsInitialize;
cds_->initialize();
} else {
ENVOY_LOG(info, "cm init: all clusters initialized");
Expand All @@ -177,14 +177,7 @@ void ClusterManagerInitHelper::maybeFinishInitialize() {

void ClusterManagerInitHelper::onStaticLoadComplete() {
ASSERT(state_ == State::Loading);
// After initialization of primary clusters has completed, transition to
// waiting for signal to initialize secondary clusters and then CDS.
state_ = State::WaitingToStartSecondaryInitialization;
}

void ClusterManagerInitHelper::startInitializingSecondaryClusters() {
ASSERT(state_ == State::WaitingToStartSecondaryInitialization);
ENVOY_LOG(debug, "continue initializing secondary clusters");
state_ = State::WaitingForStaticInitialize;
maybeFinishInitialize();
}

Expand All @@ -193,7 +186,7 @@ void ClusterManagerInitHelper::setCds(CdsApi* cds) {
cds_ = cds;
if (cds_) {
cds_->setInitializedCb([this]() -> void {
ASSERT(state_ == State::WaitingToStartCdsInitialization);
ASSERT(state_ == State::WaitingForCdsInitialize);
state_ = State::CdsInitialized;
maybeFinishInitialize();
});
Expand Down Expand Up @@ -353,22 +346,15 @@ ClusterManagerImpl::ClusterManagerImpl(
init_helper_.onStaticLoadComplete();

ads_mux_->start();
}

void ClusterManagerImpl::initializeSecondaryClusters(
const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
init_helper_.startInitializingSecondaryClusters();

const auto& cm_config = bootstrap.cluster_manager();
if (cm_config.has_load_stats_config()) {
const auto& load_stats_config = cm_config.load_stats_config();

load_stats_reporter_ = std::make_unique<LoadStatsReporter>(
local_info_, *this, stats_,
local_info, *this, stats,
Config::Utility::factoryForGrpcApiConfigSource(*async_client_manager_, load_stats_config,
stats_, false)
stats, false)
->create(),
load_stats_config.transport_api_version(), dispatcher_);
load_stats_config.transport_api_version(), main_thread_dispatcher);
}
}

Expand Down
22 changes: 7 additions & 15 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,15 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
: cm_(cm), per_cluster_init_callback_(per_cluster_init_callback) {}

enum class State {
// Initial state. During this state all static clusters are loaded. Any primary clusters
// Initial state. During this state all static clusters are loaded. Any phase 1 clusters
// are immediately initialized.
Loading,
// During this state cluster manager waits to start initializing secondary clusters. In this
// state all
// primary clusters have completed initialization. Initialization of the secondary clusters
// is started by the `initializeSecondaryClusters` method.
WaitingToStartSecondaryInitialization,
// In this state cluster manager waits for all secondary clusters (if configured) to finish
// initialization. Then, if CDS is configured, this state tracks waiting for the first CDS
// response to populate dynamically configured clusters.
WaitingToStartCdsInitialization,
// During this state we wait for all static clusters to fully initialize. This requires
// completing phase 1 clusters, initializing phase 2 clusters, and then waiting for them.
WaitingForStaticInitialize,
// If CDS is configured, this state tracks waiting for the first CDS response to populate
// clusters.
WaitingForCdsInitialize,
// During this state, all CDS populated clusters are undergoing either phase 1 or phase 2
// initialization.
CdsInitialized,
Expand All @@ -136,8 +133,6 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
void setInitializedCb(std::function<void()> callback);
State state() const { return state_; }

void startInitializingSecondaryClusters();

private:
// To enable invariant assertions on the cluster lists.
friend ClusterManagerImpl;
Expand Down Expand Up @@ -247,9 +242,6 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

Config::SubscriptionFactory& subscriptionFactory() override { return subscription_factory_; }

void
initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) override;

protected:
virtual void postThreadLocalDrainConnections(const Cluster& cluster,
const HostVector& hosts_removed);
Expand Down
55 changes: 16 additions & 39 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -378,21 +378,6 @@ void InstanceImpl::initialize(const Options& options,
dispatcher_->initializeStats(stats_store_, "server.");
}

// The broad order of initialization from this point on is the following:
// 1. Statically provisioned configuration (bootstrap) are loaded.
// 2. Cluster manager is created and all primary clusters (i.e. with endpoint assignments
// provisioned statically in bootstrap, discovered through DNS or file based CDS) are
// initialized.
// 3. Various services are initialized and configured using the bootstrap config.
// 4. RTDS is initialized using primary clusters. This allows runtime overrides to be fully
// configured before the rest of xDS configuration is provisioned.
// 5. Secondary clusters (with endpoint assignments provisioned by xDS servers) are initialized.
// 6. The rest of the dynamic configuration is provisioned.
//
// Please note: this order requires that RTDS is provisioned using a primary cluster. If RTDS is
// provisioned through ADS then ADS must use primary cluster as well. This invariant is enforced
// during RTDS initialization and invalid configuration will be rejected.

// Runtime gets initialized before the main configuration since during main configuration
// load things may grab a reference to the loader for later use.
runtime_singleton_ = std::make_unique<Runtime::ScopedLoaderSingleton>(
Expand Down Expand Up @@ -427,27 +412,6 @@ void InstanceImpl::initialize(const Options& options,
// instantiated (which in turn relies on runtime...).
Runtime::LoaderSingleton::get().initialize(clusterManager());

// If RTDS was not configured the `onRuntimeReady` callback is immediately invoked.
Runtime::LoaderSingleton::get().startRtdsSubscriptions([this]() { onRuntimeReady(); });

for (Stats::SinkPtr& sink : config_.statsSinks()) {
stats_store_.addSink(*sink);
}

// Some of the stat sinks may need dispatcher support so don't flush until the main loop starts.
// Just setup the timer.
stat_flush_timer_ = dispatcher_->createTimer([this]() -> void { flushStats(); });
stat_flush_timer_->enableTimer(config_.statsFlushInterval());

// GuardDog (deadlock detection) object and thread setup before workers are
// started and before our own run() loop runs.
guard_dog_ = std::make_unique<Server::GuardDogImpl>(stats_store_, config_, *api_);
}

void InstanceImpl::onRuntimeReady() {
// Begin initializing secondary clusters after RTDS configuration has been applied.
clusterManager().initializeSecondaryClusters(bootstrap_);

if (bootstrap_.has_hds_config()) {
const auto& hds_config = bootstrap_.hds_config();
async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(
Expand All @@ -462,6 +426,19 @@ void InstanceImpl::onRuntimeReady() {
*config_.clusterManager(), *local_info_, *admin_, *singleton_manager_, thread_local_,
messageValidationContext().dynamicValidationVisitor(), *api_);
}

for (Stats::SinkPtr& sink : config_.statsSinks()) {
stats_store_.addSink(*sink);
}

// Some of the stat sinks may need dispatcher support so don't flush until the main loop starts.
// Just setup the timer.
stat_flush_timer_ = dispatcher_->createTimer([this]() -> void { flushStats(); });
stat_flush_timer_->enableTimer(config_.statsFlushInterval());

// GuardDog (deadlock detection) object and thread setup before workers are
// started and before our own run() loop runs.
guard_dog_ = std::make_unique<Server::GuardDogImpl>(stats_store_, config_, *api_);
}

void InstanceImpl::startWorkers() {
Expand All @@ -481,8 +458,8 @@ Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
ENVOY_LOG(info, "runtime: {}", MessageUtil::getYamlStringFromMessage(config.runtime()));
return std::make_unique<Runtime::LoaderImpl>(
server.dispatcher(), server.threadLocal(), config.runtime(), server.localInfo(),
server.stats(), server.random(), server.messageValidationContext().dynamicValidationVisitor(),
server.api());
server.initManager(), server.stats(), server.random(),
server.messageValidationContext().dynamicValidationVisitor(), server.api());
}

void InstanceImpl::loadServerFlags(const absl::optional<std::string>& flags_path) {
Expand Down Expand Up @@ -703,4 +680,4 @@ ProtobufTypes::MessagePtr InstanceImpl::dumpBootstrapConfig() {
}

} // namespace Server
} // namespace Envoy
} // namespace Envoy
5 changes: 1 addition & 4 deletions source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,
void terminate();
void notifyCallbacksForStage(
Stage stage, Event::PostCb completion_cb = [] {});
void onRuntimeReady();

using LifecycleNotifierCallbacks = std::list<StageCallback>;
using LifecycleNotifierCompletionCallbacks = std::list<StageCallbackWithCompletion>;
Expand All @@ -306,9 +305,6 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,
const Options& options_;
ProtobufMessage::ProdValidationContextImpl validation_context_;
TimeSource& time_source_;
// Delete local_info_ as late as possible as some members below may reference it during their
// destruction.
LocalInfo::LocalInfoPtr local_info_;
HotRestart& restarter_;
const time_t start_time_;
time_t original_start_time_;
Expand All @@ -332,6 +328,7 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,
Configuration::MainImpl config_;
Network::DnsResolverSharedPtr dns_resolver_;
Event::TimerPtr stat_flush_timer_;
LocalInfo::LocalInfoPtr local_info_;
DrainManagerPtr drain_manager_;
AccessLog::AccessLogManagerImpl access_log_manager_;
std::unique_ptr<Upstream::ClusterManagerFactory> cluster_manager_factory_;
Expand Down
Loading