Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for explicit wildcard resource #16855

Merged
merged 59 commits into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
719661c
docs: Add an explicit wildcard mode
krnowak Jun 7, 2021
9b84dfd
config: Implement explicit wildcard mode in delta gRPC
krnowak Jun 7, 2021
a5f4ed7
test: Add tests for explicit wildcar mode in delta gRPC
krnowak Jun 7, 2021
d78ec97
docs: Reword wildcard mode docs
krnowak Jun 7, 2021
2e8ba91
config: Fix build
krnowak Jun 7, 2021
f4ca629
config: Fix build
krnowak Jun 8, 2021
c008bcc
Merge remote-tracking branch 'origin/main' into krnowak/explicit-wild…
krnowak Jul 2, 2021
3fa6a37
Drop the entry in version history
krnowak Jul 2, 2021
3914615
Always send an asterisk as a wildcard subscription
krnowak Jul 6, 2021
1d53769
Better comments, drop redundant tests
krnowak Jul 6, 2021
fa83949
Add constants for wildcards
krnowak Jul 6, 2021
6748f53
Test fixes
krnowak Jul 6, 2021
02e5370
Merge remote-tracking branch 'origin/main' into krnowak/explicit-wild…
krnowak Jul 7, 2021
872e737
Revert "Test fixes"
krnowak Jul 8, 2021
502de45
Revert "Better comments, drop redundant tests"
krnowak Jul 8, 2021
4ba0fc4
Revert "Always send an asterisk as a wildcard subscription"
krnowak Jul 8, 2021
4e34600
New attempt at implementing new wildcard subscriptions
krnowak Jul 8, 2021
98c408b
Merge remote-tracking branch 'origin/main' into krnowak/explicit-wild…
krnowak Jul 14, 2021
18e9f70
Formatting fixes
krnowak Jul 14, 2021
d6e273f
Update wildcard handling in xds delta subscription state
krnowak Jul 14, 2021
db884fe
Change Wildcard into constexpr string view
krnowak Jul 16, 2021
9339a0f
Merge remote-tracking branch 'origin/main' into krnowak/explicit-wild…
krnowak Aug 10, 2021
cb44431
Rework legacy wildcard subscription handling
krnowak Aug 5, 2021
fcc53aa
Merge remote-tracking branch 'origin/main' into krnowak/explicit-wild…
krnowak Aug 12, 2021
9a1855d
Fix typos
krnowak Aug 12, 2021
e71317f
Rename function to placate clang tidy
krnowak Aug 12, 2021
cb93839
Try to improve coverage
krnowak Aug 13, 2021
b7dee94
Factor out legacy wildcard checks to a separate function
krnowak Aug 13, 2021
61361b8
Document the tests
krnowak Aug 13, 2021
63bba65
Fix formatting
krnowak Aug 13, 2021
da12ed5
Merge remote-tracking branch 'origin/main' into krnowak/explicit-wild…
krnowak Aug 31, 2021
e249133
Fix build after merge
krnowak Aug 31, 2021
88587b0
Get rid of containerContains
krnowak Aug 31, 2021
da99fc7
Fix formatting
krnowak Sep 1, 2021
d9f0803
Document the resource state machine
krnowak Sep 1, 2021
74532e9
Ignore resources that we did not request
krnowak Sep 3, 2021
3d55048
Add a test for ignoring superfluous resources
krnowak Sep 3, 2021
f8b758f
Fix formatting
krnowak Sep 3, 2021
fd76a69
Drop one assert
krnowak Sep 9, 2021
4d20de1
Try to preserve the legacy wildcard status on ineffective unsubscript…
krnowak Sep 9, 2021
ab65125
Expand a bit more on the ambiguous resource category in comments
krnowak Sep 17, 2021
c726d04
Update the comments in the xds_mux variant too
krnowak Sep 20, 2021
f9d846d
Constify some variables
krnowak Sep 28, 2021
bae8643
Drop unused member
krnowak Sep 28, 2021
d10ec21
Gate the explicit wildcard resource support
krnowak Oct 1, 2021
508fc9c
Merge remote-tracking branch 'origin/main' into krnowak/explicit-wild…
krnowak Oct 1, 2021
726d655
Fixes
krnowak Oct 1, 2021
649af01
Build fixes
krnowak Oct 4, 2021
6f4724b
Test the old implementation too
krnowak Oct 4, 2021
7d78a00
docs: Add a note about xds changes
krnowak Oct 4, 2021
93ec187
Fix clang tidy
krnowak Oct 5, 2021
52a66ee
Run ADS integration tests together with old and new DSS
krnowak Oct 5, 2021
044bceb
Add DSS to dictionary
krnowak Oct 5, 2021
559e50a
Merge remote-tracking branch 'origin/main' into krnowak/explicit-wild…
krnowak Oct 5, 2021
9a34a6b
Fix version history after merge
krnowak Oct 5, 2021
5e9a130
Merge remote-tracking branch 'origin/main' into krnowak/explicit-wild…
krnowak Oct 19, 2021
36b872b
test: Fix redis ADS integration test params
krnowak Oct 19, 2021
5a2aac2
Merge remote-tracking branch 'origin/main' into krnowak/explicit-wild…
krnowak Oct 20, 2021
55dfdfa
Shard the redis integration test to avoid timeouts
krnowak Oct 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Incompatible Behavior Changes
-----------------------------
*Changes that are expected to cause an incompatibility if applicable; deployment changes are likely required*

