Skip to content

Commit

Permalink
router: fix VHDS subscription creation bug (#8650)
Browse files Browse the repository at this point in the history
Fixes #7617 by handling creation of VHDS subscription after InitManager has completed initialization.

Risk Level: low, vhds only
Testing: added an integration test
Docs Changes: n/a
Release Notes: n/a
Fixes: #7617

Signed-off-by: Dmitri Dolguikh <ddolguik@redhat.com>
  • Loading branch information
Dmitri Dolguikh authored and zuercher committed Nov 27, 2019
1 parent dd9f2d4 commit fb628ce
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 11 deletions.
3 changes: 3 additions & 0 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,13 @@ envoy_cc_library(
"//include/envoy/thread_local:thread_local_interface",
"//source/common/common:assert_lib",
"//source/common/common:callback_impl_lib",
"//source/common/common:cleanup_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/config:subscription_factory_lib",
"//source/common/config:utility_lib",
"//source/common/init:manager_lib",
"//source/common/init:target_lib",
"//source/common/init:watcher_lib",
"//source/common/protobuf:utility_lib",
"//source/common/router:route_config_update_impl_lib",
"//source/common/router:vhds_lib",
Expand Down
27 changes: 26 additions & 1 deletion source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,21 @@ void RdsRouteConfigSubscription::onConfigUpdate(
provider->validateConfig(route_config);
}

std::unique_ptr<Init::ManagerImpl> noop_init_manager;
std::unique_ptr<Cleanup> resume_rds;
if (config_update_info_->onRdsUpdate(route_config, version_info)) {
stats_.config_reload_.inc();

if (config_update_info_->routeConfiguration().has_vhds()) {
ENVOY_LOG(debug, "rds: vhds configuration present, starting vhds: config_name={} hash={}",
route_config_name_, config_update_info_->configHash());
maybeCreateInitManager(version_info, noop_init_manager, resume_rds);
// TODO(dmitri-d): It's unsafe to depend directly on factory context here,
// the listener might have been torn down, need to remove this.
vhds_subscription_ = std::make_unique<VhdsSubscription>(
config_update_info_, factory_context_, stat_prefix_, route_config_providers_);
vhds_subscription_->registerInitTargetWithInitManager(getRdsConfigInitManager());
vhds_subscription_->registerInitTargetWithInitManager(
noop_init_manager == nullptr ? getRdsConfigInitManager() : *noop_init_manager);
} else {
ENVOY_LOG(debug, "rds: loading new configuration: config_name={} hash={}", route_config_name_,
config_update_info_->configHash());
Expand All @@ -132,6 +136,27 @@ void RdsRouteConfigSubscription::onConfigUpdate(
init_target_.ready();
}

// Initialize a no-op InitManager in case the one in the factory_context has completed
// initialization. This can happen if an RDS config update for an already established RDS
// subscription contains VHDS configuration.
void RdsRouteConfigSubscription::maybeCreateInitManager(
const std::string& version_info, std::unique_ptr<Init::ManagerImpl>& init_manager,
std::unique_ptr<Cleanup>& init_vhds) {
if (getRdsConfigInitManager().state() == Init::Manager::State::Initialized) {
init_manager = std::make_unique<Init::ManagerImpl>(
fmt::format("VHDS {}:{}", route_config_name_, version_info));
init_vhds = std::make_unique<Cleanup>([this, &init_manager, version_info] {
// For new RDS subscriptions created after listener warming up, we don't wait for them to warm
// up.
Init::WatcherImpl noop_watcher(
// Note: we just throw it away.
fmt::format("VHDS ConfigUpdate watcher {}:{}", route_config_name_, version_info),
[]() { /*Do nothing.*/ });
init_manager->initialize(noop_watcher);
});
}
}

void RdsRouteConfigSubscription::onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources, const std::string&) {
Expand Down
7 changes: 7 additions & 0 deletions source/common/router/rds_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
#include "envoy/thread_local/thread_local.h"

#include "common/common/callback_impl.h"
#include "common/common/cleanup.h"
#include "common/common/logger.h"
#include "common/config/resources.h"
#include "common/init/manager_impl.h"
#include "common/init/target_impl.h"
#include "common/init/watcher_impl.h"
#include "common/protobuf/utility.h"
#include "common/router/route_config_update_receiver_impl.h"
#include "common/router/vhds.h"
Expand Down Expand Up @@ -111,6 +115,9 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks,
return route_config_providers_;
}
RouteConfigUpdatePtr& routeConfigUpdate() { return config_update_info_; }
void maybeCreateInitManager(const std::string& version_info,
std::unique_ptr<Init::ManagerImpl>& init_manager,
std::unique_ptr<Cleanup>& resume_rds);

private:
// Config::SubscriptionCallbacks
Expand Down
45 changes: 45 additions & 0 deletions test/common/router/rds_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ using testing::_;
using testing::Eq;
using testing::InSequence;
using testing::Invoke;
using testing::Return;
using testing::ReturnRef;

namespace Envoy {
Expand Down Expand Up @@ -263,6 +264,50 @@ TEST_F(RdsImplTest, FailureSubscription) {
rds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, {});
}

class RdsRouteConfigSubscriptionTest : public RdsTestBase {
public:
RdsRouteConfigSubscriptionTest() {
EXPECT_CALL(server_factory_context_.admin_.config_tracker_, add_("routes", _));
route_config_provider_manager_ =
std::make_unique<RouteConfigProviderManagerImpl>(server_factory_context_.admin_);
}

~RdsRouteConfigSubscriptionTest() override {
server_factory_context_.thread_local_.shutdownThread();
}

std::unique_ptr<RouteConfigProviderManagerImpl> route_config_provider_manager_;
};

// Verifies that maybeCreateInitManager() creates a noop init manager if the main init manager is in
// Initialized state already
TEST_F(RdsRouteConfigSubscriptionTest, CreatesNoopInitManager) {
const std::string rds_config = R"EOF(
route_config_name: my_route
config_source:
api_config_source:
api_type: GRPC
grpc_services:
envoy_grpc:
cluster_name: xds_cluster
)EOF";
EXPECT_CALL(outer_init_manager_, state()).WillOnce(Return(Init::Manager::State::Initialized));
const auto rds =
TestUtility::parseYaml<envoy::config::filter::network::http_connection_manager::v2::Rds>(
rds_config);
const auto route_config_provider = route_config_provider_manager_->createRdsRouteConfigProvider(
rds, mock_factory_context_, "stat_prefix", outer_init_manager_);
RdsRouteConfigSubscription& subscription =
(dynamic_cast<RdsRouteConfigProviderImpl*>(route_config_provider.get()))->subscription();

std::unique_ptr<Init::ManagerImpl> noop_init_manager;
std::unique_ptr<Cleanup> init_vhds;
subscription.maybeCreateInitManager("version_info", noop_init_manager, init_vhds);

EXPECT_TRUE(init_vhds);
EXPECT_TRUE(noop_init_manager);
}

class RouteConfigProviderManagerImplTest : public RdsTestBase {
public:
void setup() {
Expand Down
129 changes: 119 additions & 10 deletions test/integration/vhds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ const char Config[] = R"EOF(
cluster_name: xds_cluster
)EOF";

// TODO (dmitri-d) move config yaml into ConfigHelper
const char RdsWithoutVhdsConfig[] = R"EOF(
name: my_route
virtual_hosts:
- name: vhost_rds1
domains: ["vhost.rds.first"]
routes:
- match: { prefix: "/rdsone" }
route: { cluster: my_service }
)EOF";

const char RdsConfig[] = R"EOF(
name: my_route
vhds:
Expand Down Expand Up @@ -100,6 +111,113 @@ name: my_route
cluster_name: xds_cluster
)EOF";

const char VhostTemplate[] = R"EOF(
name: {}
domains: [{}]
routes:
- match: {{ prefix: "/" }}
route: {{ cluster: "my_service" }}
)EOF";

