Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion api/envoy/config/bootstrap/v3/bootstrap.proto
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ message Admin {
}

// Cluster manager :ref:`architecture overview <arch_overview_cluster_manager>`.
// [#next-free-field: 6]
// [#next-free-field: 7]
message ClusterManager {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.bootstrap.v2.ClusterManager";
Expand Down Expand Up @@ -500,6 +500,13 @@ message ClusterManager {
// <envoy_v3_api_enum_value_config.core.v3.ApiConfigSource.ApiType.GRPC>`.
core.v3.ApiConfigSource load_stats_config = 4;

// If set Envoy will use the :ref:`ads_config
// <envoy_v3_api_field_config.bootstrap.v3.Bootstrap.DynamicResources.ads_config>`
// config source as the server and use the same connection.
// If ``ads_config`` is not defined the bootstrap will be rejected.
// Cannot be used if ``load_stats_config`` is defined.
bool load_stats_over_ads_config_connection = 6;

// Whether the ClusterManager will create clusters on the worker threads
// inline during requests. This will save memory and CPU cycles in cases where
// there are lots of inactive clusters and > 1 worker thread.
Expand Down
6 changes: 6 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ new_features:
Added support for dropping stats via
:ref:`DropAction <envoy_v3_api_msg_extensions.stat_sinks.open_telemetry.v3.SinkConfig.DropAction>` during
custom metric conversion.
- area: lrs
change: |
Added :ref:`load_stats_over_ads_config_connection
<envoy_v3_api_field_config.bootstrap.v3.ClusterManager.load_stats_over_ads_config_connection>`
that makes the load reporting server use the primary server connection specified by the
:ref:`ads_config <envoy_v3_api_field_config.bootstrap.v3.Bootstrap.DynamicResources.ads_config>`.
- area: http
change: |
Added :ref:`vhost_header <envoy_v3_api_field_config.route.v3.RouteConfiguration.vhost_header>` to
Expand Down
8 changes: 8 additions & 0 deletions envoy/config/xds_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ class XdsManager {
virtual absl::Status
setAdsConfigSource(const envoy::config::core::v3::ApiConfigSource& config_source) PURE;

using AdsClientChangeCallback = std::function<void(Grpc::RawAsyncClientSharedPtr client)>;
/**
* Sets a callback that will be invoked when the ADS config source is replaced
* (via `setAdsConfigSource`).
* @param cb the callback function that will be invoked.
*/
virtual void setAdsClientChangeCallback(AdsClientChangeCallback cb) PURE;

/**
* Returns a shared_ptr to the singleton xDS-over-gRPC provider for upstream control plane muxing
* of xDS. This is treated somewhat as a special case in ClusterManager, since it does not relate
Expand Down
34 changes: 27 additions & 7 deletions source/common/config/xds_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,11 @@ absl::Status createGrpcClients(Grpc::AsyncClientManager& async_client_manager,
Stats::Scope& stats_scope, bool skip_cluster_check,
bool xdstp_config_source,
Grpc::RawAsyncClientSharedPtr& primary_client,
Grpc::RawAsyncClientSharedPtr& failover_client) {
Grpc::RawAsyncClientSharedPtr& failover_client,
bool create_shared_clients) {

if (Runtime::runtimeFeatureEnabled("envoy.restart_features.use_cached_grpc_client_for_xds")) {
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.use_cached_grpc_client_for_xds") ||
create_shared_clients) {
RETURN_IF_NOT_OK(createSharedClients(async_client_manager, api_config_source, stats_scope,
skip_cluster_check, xdstp_config_source, primary_client,
failover_client));
Expand Down Expand Up @@ -211,6 +213,10 @@ XdsManagerImpl::initializeAdsConnections(const envoy::config::bootstrap::v3::Boo
OptRef<XdsConfigTracker> xds_config_tracker =
makeOptRefFromPtr<XdsConfigTracker>(xds_config_tracker_.get());

// TODO(adisuissa): convert the use of create_shared_clients to a
// runtime-feature that will eventually be "true" by default.
const bool create_shared_clients =
bootstrap.cluster_manager().load_stats_over_ads_config_connection();
if (dyn_resources.ads_config().api_type() ==
envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) {
absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
Expand All @@ -230,7 +236,7 @@ XdsManagerImpl::initializeAdsConnections(const envoy::config::bootstrap::v3::Boo
RETURN_IF_NOT_OK(createGrpcClients(cm_->grpcAsyncClientManager(), dyn_resources.ads_config(),
*stats_.rootScope(), /*skip_cluster_check*/ false,
/*xdstp_config_source*/ false, primary_client,
failover_client));
failover_client, create_shared_clients));

