Skip to content

Commit

Permalink
xds resolver: fix edge cases in interactions between LDS and RDS (#31668
Browse files Browse the repository at this point in the history
) (#31672)

* xds resolver: fix edge cases in interactions between LDS and RDS

* improve SwitchFromInlineRouteConfigToRds test

* clang-tidy

* Automated change: Fix sanity tests

Co-authored-by: markdroth <markdroth@users.noreply.github.com>

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
  • Loading branch information
markdroth and markdroth authored Nov 16, 2022
1 parent 393e48c commit 86fc3f0
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 13 deletions.
8 changes: 8 additions & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 30 additions & 12 deletions src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -895,26 +895,44 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) {
&current_listener_.route_config,
// RDS resource name
[&](std::string* rds_name) {
if (route_config_watcher_ != nullptr) {
XdsRouteConfigResourceType::CancelWatch(
xds_client_.get(), route_config_name_, route_config_watcher_,
/*delay_unsubscription=*/!rds_name->empty());
route_config_watcher_ = nullptr;
}
route_config_name_ = std::move(*rds_name);
if (!route_config_name_.empty()) {
current_virtual_host_.routes.clear();
// If the RDS name changed, update the RDS watcher.
// Note that this will be true on the initial update, because
// route_config_name_ will be empty.
if (route_config_name_ != *rds_name) {
// If we already had a watch (i.e., if the previous config had
// a different RDS name), stop the previous watch.
// There will be no previous watch if either (a) this is the
// initial resource update or (b) the previous Listener had an
// inlined RouteConfig.
if (route_config_watcher_ != nullptr) {
XdsRouteConfigResourceType::CancelWatch(
xds_client_.get(), route_config_name_, route_config_watcher_,
/*delay_unsubscription=*/true);
route_config_watcher_ = nullptr;
}
// Start watch for the new RDS resource name.
route_config_name_ = std::move(*rds_name);
auto watcher = MakeRefCounted<RouteConfigWatcher>(Ref());
route_config_watcher_ = watcher.get();
XdsRouteConfigResourceType::StartWatch(
xds_client_.get(), route_config_name_, std::move(watcher));
} else {
// RDS resource name has not changed, so no watch needs to be
// updated, but we still need to propagate any changes in the
// HCM config (e.g., the list of HTTP filters).
GenerateResult();
}
// HCM may contain newer filter config. We need to propagate the
// update as config selector to the channel.
GenerateResult();
},
// inlined RouteConfig
[&](XdsRouteConfigResource* route_config) {
// If the previous update specified an RDS resource instead of
// having an inlined RouteConfig, we need to cancel the RDS watch.
if (route_config_watcher_ != nullptr) {
XdsRouteConfigResourceType::CancelWatch(
xds_client_.get(), route_config_name_, route_config_watcher_);
route_config_watcher_ = nullptr;
route_config_name_.clear();
}
OnRouteConfigUpdate(std::move(*route_config));
});
}
Expand Down
1 change: 1 addition & 0 deletions test/cpp/end2end/xds/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ grpc_cc_test(
"//:gpr",
"//:grpc",
"//:grpc++",
"//src/proto/grpc/testing/xds/v3:fault_proto",
"//src/proto/grpc/testing/xds/v3:router_proto",
"//test/core/util:grpc_test_util",
],
Expand Down
239 changes: 238 additions & 1 deletion test/cpp/end2end/xds/xds_routing_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <gtest/gtest.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/proto/grpc/testing/xds/v3/fault.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/router.grpc.pb.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"