* xds: ``*`` became a reserved name for a wildcard resource that can be subscribed to and unsubscribed from at any time. This is a requirement for implementing the on-demand xDSes (like on-demand CDS) that can subscribe to specific resources next to their wildcard subscription. If such xDS is subscribed to both wildcard resource and to other specific resource, then in stream reconnection scenario, the xDS will not send an empty initial request, but a request containing ``*`` for wildcard subscription and the rest of the resources the xDS is subscribed to. If the xDS is only subscribed to wildcard resource, it will try to send a legacy wildcard request. This behavior implements the recent changes in :ref:`xDS protocol <xds_protocol>` and can be temporarily reverted by setting the ``envoy.restart_features.explicit_wildcard_resource`` runtime guard to false.

Minor Behavior Changes
----------------------
*Changes that may cause incompatibilities for some users, but should not for most*
Expand Down
13 changes: 11 additions & 2 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,16 @@ envoy_cc_library(

envoy_cc_library(
name = "delta_subscription_state_lib",
srcs = ["delta_subscription_state.cc"],
hdrs = ["delta_subscription_state.h"],
srcs = [
"delta_subscription_state.cc",
"new_delta_subscription_state.cc",
"old_delta_subscription_state.cc",
],
hdrs = [
"delta_subscription_state.h",
"new_delta_subscription_state.h",
"old_delta_subscription_state.h",
],
deps = [
":api_version_lib",
":pausable_ack_queue_lib",
Expand Down Expand Up @@ -402,6 +410,7 @@ envoy_cc_library(
hdrs = ["watch_map.h"],
deps = [
":decoded_resource_lib",
":utility_lib",
":xds_resource_lib",
"//envoy/config:subscription_interface",
"//source/common/common:assert_lib",
Expand Down
260 changes: 60 additions & 200 deletions source/common/config/delta_subscription_state.cc
Original file line number Diff line number Diff line change
@@ -1,243 +1,103 @@
#include "source/common/config/delta_subscription_state.h"

#include "envoy/event/dispatcher.h"
#include "envoy/service/discovery/v3/discovery.pb.h"

#include "source/common/common/assert.h"
#include "source/common/common/hash.h"
#include "source/common/config/utility.h"
#include "source/common/runtime/runtime_features.h"

namespace Envoy {
namespace Config {
namespace {

DeltaSubscriptionStateVariant getState(std::string type_url,
UntypedConfigUpdateCallbacks& watch_map,
const LocalInfo::LocalInfo& local_info,
Event::Dispatcher& dispatcher) {
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.explicit_wildcard_resource")) {
return DeltaSubscriptionStateVariant(absl::in_place_type<NewDeltaSubscriptionState>,
std::move(type_url), watch_map, local_info, dispatcher);
} else {
return DeltaSubscriptionStateVariant(absl::in_place_type<OldDeltaSubscriptionState>,
std::move(type_url), watch_map, local_info, dispatcher);
}
}

} // namespace

DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
UntypedConfigUpdateCallbacks& watch_map,
const LocalInfo::LocalInfo& local_info,
Event::Dispatcher& dispatcher, const bool wildcard)
// TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on
// empty resources as updates.
: supports_heartbeats_(type_url != "envoy.config.route.v3.VirtualHost"),
ttl_(
[this](const auto& expired) {
Protobuf::RepeatedPtrField<std::string> removed_resources;
for (const auto& resource : expired) {
setResourceWaitingForServer(resource);
removed_resources.Add(std::string(resource));
}

watch_map_.onConfigUpdate({}, removed_resources, "");
},
dispatcher, dispatcher.timeSource()),
type_url_(std::move(type_url)), wildcard_(wildcard), watch_map_(watch_map),
local_info_(local_info), dispatcher_(dispatcher) {}
Event::Dispatcher& dispatcher)
: state_(getState(std::move(type_url), watch_map, local_info, dispatcher)) {}

void DeltaSubscriptionState::updateSubscriptionInterest(
const absl::flat_hash_set<std::string>& cur_added,
const absl::flat_hash_set<std::string>& cur_removed) {
for (const auto& a : cur_added) {
setResourceWaitingForServer(a);
// If interest in a resource is removed-then-added (all before a discovery request
// can be sent), we must treat it as a "new" addition: our user may have forgotten its
// copy of the resource after instructing us to remove it, and need to be reminded of it.
names_removed_.erase(a);
names_added_.insert(a);
}
for (const auto& r : cur_removed) {
removeResourceState(r);
// Ideally, when interest in a resource is added-then-removed in between requests,
// we would avoid putting a superfluous "unsubscribe [resource that was never subscribed]"
// in the request. However, the removed-then-added case *does* need to go in the request,
// and due to how we accomplish that, it's difficult to distinguish remove-add-remove from
// add-remove (because "remove-add" has to be treated as equivalent to just "add").
names_added_.erase(r);
names_removed_.insert(r);
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
state->updateSubscriptionInterest(cur_added, cur_removed);
return;
}
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
state.updateSubscriptionInterest(cur_added, cur_removed);
}

// Not having sent any requests yet counts as an "update pending" since you're supposed to resend
// the entirety of your interest at the start of a stream, even if nothing has changed.
bool DeltaSubscriptionState::subscriptionUpdatePending() const {
return !names_added_.empty() || !names_removed_.empty() ||
!any_request_sent_yet_in_current_stream_ || must_send_discovery_request_;
void DeltaSubscriptionState::setMustSendDiscoveryRequest() {
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
state->setMustSendDiscoveryRequest();
return;
}
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
state.setMustSendDiscoveryRequest();
}

UpdateAck DeltaSubscriptionState::handleResponse(
const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
// We *always* copy the response's nonce into the next request, even if we're going to make that
// request a NACK by setting error_detail.
UpdateAck ack(message.nonce(), type_url_);
TRY_ASSERT_MAIN_THREAD { handleGoodResponse(message); }
END_TRY
catch (const EnvoyException& e) {
handleBadResponse(e, ack);
bool DeltaSubscriptionState::subscriptionUpdatePending() const {
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
return state->subscriptionUpdatePending();
}
return ack;
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
return state.subscriptionUpdatePending();
}

bool DeltaSubscriptionState::isHeartbeatResponse(
const envoy::service::discovery::v3::Resource& resource) const {
if (!supports_heartbeats_ &&
!Runtime::runtimeFeatureEnabled("envoy.reloadable_features.vhds_heartbeats")) {
return false;
}
const auto itr = resource_state_.find(resource.name());
if (itr == resource_state_.end()) {
return false;
void DeltaSubscriptionState::markStreamFresh() {
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
state->markStreamFresh();
return;
}

return !resource.has_resource() && !itr->second.waitingForServer() &&
resource.version() == itr->second.version();
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
state.markStreamFresh();
}

void DeltaSubscriptionState::handleGoodResponse(
UpdateAck DeltaSubscriptionState::handleResponse(
const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
absl::flat_hash_set<std::string> names_added_removed;
Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> non_heartbeat_resources;
for (const auto& resource : message.resources()) {
if (!names_added_removed.insert(resource.name()).second) {
throw EnvoyException(
fmt::format("duplicate name {} found among added/updated resources", resource.name()));
}
if (isHeartbeatResponse(resource)) {
continue;
}
non_heartbeat_resources.Add()->CopyFrom(resource);
// DeltaDiscoveryResponses for unresolved aliases don't contain an actual resource
if (!resource.has_resource() && resource.aliases_size() > 0) {
continue;
}
if (message.type_url() != resource.resource().type_url()) {
throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match "
"the message-wide type URL {} in DeltaDiscoveryResponse {}",
resource.resource().type_url(), message.type_url(),
message.DebugString()));
}
}
for (const auto& name : message.removed_resources()) {
if (!names_added_removed.insert(name).second) {
throw EnvoyException(
fmt::format("duplicate name {} found in the union of added+removed resources", name));
}
}

{
const auto scoped_update = ttl_.scopedTtlUpdate();
for (const auto& resource : message.resources()) {
if (wildcard_ || resource_state_.contains(resource.name())) {
// Only consider tracked resources.
// NOTE: This is not gonna work for xdstp resources with glob resource matching.
addResourceState(resource);
}
}
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
return state->handleResponse(message);
}

watch_map_.onConfigUpdate(non_heartbeat_resources, message.removed_resources(),
message.system_version_info());

// If a resource is gone, there is no longer a meaningful version for it that makes sense to
// provide to the server upon stream reconnect: either it will continue to not exist, in which
// case saying nothing is fine, or the server will bring back something new, which we should
// receive regardless (which is the logic that not specifying a version will get you).
//
// So, leave the version map entry present but blank. It will be left out of
// initial_resource_versions messages, but will remind us to explicitly tell the server "I'm
// cancelling my subscription" when we lose interest.
for (const auto& resource_name : message.removed_resources()) {
if (resource_names_.find(resource_name) != resource_names_.end()) {
setResourceWaitingForServer(resource_name);
}
}
ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", type_url_,
message.resources().size(), message.removed_resources().size());
}

void DeltaSubscriptionState::handleBadResponse(const EnvoyException& e, UpdateAck& ack) {
// Note that error_detail being set is what indicates that a DeltaDiscoveryRequest is a NACK.
ack.error_detail_.set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
ack.error_detail_.set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));
ENVOY_LOG(warn, "delta config for {} rejected: {}", type_url_, e.what());
watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
return state.handleResponse(message);
}

void DeltaSubscriptionState::handleEstablishmentFailure() {
watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
nullptr);
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
state->handleEstablishmentFailure();
return;
}
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
state.handleEstablishmentFailure();
}

