-
Notifications
You must be signed in to change notification settings - Fork 4.8k
/
cds_api_impl.cc
127 lines (112 loc) · 5.08 KB
/
cds_api_impl.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include "common/upstream/cds_api_impl.h"
#include <string>
#include "envoy/api/v2/cluster.pb.h"
#include "envoy/config/core/v3/config_source.pb.h"
#include "envoy/service/discovery/v3/discovery.pb.h"
#include "envoy/stats/scope.h"
#include "common/common/assert.h"
#include "common/common/cleanup.h"
#include "common/common/utility.h"
#include "common/config/api_version.h"
#include "common/config/utility.h"
#include "common/protobuf/utility.h"
#include "absl/container/node_hash_set.h"
#include "absl/strings/str_join.h"
namespace Envoy {
namespace Upstream {
CdsApiPtr CdsApiImpl::create(const envoy::config::core::v3::ConfigSource& cds_config,
ClusterManager& cm, Stats::Scope& scope,
ProtobufMessage::ValidationVisitor& validation_visitor) {
return CdsApiPtr{new CdsApiImpl(cds_config, cm, scope, validation_visitor)};
}
CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config, ClusterManager& cm,
Stats::Scope& scope, ProtobufMessage::ValidationVisitor& validation_visitor)
: Envoy::Config::SubscriptionBase<envoy::config::cluster::v3::Cluster>(
cds_config.resource_api_version(), validation_visitor, "name"),
cm_(cm), scope_(scope.createScope("cluster_manager.cds.")) {
const auto resource_name = getResourceName();
subscription_ = cm_.subscriptionFactory().subscriptionFromConfigSource(
cds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_);
}
void CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) {
auto all_existing_clusters = cm_.clusters();
// Exclude the clusters which CDS wants to add.
for (const auto& resource : resources) {
all_existing_clusters.active_clusters_.erase(resource.get().name());
all_existing_clusters.warming_clusters_.erase(resource.get().name());
}
Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
for (const auto& [cluster_name, _] : all_existing_clusters.active_clusters_) {
*to_remove_repeated.Add() = cluster_name;
}
for (const auto& [cluster_name, _] : all_existing_clusters.warming_clusters_) {
// Do not add the cluster twice when the cluster is both active and warming.
if (all_existing_clusters.active_clusters_.count(cluster_name) == 0) {
*to_remove_repeated.Add() = cluster_name;
}
}
onConfigUpdate(resources, to_remove_repeated, version_info);
}
void CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) {
Config::ScopedResume maybe_resume_eds;
if (cm_.adsMux()) {
const auto type_urls =
Config::getAllVersionTypeUrls<envoy::config::endpoint::v3::ClusterLoadAssignment>();
maybe_resume_eds = cm_.adsMux()->pause(type_urls);
}
ENVOY_LOG(info, "cds: add {} cluster(s), remove {} cluster(s)", added_resources.size(),
removed_resources.size());
std::vector<std::string> exception_msgs;
absl::flat_hash_set<std::string> cluster_names(added_resources.size());
bool any_applied = false;
for (const auto& resource : added_resources) {
envoy::config::cluster::v3::Cluster cluster;
try {
cluster = dynamic_cast<const envoy::config::cluster::v3::Cluster&>(resource.get().resource());
if (!cluster_names.insert(cluster.name()).second) {
// NOTE: at this point, the first of these duplicates has already been successfully applied.
throw EnvoyException(fmt::format("duplicate cluster {} found", cluster.name()));
}
if (cm_.addOrUpdateCluster(cluster, resource.get().version())) {
any_applied = true;
ENVOY_LOG(info, "cds: add/update cluster '{}'", cluster.name());
} else {
ENVOY_LOG(debug, "cds: add/update cluster '{}' skipped", cluster.name());
}
} catch (const EnvoyException& e) {
exception_msgs.push_back(fmt::format("{}: {}", cluster.name(), e.what()));
}
}
for (const auto& resource_name : removed_resources) {
if (cm_.removeCluster(resource_name)) {
any_applied = true;
ENVOY_LOG(info, "cds: remove cluster '{}'", resource_name);
}
}
if (any_applied) {
system_version_info_ = system_version_info;
}
runInitializeCallbackIfAny();
if (!exception_msgs.empty()) {
throw EnvoyException(
fmt::format("Error adding/updating cluster(s) {}", absl::StrJoin(exception_msgs, ", ")));
}
}
void CdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
runInitializeCallbackIfAny();
}
void CdsApiImpl::runInitializeCallbackIfAny() {
if (initialize_callback_) {
initialize_callback_();
initialize_callback_ = nullptr;
}
}
} // namespace Upstream
} // namespace Envoy