Expand Down Expand Up @@ -96,11 +97,13 @@ TEST_P(LdsDeletionTest, ListenerDeletionIgnored) {
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Unset LDS resource and wait for client to ACK the update.
balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
const auto deadline = absl::Now() + absl::Seconds(30);
const auto deadline =
absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
while (true) {
ASSERT_LT(absl::Now(), deadline) << "timed out waiting for LDS ACK";
response_state = balancer_->ads_service()->lds_response_state();
if (response_state.has_value()) break;
absl::SleepFor(absl::Seconds(1) * grpc_test_slowdown_factor());
}
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Make sure we can still send RPCs.
Expand All @@ -127,6 +130,240 @@ TEST_P(LdsDeletionTest, ListenerDeletionIgnored) {
WaitForAllBackends(DEBUG_LOCATION, 1, 2);
}

using LdsRdsInteractionTest = XdsEnd2endTest;

INSTANTIATE_TEST_SUITE_P(
XdsTest, LdsRdsInteractionTest,
::testing::Values(XdsTestType().set_enable_rds_testing()),
&XdsTestType::Name);

TEST_P(LdsRdsInteractionTest, SwitchFromRdsToInlineRouteConfig) {
CreateAndStartBackends(2);
// Bring up client pointing to backend 0 and wait for it to connect.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 0);
// RDS should have been ACKed.
auto response_state = balancer_->ads_service()->rds_response_state();
ASSERT_TRUE(response_state.has_value());
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Now recreate the LDS resource with an inline route config pointing to a
// different CDS and EDS resource, pointing to backend 1, and make sure
// the client uses it.
const char* kNewClusterName = "new_cluster_name";
const char* kNewEdsResourceName = "new_eds_resource_name";
auto cluster = default_cluster_;
cluster.set_name(kNewClusterName);
cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
balancer_->ads_service()->SetCdsResource(cluster);
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kNewEdsResourceName));
RouteConfiguration new_route_config = default_route_config_;
new_route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
Listener listener = default_listener_;
HttpConnectionManager http_connection_manager =
ClientHcmAccessor().Unpack(listener);
*http_connection_manager.mutable_route_config() = new_route_config;
ClientHcmAccessor().Pack(http_connection_manager, &listener);
balancer_->ads_service()->SetLdsResource(listener);
// Wait for client to start using backend 1.
WaitForBackend(DEBUG_LOCATION, 1);
// Send an update to the original RDS resource, which the client
// should no longer be subscribed to. We need this RouteConfig to be
// different than the original one so that the update does not get
// squelched by XdsClient, so we add a second domain to the vhost that
// will not actually be used.
new_route_config = default_route_config_;
new_route_config.mutable_virtual_hosts(0)->add_domains("foo.example.com");
balancer_->ads_service()->SetRdsResource(new_route_config);
// Wait for RDS ACK to know that the client saw the change.
// TODO(roth): The client does not actually ACK here, it just sends an
// unsubscription request, but our fake xDS server is incorrectly treating
// that as an ACK. When we have time, fix the behavior of the fake
// xDS server, and then change this test to ensure that there is no RDS
// ACK within the 30-second timeout period.
const auto deadline =
absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
while (true) {
ASSERT_LT(absl::Now(), deadline) << "timed out waiting for RDS ACK";
response_state = balancer_->ads_service()->rds_response_state();
if (response_state.has_value()) break;
absl::SleepFor(absl::Seconds(1) * grpc_test_slowdown_factor());
}
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Make sure RPCs are still going to backend 1. This shows that the
// client did not replace its route config with the one from the RDS
// resource that it should no longer be using.
ResetBackendCounters();
CheckRpcSendOk(DEBUG_LOCATION);
EXPECT_EQ(0, backends_[0]->backend_service()->request_count());
EXPECT_EQ(1, backends_[1]->backend_service()->request_count());
}

TEST_P(LdsRdsInteractionTest, SwitchFromInlineRouteConfigToRds) {
CreateAndStartBackends(2);
// Create an LDS resource with an inline RouteConfig pointing to a
// different CDS and EDS resource, sending traffic to backend 0.
const char* kNewClusterName = "new_cluster_name";
const char* kNewEdsResourceName = "new_eds_resource_name";
auto cluster = default_cluster_;
cluster.set_name(kNewClusterName);
cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kNewEdsResourceName));
RouteConfiguration route_config = default_route_config_;
route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
Listener listener = default_listener_;
HttpConnectionManager http_connection_manager =
ClientHcmAccessor().Unpack(listener);
*http_connection_manager.mutable_route_config() = route_config;
ClientHcmAccessor().Pack(http_connection_manager, &listener);
balancer_->ads_service()->SetLdsResource(listener);
// Start the client and make sure traffic goes to backend 0.
WaitForBackend(DEBUG_LOCATION, 0);
// RDS should not have been ACKed, because the RouteConfig was inlined.
ASSERT_FALSE(balancer_->ads_service()->rds_response_state().has_value());
// Change the LDS resource to point to an RDS resource. The LDS resource
// configures the fault injection filter with a config that fails all RPCs.
// However, the RDS resource has a typed_per_filter_config override that
// disables the fault injection filter. The RDS resource points to a
// new cluster that sends traffic to backend 1.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
route_config = default_route_config_;
auto* config_map = route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_typed_per_filter_config();
(*config_map)["envoy.fault"].PackFrom(
envoy::extensions::filters::http::fault::v3::HTTPFault());
envoy::extensions::filters::http::fault::v3::HTTPFault http_fault;
auto* abort_percentage = http_fault.mutable_abort()->mutable_percentage();
abort_percentage->set_numerator(100);
abort_percentage->set_denominator(abort_percentage->HUNDRED);
http_fault.mutable_abort()->set_grpc_status(
static_cast<uint32_t>(StatusCode::ABORTED));
listener = default_listener_;
http_connection_manager = ClientHcmAccessor().Unpack(listener);
*http_connection_manager.add_http_filters() =
http_connection_manager.http_filters(0);
auto* filter = http_connection_manager.mutable_http_filters(0);
filter->set_name("envoy.fault");
filter->mutable_typed_config()->PackFrom(http_fault);
ClientHcmAccessor().Pack(http_connection_manager, &listener);
SetListenerAndRouteConfiguration(balancer_.get(), std::move(listener),
route_config);
// Wait for traffic to switch to backend 1. There should be no RPC
// failures here; if there are, that indicates that the client started
// using the new LDS resource before it saw the new RDS resource.
WaitForBackend(DEBUG_LOCATION, 1);
}

