Skip to content

Commit

Permalink
grpc mux: fix sending node again after stream is reset (#14080)
Browse files Browse the repository at this point in the history
Previously, first_stream_request_ wasn't working as intended because api_state_ is still cached from the previous stream

Risk Level: Low
Testing: Modified test was failing before the accompanying change
Docs Changes: N/A

Fixes #9682

Signed-off-by: Taylor Barrella <tabarr@google.com>
  • Loading branch information
tbarrella authored Dec 17, 2020
1 parent 5e6c4b9 commit 9340c2c
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Bug Fixes
* config: validate that upgrade configs have a non-empty :ref:`upgrade_type <envoy_v3_api_field_extensions.filters.network.http_connection_manager.v3.HttpConnectionManager.UpgradeConfig.upgrade_type>`, fixing a bug where an errant "-" could result in unexpected behavior.
* dns: fix a bug where custom resolvers provided in configuration were not preserved after network issues.
* dns_filter: correctly associate DNS response IDs when multiple queries are received.
* grpc mux: fix sending node again after stream is reset when ::ref:`set_node_on_first_message_only <envoy_api_field_core.ApiConfigSource.set_node_on_first_message_only>` is set.
* http: fixed URL parsing for HTTP/1.1 fully qualified URLs and connect requests containing IPv6 addresses.
* http: reject requests with missing required headers after filter chain processing.
* http: sending CONNECT_ERROR for HTTP/2 where appropriate during CONNECT requests.
Expand Down
9 changes: 7 additions & 2 deletions source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
}
}

if (skip_subsequent_node_ && !first_stream_request_) {
request.clear_node();
if (skip_subsequent_node_) {
if (first_stream_request_) {
// Node may have been cleared during a previous request.
request.mutable_node()->MergeFrom(local_info_.node());
} else {
request.clear_node();
}
}
VersionConverter::prepareMessageForGrpcWire(request, transport_api_version_);
ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url, request.DebugString());
Expand Down
10 changes: 8 additions & 2 deletions test/common/config/grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,24 @@ TEST_F(GrpcMuxImplTest, ResetStream) {
expectSendMessage("baz", {"z"}, "");
grpc_mux_->start();

// Send another message for foo so that the node is cleared in the cached request.
// This is to test that the the node is set again in the first message below.
expectSendMessage("foo", {"z", "x", "y"}, "");
auto foo_z_sub = grpc_mux_->addWatch("foo", {"z"}, callbacks_, resource_decoder_);

EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _))
.Times(3);
.Times(4);
EXPECT_CALL(random_, random());
EXPECT_CALL(*timer, enableTimer(_, _));
grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Canceled, "");
EXPECT_EQ(0, control_plane_connected_state_.value());
EXPECT_EQ(0, control_plane_pending_requests_.value());
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage("foo", {"x", "y"}, "", true);
expectSendMessage("foo", {"z", "x", "y"}, "", true);
expectSendMessage("bar", {}, "");
expectSendMessage("baz", {"z"}, "");
expectSendMessage("foo", {"x", "y"}, "");
timer->invokeCallback();

expectSendMessage("baz", {}, "");
Expand Down
2 changes: 1 addition & 1 deletion test/common/config/grpc_subscription_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness {
"envoy.api.v2.EndpointDiscoveryService.StreamEndpoints")),
async_client_(new NiceMock<Grpc::MockAsyncClient>()) {
node_.set_id("fo0");
EXPECT_CALL(local_info_, node()).WillOnce(testing::ReturnRef(node_));
EXPECT_CALL(local_info_, node()).WillRepeatedly(testing::ReturnRef(node_));
ttl_timer_ = new NiceMock<Event::MockTimer>(&dispatcher_);

timer_ = new Event::MockTimer(&dispatcher_);
Expand Down
26 changes: 26 additions & 0 deletions test/integration/ads_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,32 @@ TEST_P(AdsIntegrationTest, Failure) {
makeSingleRequest();
}

// Regression test for https://github.com/envoyproxy/envoy/issues/9682.
TEST_P(AdsIntegrationTest, ResendNodeOnStreamReset) {
initialize();
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true));
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster,
{buildCluster("cluster_0")},
{buildCluster("cluster_0")}, {}, "1");

EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "",
{"cluster_0"}, {"cluster_0"}, {}));
sendDiscoveryResponse<envoy::config::endpoint::v3::ClusterLoadAssignment>(
Config::TypeUrl::get().ClusterLoadAssignment, {buildClusterLoadAssignment("cluster_0")},
{buildClusterLoadAssignment("cluster_0")}, {}, "1");

// A second CDS request should be sent so that the node is cleared in the cached request.
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "1", {}, {}, {}));

xds_stream_->finishGrpcStream(Grpc::Status::Internal);
AssertionResult result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_);
RELEASE_ASSERT(result, result.message());
xds_stream_->startGrpcStream();

EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "1", {"cluster_0"},
{"cluster_0"}, {}, true));
}

// Validate that xds can support a mix of v2 and v3 type url.
TEST_P(AdsIntegrationTest, MixV2V3TypeUrlInDiscoveryResponse) {
config_helper_.addRuntimeOverride(
Expand Down

0 comments on commit 9340c2c

Please sign in to comment.