Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delta-xds: avoid sending resource names for wildcard requests on stream reconnect #16153

Merged
merged 6 commits into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions api/xds_protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -771,10 +771,13 @@ Later the xDS client spontaneously requests the "wc" resource.
:alt: Incremental session example

On reconnect the Incremental xDS client may tell the server of its known
resources to avoid resending them over the network. Because no state is
assumed to be preserved from the previous stream, the reconnecting
client must provide the server with all resource names it is interested
in.
resources to avoid resending them over the network by sending them in
:ref:`initial_resource_versions <envoy_api_field_DeltaDiscoveryRequest.initial_resource_versions>`.
Because no state is assumed to be preserved from the previous stream, the reconnecting
client must provide the server with all resource names it is interested in. Note that for wildcard
requests (CDS/LDS/SRDS), the request must have no resources in both
:ref:`resource_names_subscribe <envoy_api_field_DeltaDiscoveryRequest.resource_names_subscribe>` and
:ref:`resource_names_unsubscribe <envoy_api_field_DeltaDiscoveryRequest.resource_names_unsubscribe>`.

.. figure:: diagrams/incremental-reconnect.svg
:alt: Incremental reconnect example
Expand Down
17 changes: 12 additions & 5 deletions source/common/config/delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Config {
DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
UntypedConfigUpdateCallbacks& watch_map,
const LocalInfo::LocalInfo& local_info,
Event::Dispatcher& dispatcher)
Event::Dispatcher& dispatcher, const bool wildcard)
// TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on
// empty resources as updates.
: supports_heartbeats_(type_url != "envoy.config.route.v3.VirtualHost"),
Expand All @@ -29,8 +29,8 @@ DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
watch_map_.onConfigUpdate({}, removed_resources, "");
},
dispatcher, dispatcher.timeSource()),
type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info),
dispatcher_(dispatcher) {}
type_url_(std::move(type_url)), wildcard_(wildcard), watch_map_(watch_map),
local_info_(local_info), dispatcher_(dispatcher) {}

void DeltaSubscriptionState::updateSubscriptionInterest(
const absl::flat_hash_set<std::string>& cur_added,
Expand Down Expand Up @@ -178,8 +178,15 @@ DeltaSubscriptionState::getNextRequestAckless() {
(*request.mutable_initial_resource_versions())[resource_name] = resource_state.version();
}
// As mentioned above, fill resource_names_subscribe with everything, including names we
// have yet to receive any resource for.
names_added_.insert(resource_name);
// have yet to receive any resource for unless this is a wildcard subscription, for which
// the first request on a stream must be without any resource names.
if (!wildcard_) {
names_added_.insert(resource_name);
}
}
// Wildcard subscription initial requests must have no resource_names_subscribe.
if (wildcard_) {
names_added_.clear();
}
names_removed_.clear();
}
Expand Down
5 changes: 4 additions & 1 deletion source/common/config/delta_subscription_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ namespace Config {
class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
public:
DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map,
const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher);
const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher,
const bool wildcard);

// Update which resources we're interested in subscribing to.
void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added,
Expand Down Expand Up @@ -103,6 +104,8 @@ class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
absl::flat_hash_set<std::string> resource_names_;

const std::string type_url_;
// Is the subscription is for a wildcard request.
const bool wildcard_;
UntypedConfigUpdateCallbacks& watch_map_;
const LocalInfo::LocalInfo& local_info_;
Event::Dispatcher& dispatcher_;
Expand Down
12 changes: 7 additions & 5 deletions source/common/config/new_grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
if (enable_type_url_downgrade_and_upgrade_) {
registerVersionedTypeUrl(type_url);
}
addSubscription(type_url, options.use_namespace_matching_);
// No resources implies that this is a wildcard request subscription.
addSubscription(type_url, options.use_namespace_matching_, resources.empty());
return addWatch(type_url, resources, callbacks, resource_decoder, options);
}