envoy::service::discovery::v3::DeltaDiscoveryRequest
DeltaSubscriptionState::getNextRequestAckless() {
envoy::service::discovery::v3::DeltaDiscoveryRequest request;
must_send_discovery_request_ = false;
if (!any_request_sent_yet_in_current_stream_) {
any_request_sent_yet_in_current_stream_ = true;
// initial_resource_versions "must be populated for first request in a stream".
// Also, since this might be a new server, we must explicitly state *all* of our subscription
// interest.
for (auto const& [resource_name, resource_state] : resource_state_) {
// Populate initial_resource_versions with the resource versions we currently have.
// Resources we are interested in, but are still waiting to get any version of from the
// server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
if (!resource_state.waitingForServer()) {
(*request.mutable_initial_resource_versions())[resource_name] = resource_state.version();
}
// As mentioned above, fill resource_names_subscribe with everything, including names we
// have yet to receive any resource for unless this is a wildcard subscription, for which
// the first request on a stream must be without any resource names.
if (!wildcard_) {
names_added_.insert(resource_name);
}
}
// Wildcard subscription initial requests must have no resource_names_subscribe.
if (wildcard_) {
names_added_.clear();
}
names_removed_.clear();
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
return state->getNextRequestAckless();
}
std::copy(names_added_.begin(), names_added_.end(),
Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_subscribe()));
std::copy(names_removed_.begin(), names_removed_.end(),
Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_unsubscribe()));
names_added_.clear();
names_removed_.clear();

