diff --git a/source/common/tcp/conn_pool.cc b/source/common/tcp/conn_pool.cc index ed3332d8afef..ee18b337a59e 100644 --- a/source/common/tcp/conn_pool.cc +++ b/source/common/tcp/conn_pool.cc @@ -53,10 +53,9 @@ void ActiveTcpClient::clearCallbacks() { void ActiveTcpClient::onEvent(Network::ConnectionEvent event) { Envoy::ConnectionPool::ActiveClient::onEvent(event); - // Do not pass the Connected event to TCP proxy sessions. - // The tcp proxy filter synthesizes its own Connected event in onPoolReadyBase - // and receiving it twice causes problems. - // TODO(alyssawilk) clean this up in a follow-up. It's confusing. + // Do not pass the Connected event to any session which registered during onEvent above. + // Consumers of connection pool connections assume they are receiving already connected + // connections. if (callbacks_ && event != Network::ConnectionEvent::Connected) { callbacks_->onEvent(event); // After receiving a disconnect event, the owner of callbacks_ will likely self-destruct. diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index 5aa4b9d64cc4..91abf2f82e5a 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -309,6 +309,9 @@ void Filter::DownstreamCallbacks::onBelowWriteBufferLowWatermark() { } void Filter::UpstreamCallbacks::onEvent(Network::ConnectionEvent event) { + if (event == Network::ConnectionEvent::Connected) { + return; + } if (drainer_ == nullptr) { parent_->onUpstreamEvent(event); } else { @@ -505,8 +508,7 @@ void Filter::onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host, getStreamInfo().onUpstreamHostSelected(host); getStreamInfo().setUpstreamLocalAddress(local_address); getStreamInfo().setUpstreamSslConnection(ssl_info); - // Simulate the event that onPoolReady represents. - upstream_callbacks_->onEvent(Network::ConnectionEvent::Connected); + onUpstreamConnection(); read_callbacks_->continueReading(); } @@ -637,31 +639,34 @@ void Filter::onUpstreamEvent(Network::ConnectionEvent event) { read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); } } - } else if (event == Network::ConnectionEvent::Connected) { - // Re-enable downstream reads now that the upstream connection is established - // so we have a place to send downstream data to. - read_callbacks_->connection().readDisable(false); - - read_callbacks_->upstreamHost()->outlierDetector().putResult( - Upstream::Outlier::Result::LocalOriginConnectSuccessFinal); - - getStreamInfo().setRequestedServerName(read_callbacks_->connection().requestedServerName()); - ENVOY_LOG(debug, "TCP:onUpstreamEvent(), requestedServerName: {}", - getStreamInfo().requestedServerName()); - - if (config_->idleTimeout()) { - // The idle_timer_ can be moved to a Drainer, so related callbacks call into - // the UpstreamCallbacks, which has the same lifetime as the timer, and can dispatch - // the call to either TcpProxy or to Drainer, depending on the current state. - idle_timer_ = read_callbacks_->connection().dispatcher().createTimer( - [upstream_callbacks = upstream_callbacks_]() { upstream_callbacks->onIdleTimeout(); }); - resetIdleTimer(); - read_callbacks_->connection().addBytesSentCallback([this](uint64_t) { resetIdleTimer(); }); - if (upstream_) { - upstream_->addBytesSentCallback([upstream_callbacks = upstream_callbacks_](uint64_t) { - upstream_callbacks->onBytesSent(); - }); - } + } +} + +void Filter::onUpstreamConnection() { + connecting_ = false; + // Re-enable downstream reads now that the upstream connection is established + // so we have a place to send downstream data to. + read_callbacks_->connection().readDisable(false); + + read_callbacks_->upstreamHost()->outlierDetector().putResult( + Upstream::Outlier::Result::LocalOriginConnectSuccessFinal); + + getStreamInfo().setRequestedServerName(read_callbacks_->connection().requestedServerName()); + ENVOY_LOG(debug, "TCP:onUpstreamEvent(), requestedServerName: {}", + getStreamInfo().requestedServerName()); + + if (config_->idleTimeout()) { + // The idle_timer_ can be moved to a Drainer, so related callbacks call into + // the UpstreamCallbacks, which has the same lifetime as the timer, and can dispatch + // the call to either TcpProxy or to Drainer, depending on the current state. + idle_timer_ = read_callbacks_->connection().dispatcher().createTimer( + [upstream_callbacks = upstream_callbacks_]() { upstream_callbacks->onIdleTimeout(); }); + resetIdleTimer(); + read_callbacks_->connection().addBytesSentCallback([this](uint64_t) { resetIdleTimer(); }); + if (upstream_) { + upstream_->addBytesSentCallback([upstream_callbacks = upstream_callbacks_](uint64_t) { + upstream_callbacks->onBytesSent(); + }); } } } diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 1bb73ddd377f..b1abf95cc247 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -353,6 +353,7 @@ class Filter : public Network::ReadFilter, void onDownstreamEvent(Network::ConnectionEvent event); void onUpstreamData(Buffer::Instance& data, bool end_stream); void onUpstreamEvent(Network::ConnectionEvent event); + void onUpstreamConnection(); void onIdleTimeout(); void resetIdleTimer(); void disableIdleTimer(); diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index 817b87a6c01a..20961c086b3d 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -1146,6 +1146,15 @@ TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(ConnectAttemptsLimit)) { EXPECT_EQ(access_log_data_, "UF,URX"); } +TEST_F(TcpProxyTest, ConnectedNoOp) { + setup(1); + raiseEventUpstreamConnected(0); + + upstream_callbacks_->onEvent(Network::ConnectionEvent::Connected); + + filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); +} + // Test that the tcp proxy sends the correct notifications to the outlier detector TEST_F(TcpProxyTest, OutlierDetection) { envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy config = defaultConfig();