ads_mux_ = factory->create(std::move(primary_client), std::move(failover_client),
main_thread_dispatcher_, random_, *stats_.rootScope(),
Expand All @@ -256,7 +262,7 @@ XdsManagerImpl::initializeAdsConnections(const envoy::config::bootstrap::v3::Boo
RETURN_IF_NOT_OK(createGrpcClients(cm_->grpcAsyncClientManager(), dyn_resources.ads_config(),
*stats_.rootScope(), /*skip_cluster_check*/ false,
/*xdstp_config_source*/ false, primary_client,
failover_client));
failover_client, create_shared_clients));
OptRef<XdsResourcesDelegate> xds_resources_delegate =
makeOptRefFromPtr<XdsResourcesDelegate>(xds_resources_delegate_.get());
ads_mux_ = factory->create(std::move(primary_client), std::move(failover_client),
Expand Down Expand Up @@ -382,6 +388,10 @@ XdsManagerImpl::setAdsConfigSource(const envoy::config::core::v3::ApiConfigSourc
return replaceAdsMux(config_source);
}

void XdsManagerImpl::setAdsClientChangeCallback(AdsClientChangeCallback cb) {
ads_client_change_callback_ = std::move(cb);
}

absl::StatusOr<XdsManagerImpl::AuthorityData>
XdsManagerImpl::createAuthority(const envoy::config::core::v3::ConfigSource& config_source,
bool allow_no_authority_names) {
Expand Down Expand Up @@ -455,7 +465,7 @@ XdsManagerImpl::createAuthority(const envoy::config::core::v3::ConfigSource& con
RETURN_IF_NOT_OK(createGrpcClients(cm_->grpcAsyncClientManager(), api_config_source,
*stats_.rootScope(), /*skip_cluster_check*/ false,
/*xdstp_config_source*/ true, primary_client,
failover_client));
failover_client, true));
authority_mux = factory->create(
std::move(primary_client), std::move(failover_client), main_thread_dispatcher_, random_,
*stats_.rootScope(), api_config_source, local_info_, std::move(custom_config_validators),
Expand All @@ -481,7 +491,7 @@ XdsManagerImpl::createAuthority(const envoy::config::core::v3::ConfigSource& con
RETURN_IF_NOT_OK(createGrpcClients(cm_->grpcAsyncClientManager(), api_config_source,
*stats_.rootScope(), /*skip_cluster_check*/ false,
/*xdstp_config_source*/ true, primary_client,
failover_client));
failover_client, true));
OptRef<XdsResourcesDelegate> xds_resources_delegate =
makeOptRefFromPtr<XdsResourcesDelegate>(xds_resources_delegate_.get());
authority_mux = factory->create(
Expand Down Expand Up @@ -556,19 +566,29 @@ XdsManagerImpl::replaceAdsMux(const envoy::config::core::v3::ApiConfigSource& ad
absl::Status status = Config::Utility::checkTransportVersion(ads_config);
RETURN_IF_NOT_OK(status);

// TODO(adisuissa): convert the use of create_shared_clients to a
// runtime-feature that will eventually be "true" by default.
const bool create_shared_clients =
bootstrap.cluster_manager().load_stats_over_ads_config_connection();
Grpc::RawAsyncClientSharedPtr primary_client;
Grpc::RawAsyncClientSharedPtr failover_client;
RETURN_IF_NOT_OK(createGrpcClients(
cm_->grpcAsyncClientManager(), ads_config, *stats_.rootScope(), /*skip_cluster_check*/ false,
/*xdstp_config_source*/ false, primary_client, failover_client));
/*xdstp_config_source*/ false, primary_client, failover_client, create_shared_clients));

// Primary client must not be null, as the primary xDS source must be a valid one.
// The failover_client may be null (no failover defined).
ASSERT(primary_client != nullptr);
Grpc::RawAsyncClientSharedPtr primary_client_for_callback = primary_client;

// This will cause a disconnect from the current sources, and replacement of the clients.
status = ads_mux_->updateMuxSource(std::move(primary_client), std::move(failover_client),
*stats_.rootScope(), std::move(backoff_strategy), ads_config);

// Notify an client modification watcher if defined.
if (status.ok() && ads_client_change_callback_) {
ads_client_change_callback_(std::move(primary_client_for_callback));
}
return status;
}