Expand Down Expand Up @@ -225,10 +226,11 @@ void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
entry->second->watch_map_.removeWatch(watch);
}

void NewGrpcMuxImpl::addSubscription(const std::string& type_url,
const bool use_namespace_matching) {
subscriptions_.emplace(type_url, std::make_unique<SubscriptionStuff>(
type_url, local_info_, use_namespace_matching, dispatcher_));
void NewGrpcMuxImpl::addSubscription(const std::string& type_url, const bool use_namespace_matching,
const bool wildcard) {
subscriptions_.emplace(type_url, std::make_unique<SubscriptionStuff>(type_url, local_info_,
use_namespace_matching,
dispatcher_, wildcard));
subscription_ordering_.emplace_back(type_url);
}

Expand Down
9 changes: 6 additions & 3 deletions source/common/config/new_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ class NewGrpcMuxImpl

struct SubscriptionStuff {
SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
const bool use_namespace_matching, Event::Dispatcher& dispatcher)
const bool use_namespace_matching, Event::Dispatcher& dispatcher,
const bool wildcard)
: watch_map_(use_namespace_matching),
sub_state_(type_url, watch_map_, local_info, dispatcher) {}
sub_state_(type_url, watch_map_, local_info, dispatcher, wildcard) {}

WatchMap watch_map_;
DeltaSubscriptionState sub_state_;
Expand Down Expand Up @@ -129,7 +130,9 @@ class NewGrpcMuxImpl
const absl::flat_hash_set<std::string>& resources,
const SubscriptionOptions& options);

void addSubscription(const std::string& type_url, bool use_namespace_matching);
// Adds a subscription for the type_url to the subscriptions map and order list.
void addSubscription(const std::string& type_url, bool use_namespace_matching,
const bool wildcard);

void trySendDiscoveryRequests();

Expand Down
59 changes: 47 additions & 12 deletions test/common/config/delta_subscription_state_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using testing::NiceMock;
using testing::Throw;
using testing::UnorderedElementsAre;
using testing::UnorderedElementsAreArray;