class VhdsInitializationTest : public HttpIntegrationTest,
public Grpc::GrpcClientIntegrationParamTest {
public:
VhdsInitializationTest()
: HttpIntegrationTest(Http::CodecClient::Type::HTTP2, ipVersion(), realTime(), Config) {
use_lds_ = false;
}

void TearDown() override {
cleanUpXdsConnection();
test_server_.reset();
fake_upstreams_.clear();
}

// Overridden to insert this stuff into the initialize() at the very beginning of
// HttpIntegrationTest::testRouterRequestAndResponseWithBody().
void initialize() override {
// Controls how many fake_upstreams_.emplace_back(new FakeUpstream) will happen in
// BaseIntegrationTest::createUpstreams() (which is part of initialize()).
// Make sure this number matches the size of the 'clusters' repeated field in the bootstrap
// config that you use!
setUpstreamCount(2); // the CDS cluster
setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); // CDS uses gRPC uses HTTP2.

// BaseIntegrationTest::initialize() does many things:
// 1) It appends to fake_upstreams_ as many as you asked for via setUpstreamCount().
// 2) It updates your bootstrap config with the ports your fake upstreams are actually listening
// on (since you're supposed to leave them as 0).
// 3) It creates and starts an IntegrationTestServer - the thing that wraps the almost-actual
// Envoy used in the tests.
// 4) Bringing up the server usually entails waiting to ensure that any listeners specified in
// the bootstrap config have come up, and registering them in a port map (see lookupPort()).
// However, this test needs to defer all of that to later.
defer_listener_finalization_ = true;
HttpIntegrationTest::initialize();

// Now that the upstream has been created, process Envoy's request to discover it.
// (First, we have to let Envoy establish its connection to the RDS server.)
AssertionResult result = // xds_connection_ is filled with the new FakeHttpConnection.
fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, xds_connection_);
RELEASE_ASSERT(result, result.message());
result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_);
RELEASE_ASSERT(result, result.message());
xds_stream_->startGrpcStream();
fake_upstreams_[0]->set_allow_unexpected_disconnects(true);

