Skip to content

Commit

Permalink
xds: Make AGGREGATED_DELTA_GRPC set is_aggregated on the Subscription (
Browse files Browse the repository at this point in the history
…#25055)

Currently, there's a bug when the ApiConfigSource is AGGREGATED_DELTA_GRPC and xdstp is used: The
GrpcCollectionSubscriptionImpl that gets created sets is_aggregated to false, instead of setting it to true. This causes the GrpcSubscriptionImpl instance to attempt to start the GrpcMux on start, instead of waiting to share a gRPC stream like ADS is supposed to.

Unfortunately, this behavior was masked by the integration tests, because we use a FakeUpstream in the integration tests that just contains a single xDS stream to which the Envoy instances connect to.

The only way to verify this behavior was to look at the logs when running the XdsTpAdsIntegrationTest: when is_aggregated was set to false (prior to this commit), the logs would contain GrpcStream messages saying the stream already exists. After changing is_aggregated to true, the logs no longer output stream "already exists", because we don't attempt to create multiple streams, which is the correct behavior.

Signed-off-by: Ali Beyad <abeyad@google.com>
  • Loading branch information
abeyad authored Jan 27, 2023
1 parent 80135f2 commit 765072a
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
4 changes: 2 additions & 2 deletions source/common/config/subscription_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
Utility::parseRateLimitSettings(api_config_source), local_info_,
std::move(custom_config_validators), xds_config_tracker_),
callbacks, resource_decoder, stats, dispatcher_,
Utility::configSourceInitialFetchTimeout(config), false, options);
Utility::configSourceInitialFetchTimeout(config), /*is_aggregated=*/false, options);
}
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC: {
return std::make_unique<GrpcCollectionSubscriptionImpl>(
collection_locator, cm_.adsMux(), callbacks, resource_decoder, stats, dispatcher_,
Utility::configSourceInitialFetchTimeout(config), false, options);
Utility::configSourceInitialFetchTimeout(config), /*is_aggregated=*/true, options);
}
default:
throw EnvoyException(fmt::format("Unknown xdstp:// transport API type in {}",
Expand Down
5 changes: 4 additions & 1 deletion test/common/config/subscription_factory_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,14 @@ TEST_P(SubscriptionFactoryTestUnifiedOrLegacyMux, GrpcCollectionAggregatedSubscr
Upstream::ClusterManager::ClusterSet primary_clusters;
primary_clusters.insert("static_cluster");
EXPECT_CALL(cm_, primaryClusters()).WillOnce(ReturnRef(primary_clusters));
GrpcMuxSharedPtr ads_mux = std::make_shared<NiceMock<MockGrpcMux>>();
auto ads_mux = std::make_shared<NiceMock<MockGrpcMux>>();
EXPECT_CALL(cm_, adsMux()).WillOnce(Return(ads_mux));
EXPECT_CALL(dispatcher_, createTimer_(_));
// onConfigUpdateFailed() should not be called for gRPC stream connection failure
EXPECT_CALL(callbacks_, onConfigUpdateFailed(_, _)).Times(0);
// Since this is ADS, the mux's start() should not be called (which attempts to create a gRPC
// stream).
EXPECT_CALL(*ads_mux, start()).Times(0);
collectionSubscriptionFromUrl("xdstp://foo/envoy.config.endpoint.v3.ClusterLoadAssignment/bar",
config)
->start({});
Expand Down

0 comments on commit 765072a

Please sign in to comment.