namespace Envoy {
namespace Config {
Expand All @@ -28,14 +29,17 @@ const char TypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster";

class DeltaSubscriptionStateTestBase : public testing::Test {
protected:
DeltaSubscriptionStateTestBase(const std::string& type_url)
DeltaSubscriptionStateTestBase(
const std::string& type_url, const bool wildcard,
const absl::flat_hash_set<std::string> initial_resources = {"name1", "name2", "name3"})
: timer_(new Event::MockTimer(&dispatcher_)),
state_(type_url, callbacks_, local_info_, dispatcher_) {
state_.updateSubscriptionInterest({"name1", "name2", "name3"}, {});
state_(type_url, callbacks_, local_info_, dispatcher_, wildcard) {
state_.updateSubscriptionInterest(initial_resources, {});
envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request =
state_.getNextRequestAckless();
EXPECT_THAT(cur_request.resource_names_subscribe(),
UnorderedElementsAre("name1", "name2", "name3"));
// UnorderedElementsAre("name1", "name2", "name3"));
UnorderedElementsAreArray(initial_resources.cbegin(), initial_resources.cend()));
}

UpdateAck deliverDiscoveryResponse(
Expand Down Expand Up @@ -94,7 +98,13 @@ populateRepeatedResource(std::vector<std::pair<std::string, std::string>> items)

class DeltaSubscriptionStateTest : public DeltaSubscriptionStateTestBase {
public:
DeltaSubscriptionStateTest() : DeltaSubscriptionStateTestBase(TypeUrl) {}
DeltaSubscriptionStateTest() : DeltaSubscriptionStateTestBase(TypeUrl, false) {}
};

// Delta subscription state of a wildcard subscription request.
class WildcardDeltaSubscriptionStateTest : public DeltaSubscriptionStateTestBase {
public:
WildcardDeltaSubscriptionStateTest() : DeltaSubscriptionStateTestBase(TypeUrl, true, {}) {}
};

// Basic gaining/losing interest in resources should lead to subscription updates.
Expand Down Expand Up @@ -307,9 +317,10 @@ TEST_F(DeltaSubscriptionStateTest, ResourceGoneLeadsToBlankInitialVersion) {
}
}

// Upon a reconnection, the server is supposed to assume a blank slate for the Envoy's state
// (hence the need for initial_resource_versions). The resource_names_subscribe of the first
// message must therefore be every resource the Envoy is interested in.
// For non-wildcard subscription, upon a reconnection, the server is supposed to assume a
// blank slate for the Envoy's state (hence the need for initial_resource_versions).
// The resource_names_subscribe of the first message must therefore be every resource the
// Envoy is interested in.
//
// resource_names_unsubscribe, on the other hand, is always blank in the first request - even if,
// in between the last request of the last stream and the first request of the new stream, Envoy
Expand All @@ -326,16 +337,40 @@ TEST_F(DeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect) {
envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless();
// Regarding the resource_names_subscribe field:
// name1: do not include: we lost interest.
// name2: yes do include: we're interested and we have a version of it.
// name2: yes do include: we are interested, its non-wildcard, and we have a version of it.
// name3: yes do include: even though we don't have a version of it, we are interested.
// name4: yes do include: we are newly interested. (If this wasn't a stream reconnect, only
// name4
// would belong in this subscribe field).
// name4 would belong in this subscribe field).
EXPECT_THAT(cur_request.resource_names_subscribe(),
UnorderedElementsAre("name2", "name3", "name4"));
EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty());
}

// For wildcard subscription, upon a reconnection, the server is supposed to assume a
// blank slate for the Envoy's state (hence the need for initial_resource_versions), and
// the resource_names_subscribe and resource_names_unsubscribe must be empty (as is expected
// of every wildcard first message). This is true even if in between the last request of the
// last stream and the first request of the new stream, Envoy gained or lost interest in a
// resource. The subscription & unsubscription implicitly takes effect by simply requesting a
// wildcard subscription in the newly reconnected stream.
TEST_F(WildcardDeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect) {
Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> add1_2 =
populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}});
EXPECT_CALL(*timer_, disableTimer());
deliverDiscoveryResponse(add1_2, {}, "debugversion1");

state_.updateSubscriptionInterest({"name3"}, {"name1"});
state_.markStreamFresh(); // simulate a stream reconnection
envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless();
// Regarding the resource_names_subscribe field:
// name1: do not include: we lost interest.
// name2: do not include: we are interested, but for wildcard it shouldn't be provided.
// name4: do not include: although we are newly interested, an initial wildcard request
// must be with no resources.
EXPECT_TRUE(cur_request.resource_names_subscribe().empty());
EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty());
}