TEST_P(LdsRdsInteractionTest, HcmConfigUpdatedWithoutRdsChange) {
CreateAndStartBackends(1);
// Bring up client pointing to backend 0 and wait for it to connect.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 0);
// LDS should have been ACKed.
auto response_state = balancer_->ads_service()->lds_response_state();
ASSERT_TRUE(response_state.has_value());
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Now update the LDS resource to add the fault injection filter with
// a config that fails all RPCs.
envoy::extensions::filters::http::fault::v3::HTTPFault http_fault;
auto* abort_percentage = http_fault.mutable_abort()->mutable_percentage();
abort_percentage->set_numerator(100);
abort_percentage->set_denominator(abort_percentage->HUNDRED);
http_fault.mutable_abort()->set_grpc_status(
static_cast<uint32_t>(StatusCode::ABORTED));
Listener listener = default_listener_;
HttpConnectionManager http_connection_manager =
ClientHcmAccessor().Unpack(listener);
*http_connection_manager.add_http_filters() =
http_connection_manager.http_filters(0);
auto* filter = http_connection_manager.mutable_http_filters(0);
filter->set_name("envoy.fault");
filter->mutable_typed_config()->PackFrom(http_fault);
ClientHcmAccessor().Pack(http_connection_manager, &listener);
SetListenerAndRouteConfiguration(balancer_.get(), std::move(listener),
default_route_config_);
// Wait for the LDS update to be ACKed.
const auto deadline =
absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
while (true) {
ASSERT_LT(absl::Now(), deadline) << "timed out waiting for LDS ACK";
response_state = balancer_->ads_service()->lds_response_state();
if (response_state.has_value()) break;
absl::SleepFor(absl::Seconds(1) * grpc_test_slowdown_factor());
}
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Now RPCs should fail with ABORTED status.
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::ABORTED, "Fault injected");
}

TEST_P(LdsRdsInteractionTest, LdsUpdateChangesHcmConfigAndRdsResourceName) {
CreateAndStartBackends(2);
// Bring up client pointing to backend 0 and wait for it to connect.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 0);
// Change the LDS resource to point to an RDS resource. The LDS resource
// configures the fault injection filter with a config that fails all RPCs.
// However, the RDS resource has a typed_per_filter_config override that
// disables the fault injection filter. The RDS resource points to a
// new cluster that sends traffic to backend 1.
const char* kNewClusterName = "new_cluster_name";
const char* kNewEdsResourceName = "new_eds_resource_name";
auto cluster = default_cluster_;
cluster.set_name(kNewClusterName);
cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
balancer_->ads_service()->SetCdsResource(cluster);
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kNewEdsResourceName));
RouteConfiguration route_config = default_route_config_;
route_config.set_name("new_route_config");
route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
auto* config_map = route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_typed_per_filter_config();
(*config_map)["envoy.fault"].PackFrom(
envoy::extensions::filters::http::fault::v3::HTTPFault());
envoy::extensions::filters::http::fault::v3::HTTPFault http_fault;
auto* abort_percentage = http_fault.mutable_abort()->mutable_percentage();
abort_percentage->set_numerator(100);
abort_percentage->set_denominator(abort_percentage->HUNDRED);
http_fault.mutable_abort()->set_grpc_status(
static_cast<uint32_t>(StatusCode::ABORTED));
Listener listener = default_listener_;
HttpConnectionManager http_connection_manager =
ClientHcmAccessor().Unpack(listener);
*http_connection_manager.add_http_filters() =
http_connection_manager.http_filters(0);
auto* filter = http_connection_manager.mutable_http_filters(0);
filter->set_name("envoy.fault");
filter->mutable_typed_config()->PackFrom(http_fault);
ClientHcmAccessor().Pack(http_connection_manager, &listener);
SetListenerAndRouteConfiguration(balancer_.get(), std::move(listener),
route_config);
// Wait for traffic to switch to backend 1. There should be no RPC
// failures here; if there are, that indicates that the client started
// using the new LDS resource before it saw the new RDS resource.
WaitForBackend(DEBUG_LOCATION, 1);
}

using LdsRdsTest = XdsEnd2endTest;

// Test with and without RDS.
Expand Down

0 comments on commit 86fc3f0

Please sign in to comment.