EXPECT_TRUE(compareSotwDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "",
{"my_route"}, true));
sendSotwDiscoveryResponse<envoy::api::v2::RouteConfiguration>(
Config::TypeUrl::get().RouteConfiguration,
{TestUtility::parseYaml<envoy::api::v2::RouteConfiguration>(RdsWithoutVhdsConfig)}, "1");

// Wait for our statically specified listener to become ready, and register its port in the
// test framework's downstream listener port map.
test_server_->waitUntilListenersReady();
registerTestServerPorts({"http"});
}

FakeStreamPtr vhds_stream_;
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, VhdsInitializationTest,
GRPC_CLIENT_INTEGRATION_PARAMS);

// tests a scenario when:
// - RouteConfiguration without VHDS is received
// - RouteConfiguration update with VHDS configuration in it is received
// - Upstream makes a request to a VirtualHost in the VHDS update
TEST_P(VhdsInitializationTest, InitializeVhdsAfterRdsHasBeenInitialized) {
// Calls our initialize(), which includes establishing a listener, route, and cluster.
testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/rdsone", "vhost.rds.first");
cleanupUpstreamAndDownstream();
codec_client_->waitForDisconnect();

// Update RouteConfig, this time include VHDS config
sendSotwDiscoveryResponse<envoy::api::v2::RouteConfiguration>(
Config::TypeUrl::get().RouteConfiguration,
{TestUtility::parseYaml<envoy::api::v2::RouteConfiguration>(RdsConfigWithVhosts)}, "2");

auto result = xds_connection_->waitForNewStream(*dispatcher_, vhds_stream_, true);
RELEASE_ASSERT(result, result.message());
vhds_stream_->startGrpcStream();

EXPECT_TRUE(
compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_));
sendDeltaDiscoveryResponse<envoy::api::v2::route::VirtualHost>(
Config::TypeUrl::get().VirtualHost,
{TestUtility::parseYaml<envoy::api::v2::route::VirtualHost>(
fmt::format(VhostTemplate, "vhost_0", "vhost.first"))},
{}, "1", vhds_stream_);
EXPECT_TRUE(
compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_));

// Confirm vhost.first that was configured via VHDS is reachable
testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/", "vhost.first");
cleanupUpstreamAndDownstream();
codec_client_->waitForDisconnect();
}

class VhdsIntegrationTest : public HttpIntegrationTest,
public Grpc::GrpcClientIntegrationParamTest {
public:
Expand All @@ -115,14 +233,7 @@ class VhdsIntegrationTest : public HttpIntegrationTest,
}

std::string virtualHostYaml(const std::string& name, const std::string& domain) {
return fmt::format(R"EOF(
name: {}
domains: [{}]
routes:
- match: {{ prefix: "/" }}
route: {{ cluster: "my_service" }}
)EOF",
name, domain);
return fmt::format(VhostTemplate, name, domain);
}

envoy::api::v2::route::VirtualHost buildVirtualHost() {
Expand Down Expand Up @@ -206,8 +317,6 @@ class VhdsIntegrationTest : public HttpIntegrationTest,
bool use_rds_with_vhosts{false};
};

INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, VhdsIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS);

// tests a scenario when:
// - a spontaneous VHDS DiscoveryResponse adds two virtual hosts
// - the next spontaneous VHDS DiscoveryResponse removes newly added virtual hosts
Expand Down

0 comments on commit fb628ce

Please sign in to comment.