Expand Down
4 changes: 4 additions & 0 deletions source/common/config/xds_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class XdsManagerImpl : public XdsManager {
void shutdown() override { ads_mux_.reset(); }
absl::Status
setAdsConfigSource(const envoy::config::core::v3::ApiConfigSource& config_source) override;
void setAdsClientChangeCallback(AdsClientChangeCallback cb) override;

Config::GrpcMuxSharedPtr adsMux() override { return ads_mux_; }
SubscriptionFactory& subscriptionFactory() override { return *subscription_factory_; }
Expand Down Expand Up @@ -96,6 +97,9 @@ class XdsManagerImpl : public XdsManager {
// exist, or doesn't match. This will only be populated if default_config_source
// is defined in the bootstrap.
std::unique_ptr<AuthorityData> default_authority_;

// An optional callback that will be invoked when the ADS is replaced.
AdsClientChangeCallback ads_client_change_callback_;
};

} // namespace Config
Expand Down
42 changes: 42 additions & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -499,13 +499,21 @@ absl::Status ClusterManagerImpl::initializeSecondaryClusters(
const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
init_helper_.startInitializingSecondaryClusters();

// Initialize the load-stats-reporter (LRS).
const auto& cm_config = bootstrap.cluster_manager();
if (cm_config.has_load_stats_config() && cm_config.load_stats_over_ads_config_connection()) {
return absl::InvalidArgumentError("Cannot configure both 'load_stats_config' and "
"'load_stats_over_xds_config_connection' in the bootstrap");
}

if (cm_config.has_load_stats_config()) {
const auto& load_stats_config = cm_config.load_stats_config();

absl::Status status = Config::Utility::checkTransportVersion(load_stats_config);
RETURN_IF_NOT_OK(status);
absl::StatusOr<Grpc::RawAsyncClientSharedPtr> client_or_error;
// TODO(adisuissa): remove this runtime flag code as it will be replaced by the
// 'load_stats_over_ads_config_connection' config knob.
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.use_cached_grpc_client_for_xds")) {
absl::StatusOr<Envoy::OptRef<const envoy::config::core::v3::GrpcService>> maybe_grpc_service =
Envoy::Config::Utility::getGrpcConfigFromApiConfigSource(load_stats_config,
Expand All @@ -528,6 +536,40 @@ absl::Status ClusterManagerImpl::initializeSecondaryClusters(
RETURN_IF_NOT_OK_REF(client_or_error.status());
load_stats_reporter_ = std::make_unique<LoadStatsReporter>(
local_info_, *this, *stats_.rootScope(), std::move(client_or_error.value()), dispatcher_);
} else if (cm_config.load_stats_over_ads_config_connection()) {
// Enabled load-stats over the ADS config.
if (!bootstrap.dynamic_resources().has_ads_config()) {
return absl::InvalidArgumentError("Cannot configure 'load_stats_over_xds_config_connection' "
"without configuring 'ads_config' in the bootstrap");
}
ENVOY_LOG_MISC(info, "Setting load-reports using the ads_config connection");
// The ADS mux should have been created prior to the load reporter
// initialization.
ASSERT(xds_manager_.adsMux() != nullptr);
const auto& ads_config_source = bootstrap.dynamic_resources().ads_config();
absl::StatusOr<Envoy::OptRef<const envoy::config::core::v3::GrpcService>> maybe_grpc_service =
Envoy::Config::Utility::getGrpcConfigFromApiConfigSource(ads_config_source,
/*grpc_service_idx*/ 0,
/*xdstp_config_source*/ false);
RETURN_IF_NOT_OK_REF(maybe_grpc_service.status());
absl::StatusOr<Grpc::RawAsyncClientSharedPtr> client_or_error;
if (maybe_grpc_service.value().has_value()) {
client_or_error = async_client_manager_->getOrCreateRawAsyncClientWithHashKey(
Grpc::GrpcServiceConfigWithHashKey(*maybe_grpc_service.value()), *stats_.rootScope(),
/*skip_cluster_check*/ false);
} else {
return absl::InvalidArgumentError(
"Invalid grpc service in the 'ads_config' when initializing LRS over ADS.");
}
RETURN_IF_NOT_OK_REF(client_or_error.status());
// Create the load reporter using the ADS client.
load_stats_reporter_ = std::make_unique<LoadStatsReporter>(
local_info_, *this, *stats_.rootScope(), std::move(client_or_error.value()), dispatcher_);
// Allow the xds-manager to replace the load_stats_reporter async-client if an update occurs in
// it.
xds_manager_.setAdsClientChangeCallback([&](Grpc::RawAsyncClientSharedPtr client) {
load_stats_reporter_->replaceAsyncClient(std::move(client));
});
}
return absl::OkStatus();
}
Expand Down
15 changes: 14 additions & 1 deletion source/common/upstream/load_stats_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ void LoadStatsReporter::setRetryTimer() {
}

void LoadStatsReporter::establishNewStream() {
ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString());
ENVOY_LOG(debug, "Establishing new gRPC bidi stream to {} for {}", async_client_.destination(),
service_method_.DebugString());
stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
if (stream_ == nullptr) {
ENVOY_LOG(warn, "Unable to establish new stream");
Expand All @@ -47,6 +48,18 @@ void LoadStatsReporter::establishNewStream() {
sendLoadStatsRequest();
}

void LoadStatsReporter::replaceAsyncClient(Grpc::RawAsyncClientSharedPtr&& async_client) {
ASSERT(async_client != nullptr);
ENVOY_LOG(info, "Load reporter stats client is being replaced. Disconnecting from the old one "
"and connecting to the new");
// Disconnect from current async_client and disable the retry timer. Resetting
stream_->resetStream();

// Set the new async_client, and try to connect.
async_client_ = std::move(async_client);
establishNewStream();
}

void LoadStatsReporter::sendLoadStatsRequest() {
// TODO(htuch): This sends load reports for only the set of clusters in clusters_, which
// was initialized in startLoadReportPeriod() the last time we either sent a load report
Expand Down
5 changes: 5 additions & 0 deletions source/common/upstream/load_stats_reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@ class LoadStatsReporter
std::unique_ptr<envoy::service::load_stats::v3::LoadStatsResponse>&& message) override;
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override;
void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;

const LoadReporterStats& getStats() { return stats_; };

// Disconnects from the current async-client that is used by the reporter,
// replaces it with a new one, and connects to it.
void replaceAsyncClient(Grpc::RawAsyncClientSharedPtr&& async_client);

// TODO(htuch): Make this configurable or some static.
const uint32_t RETRY_DELAY_MS = 5000;

Expand Down
Loading
Loading