request.set_type_url(type_url_);
request.mutable_node()->MergeFrom(local_info_.node());
return request;
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
return state.getNextRequestAckless();
}

envoy::service::discovery::v3::DeltaDiscoveryRequest
DeltaSubscriptionState::getNextRequestWithAck(const UpdateAck& ack) {
envoy::service::discovery::v3::DeltaDiscoveryRequest request = getNextRequestAckless();
request.set_response_nonce(ack.nonce_);
if (ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) {
// Don't needlessly make the field present-but-empty if status is ok.
request.mutable_error_detail()->CopyFrom(ack.error_detail_);
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
return state->getNextRequestWithAck(ack);
}
return request;
}

void DeltaSubscriptionState::addResourceState(
const envoy::service::discovery::v3::Resource& resource) {
if (resource.has_ttl()) {
ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())),
resource.name());
} else {
ttl_.clear(resource.name());
}

resource_state_[resource.name()] = ResourceState(resource);
resource_names_.insert(resource.name());
}

void DeltaSubscriptionState::setResourceWaitingForServer(const std::string& resource_name) {
resource_state_[resource_name] = ResourceState();
resource_names_.insert(resource_name);
}

void DeltaSubscriptionState::removeResourceState(const std::string& resource_name) {
resource_state_.erase(resource_name);
resource_names_.erase(resource_name);
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
return state.getNextRequestWithAck(ack);
}

} // namespace Config
Expand Down
Loading