// initial_resource_versions should not be present on messages after the first in a stream.
TEST_F(DeltaSubscriptionStateTest, InitialVersionMapFirstMessageOnly) {
// First, verify that the first message of a new stream sends initial versions.
Expand Down Expand Up @@ -484,7 +519,7 @@ TEST_F(DeltaSubscriptionStateTest, ResourceTTL) {
class VhdsDeltaSubscriptionStateTest : public DeltaSubscriptionStateTestBase {
public:
VhdsDeltaSubscriptionStateTest()
: DeltaSubscriptionStateTestBase("envoy.config.route.v3.VirtualHost") {}
: DeltaSubscriptionStateTestBase("envoy.config.route.v3.VirtualHost", false) {}
};

TEST_F(VhdsDeltaSubscriptionStateTest, ResourceTTL) {
Expand Down
86 changes: 85 additions & 1 deletion test/common/config/new_grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ TEST_F(NewGrpcMuxImplTest, ReconnectionResetsNonceAndAcks) {
add_response_resource("x", "2000", *response);
add_response_resource("y", "3000", *response);
// Pause EDS to allow the ACK to be cached.
auto resume_cds = grpc_mux_->pause(type_url);
auto resume_eds = grpc_mux_->pause(type_url);
grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_);
// Now disconnect.
// Grpc stream retry timer will kick in and reconnection will happen.
Expand All @@ -179,6 +179,90 @@ TEST_F(NewGrpcMuxImplTest, ReconnectionResetsNonceAndAcks) {
expectSendMessage(type_url, {}, {"x", "y"});
}

// Validate resources are not sent on wildcard watch reconnection.
// Regression test of https://github.com/envoyproxy/envoy/issues/16063.
TEST_F(NewGrpcMuxImplTest, ReconnectionResetsWildcardSubscription) {
Event::MockTimer* grpc_stream_retry_timer{new Event::MockTimer()};
Event::MockTimer* ttl_mgr_timer{new NiceMock<Event::MockTimer>()};
Event::TimerCb grpc_stream_retry_timer_cb;
EXPECT_CALL(dispatcher_, createTimer_(_))
.WillOnce(
testing::DoAll(SaveArg<0>(&grpc_stream_retry_timer_cb), Return(grpc_stream_retry_timer)))
// Happens when adding a type url watch.
.WillRepeatedly(Return(ttl_mgr_timer));
setup();
InSequence s;
const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
auto foo_sub = grpc_mux_->addWatch(type_url, {}, callbacks_, resource_decoder_, {});
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
// Send a wildcard request on new connection.
expectSendMessage(type_url, {}, {});
grpc_mux_->start();

// An helper function to create a response with a single load_assignment resource
// (load_assignment's cluster_name will be updated).
envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment;
auto create_response = [&load_assignment, &type_url](const std::string& name,
const std::string& version,
const std::string& nonce)
-> std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryResponse> {
auto response = std::make_unique<envoy::service::discovery::v3::DeltaDiscoveryResponse>();
response->set_type_url(type_url);
response->set_system_version_info(version);
response->set_nonce(nonce);
auto res = response->add_resources();
res->set_name(name);
res->set_version(version);
load_assignment.set_cluster_name(name);
res->mutable_resource()->PackFrom(load_assignment);
return response;
};

// Send a response with a single resource that should be received by Envoy,
// followed by an ack with the nonce.
{
auto response = create_response("x", "1000", "111");
EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "1000"))
.WillOnce(Invoke([&load_assignment](const std::vector<DecodedResourceRef>& added_resources,
const Protobuf::RepeatedPtrField<std::string>&,
const std::string&) {
EXPECT_EQ(1, added_resources.size());
EXPECT_TRUE(
TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment));
}));
// Expect an ack with the nonce.
expectSendMessage(type_url, {}, {}, "111");
grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_);
}
// Send another response with a different resource, but where EDS is paused.
auto resume_eds = grpc_mux_->pause(type_url);
{
auto response = create_response("y", "2000", "222");
EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "2000"))
.WillOnce(Invoke([&load_assignment](const std::vector<DecodedResourceRef>& added_resources,
const Protobuf::RepeatedPtrField<std::string>&,
const std::string&) {
EXPECT_EQ(1, added_resources.size());
EXPECT_TRUE(
TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment));
}));
// No ack reply is expected in this case, as EDS is suspended.
grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_);
}

// Now disconnect.
// Grpc stream retry timer will kick in and reconnection will happen.
EXPECT_CALL(*grpc_stream_retry_timer, enableTimer(_, _))
.WillOnce(Invoke(grpc_stream_retry_timer_cb));
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
// initial_resource_versions should contain client side all resource:version info, and no
// added resources because this is a wildcard request.
expectSendMessage(type_url, {}, {}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "",
{{"x", "1000"}, {"y", "2000"}});
grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Canceled, "");
// Destruction of wildcard will not issue unsubscribe requests for the resources.
}

// Test that we simply ignore a message for an unknown type_url, with no ill effects.
TEST_F(NewGrpcMuxImplTest, DiscoveryResponseNonexistentSub) {
setup();
Expand Down
Loading