diff --git a/source/common/router/BUILD b/source/common/router/BUILD index 693814d17982..ca15e148246e 100644 --- a/source/common/router/BUILD +++ b/source/common/router/BUILD @@ -152,10 +152,13 @@ envoy_cc_library( "//include/envoy/thread_local:thread_local_interface", "//source/common/common:assert_lib", "//source/common/common:callback_impl_lib", + "//source/common/common:cleanup_lib", "//source/common/common:minimal_logger_lib", "//source/common/config:subscription_factory_lib", "//source/common/config:utility_lib", + "//source/common/init:manager_lib", "//source/common/init:target_lib", + "//source/common/init:watcher_lib", "//source/common/protobuf:utility_lib", "//source/common/router:route_config_update_impl_lib", "//source/common/router:vhds_lib", diff --git a/source/common/router/rds_impl.cc b/source/common/router/rds_impl.cc index db474d5f617f..6164b39a7d17 100644 --- a/source/common/router/rds_impl.cc +++ b/source/common/router/rds_impl.cc @@ -106,17 +106,21 @@ void RdsRouteConfigSubscription::onConfigUpdate( provider->validateConfig(route_config); } + std::unique_ptr noop_init_manager; + std::unique_ptr resume_rds; if (config_update_info_->onRdsUpdate(route_config, version_info)) { stats_.config_reload_.inc(); if (config_update_info_->routeConfiguration().has_vhds()) { ENVOY_LOG(debug, "rds: vhds configuration present, starting vhds: config_name={} hash={}", route_config_name_, config_update_info_->configHash()); + maybeCreateInitManager(version_info, noop_init_manager, resume_rds); // TODO(dmitri-d): It's unsafe to depend directly on factory context here, // the listener might have been torn down, need to remove this. vhds_subscription_ = std::make_unique( config_update_info_, factory_context_, stat_prefix_, route_config_providers_); - vhds_subscription_->registerInitTargetWithInitManager(getRdsConfigInitManager()); + vhds_subscription_->registerInitTargetWithInitManager( + noop_init_manager == nullptr ? getRdsConfigInitManager() : *noop_init_manager); } else { ENVOY_LOG(debug, "rds: loading new configuration: config_name={} hash={}", route_config_name_, config_update_info_->configHash()); @@ -132,6 +136,27 @@ void RdsRouteConfigSubscription::onConfigUpdate( init_target_.ready(); } +// Initialize a no-op InitManager in case the one in the factory_context has completed +// initialization. This can happen if an RDS config update for an already established RDS +// subscription contains VHDS configuration. +void RdsRouteConfigSubscription::maybeCreateInitManager( + const std::string& version_info, std::unique_ptr& init_manager, + std::unique_ptr& init_vhds) { + if (getRdsConfigInitManager().state() == Init::Manager::State::Initialized) { + init_manager = std::make_unique( + fmt::format("VHDS {}:{}", route_config_name_, version_info)); + init_vhds = std::make_unique([this, &init_manager, version_info] { + // For new RDS subscriptions created after listener warming up, we don't wait for them to warm + // up. + Init::WatcherImpl noop_watcher( + // Note: we just throw it away. + fmt::format("VHDS ConfigUpdate watcher {}:{}", route_config_name_, version_info), + []() { /*Do nothing.*/ }); + init_manager->initialize(noop_watcher); + }); + } +} + void RdsRouteConfigSubscription::onConfigUpdate( const Protobuf::RepeatedPtrField& added_resources, const Protobuf::RepeatedPtrField& removed_resources, const std::string&) { diff --git a/source/common/router/rds_impl.h b/source/common/router/rds_impl.h index 44e1cb208656..f9d0164eb61a 100644 --- a/source/common/router/rds_impl.h +++ b/source/common/router/rds_impl.h @@ -23,8 +23,12 @@ #include "envoy/thread_local/thread_local.h" #include "common/common/callback_impl.h" +#include "common/common/cleanup.h" #include "common/common/logger.h" +#include "common/config/resources.h" +#include "common/init/manager_impl.h" #include "common/init/target_impl.h" +#include "common/init/watcher_impl.h" #include "common/protobuf/utility.h" #include "common/router/route_config_update_receiver_impl.h" #include "common/router/vhds.h" @@ -111,6 +115,9 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks, return route_config_providers_; } RouteConfigUpdatePtr& routeConfigUpdate() { return config_update_info_; } + void maybeCreateInitManager(const std::string& version_info, + std::unique_ptr& init_manager, + std::unique_ptr& resume_rds); private: // Config::SubscriptionCallbacks diff --git a/test/common/router/rds_impl_test.cc b/test/common/router/rds_impl_test.cc index 0dc7df00f3da..4e7be26a9a8e 100644 --- a/test/common/router/rds_impl_test.cc +++ b/test/common/router/rds_impl_test.cc @@ -28,6 +28,7 @@ using testing::_; using testing::Eq; using testing::InSequence; using testing::Invoke; +using testing::Return; using testing::ReturnRef; namespace Envoy { @@ -263,6 +264,50 @@ TEST_F(RdsImplTest, FailureSubscription) { rds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, {}); } +class RdsRouteConfigSubscriptionTest : public RdsTestBase { +public: + RdsRouteConfigSubscriptionTest() { + EXPECT_CALL(server_factory_context_.admin_.config_tracker_, add_("routes", _)); + route_config_provider_manager_ = + std::make_unique(server_factory_context_.admin_); + } + + ~RdsRouteConfigSubscriptionTest() override { + server_factory_context_.thread_local_.shutdownThread(); + } + + std::unique_ptr route_config_provider_manager_; +}; + +// Verifies that maybeCreateInitManager() creates a noop init manager if the main init manager is in +// Initialized state already +TEST_F(RdsRouteConfigSubscriptionTest, CreatesNoopInitManager) { + const std::string rds_config = R"EOF( + route_config_name: my_route + config_source: + api_config_source: + api_type: GRPC + grpc_services: + envoy_grpc: + cluster_name: xds_cluster +)EOF"; + EXPECT_CALL(outer_init_manager_, state()).WillOnce(Return(Init::Manager::State::Initialized)); + const auto rds = + TestUtility::parseYaml( + rds_config); + const auto route_config_provider = route_config_provider_manager_->createRdsRouteConfigProvider( + rds, mock_factory_context_, "stat_prefix", outer_init_manager_); + RdsRouteConfigSubscription& subscription = + (dynamic_cast(route_config_provider.get()))->subscription(); + + std::unique_ptr noop_init_manager; + std::unique_ptr init_vhds; + subscription.maybeCreateInitManager("version_info", noop_init_manager, init_vhds); + + EXPECT_TRUE(init_vhds); + EXPECT_TRUE(noop_init_manager); +} + class RouteConfigProviderManagerImplTest : public RdsTestBase { public: void setup() { diff --git a/test/integration/vhds_integration_test.cc b/test/integration/vhds_integration_test.cc index 561cd99917d3..1e54a38c20a4 100644 --- a/test/integration/vhds_integration_test.cc +++ b/test/integration/vhds_integration_test.cc @@ -72,6 +72,17 @@ const char Config[] = R"EOF( cluster_name: xds_cluster )EOF"; +// TODO (dmitri-d) move config yaml into ConfigHelper +const char RdsWithoutVhdsConfig[] = R"EOF( +name: my_route +virtual_hosts: +- name: vhost_rds1 + domains: ["vhost.rds.first"] + routes: + - match: { prefix: "/rdsone" } + route: { cluster: my_service } +)EOF"; + const char RdsConfig[] = R"EOF( name: my_route vhds: @@ -100,6 +111,113 @@ name: my_route cluster_name: xds_cluster )EOF"; +const char VhostTemplate[] = R"EOF( +name: {} +domains: [{}] +routes: +- match: {{ prefix: "/" }} + route: {{ cluster: "my_service" }} +)EOF"; + +class VhdsInitializationTest : public HttpIntegrationTest, + public Grpc::GrpcClientIntegrationParamTest { +public: + VhdsInitializationTest() + : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, ipVersion(), realTime(), Config) { + use_lds_ = false; + } + + void TearDown() override { + cleanUpXdsConnection(); + test_server_.reset(); + fake_upstreams_.clear(); + } + + // Overridden to insert this stuff into the initialize() at the very beginning of + // HttpIntegrationTest::testRouterRequestAndResponseWithBody(). + void initialize() override { + // Controls how many fake_upstreams_.emplace_back(new FakeUpstream) will happen in + // BaseIntegrationTest::createUpstreams() (which is part of initialize()). + // Make sure this number matches the size of the 'clusters' repeated field in the bootstrap + // config that you use! + setUpstreamCount(2); // the CDS cluster + setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); // CDS uses gRPC uses HTTP2. + + // BaseIntegrationTest::initialize() does many things: + // 1) It appends to fake_upstreams_ as many as you asked for via setUpstreamCount(). + // 2) It updates your bootstrap config with the ports your fake upstreams are actually listening + // on (since you're supposed to leave them as 0). + // 3) It creates and starts an IntegrationTestServer - the thing that wraps the almost-actual + // Envoy used in the tests. + // 4) Bringing up the server usually entails waiting to ensure that any listeners specified in + // the bootstrap config have come up, and registering them in a port map (see lookupPort()). + // However, this test needs to defer all of that to later. + defer_listener_finalization_ = true; + HttpIntegrationTest::initialize(); + + // Now that the upstream has been created, process Envoy's request to discover it. + // (First, we have to let Envoy establish its connection to the RDS server.) + AssertionResult result = // xds_connection_ is filled with the new FakeHttpConnection. + fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + fake_upstreams_[0]->set_allow_unexpected_disconnects(true); + + EXPECT_TRUE(compareSotwDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "", + {"my_route"}, true)); + sendSotwDiscoveryResponse( + Config::TypeUrl::get().RouteConfiguration, + {TestUtility::parseYaml(RdsWithoutVhdsConfig)}, "1"); + + // Wait for our statically specified listener to become ready, and register its port in the + // test framework's downstream listener port map. + test_server_->waitUntilListenersReady(); + registerTestServerPorts({"http"}); + } + + FakeStreamPtr vhds_stream_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, VhdsInitializationTest, + GRPC_CLIENT_INTEGRATION_PARAMS); + +// tests a scenario when: +// - RouteConfiguration without VHDS is received +// - RouteConfiguration update with VHDS configuration in it is received +// - Upstream makes a request to a VirtualHost in the VHDS update +TEST_P(VhdsInitializationTest, InitializeVhdsAfterRdsHasBeenInitialized) { + // Calls our initialize(), which includes establishing a listener, route, and cluster. + testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/rdsone", "vhost.rds.first"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // Update RouteConfig, this time include VHDS config + sendSotwDiscoveryResponse( + Config::TypeUrl::get().RouteConfiguration, + {TestUtility::parseYaml(RdsConfigWithVhosts)}, "2"); + + auto result = xds_connection_->waitForNewStream(*dispatcher_, vhds_stream_, true); + RELEASE_ASSERT(result, result.message()); + vhds_stream_->startGrpcStream(); + + EXPECT_TRUE( + compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); + sendDeltaDiscoveryResponse( + Config::TypeUrl::get().VirtualHost, + {TestUtility::parseYaml( + fmt::format(VhostTemplate, "vhost_0", "vhost.first"))}, + {}, "1", vhds_stream_); + EXPECT_TRUE( + compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); + + // Confirm vhost.first that was configured via VHDS is reachable + testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/", "vhost.first"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); +} + class VhdsIntegrationTest : public HttpIntegrationTest, public Grpc::GrpcClientIntegrationParamTest { public: @@ -115,14 +233,7 @@ class VhdsIntegrationTest : public HttpIntegrationTest, } std::string virtualHostYaml(const std::string& name, const std::string& domain) { - return fmt::format(R"EOF( - name: {} - domains: [{}] - routes: - - match: {{ prefix: "/" }} - route: {{ cluster: "my_service" }} - )EOF", - name, domain); + return fmt::format(VhostTemplate, name, domain); } envoy::api::v2::route::VirtualHost buildVirtualHost() { @@ -206,8 +317,6 @@ class VhdsIntegrationTest : public HttpIntegrationTest, bool use_rds_with_vhosts{false}; }; -INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, VhdsIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); - // tests a scenario when: // - a spontaneous VHDS DiscoveryResponse adds two virtual hosts // - the next spontaneous VHDS DiscoveryResponse removes newly added virtual hosts