Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tcp_proxy: API cleanup #13035

Merged
merged 2 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions source/common/tcp/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ void ActiveTcpClient::clearCallbacks() {

void ActiveTcpClient::onEvent(Network::ConnectionEvent event) {
Envoy::ConnectionPool::ActiveClient::onEvent(event);
// Do not pass the Connected event to TCP proxy sessions.
// The tcp proxy filter synthesizes its own Connected event in onPoolReadyBase
// and receiving it twice causes problems.
// TODO(alyssawilk) clean this up in a follow-up. It's confusing.
// Do not pass the Connected event to any session which registered during onEvent above.
// Consumers of connection pool connections assume they are receiving already connected
// connections.
if (callbacks_ && event != Network::ConnectionEvent::Connected) {
callbacks_->onEvent(event);
// After receiving a disconnect event, the owner of callbacks_ will likely self-destruct.
Expand Down
59 changes: 32 additions & 27 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ void Filter::DownstreamCallbacks::onBelowWriteBufferLowWatermark() {
}

void Filter::UpstreamCallbacks::onEvent(Network::ConnectionEvent event) {
if (event == Network::ConnectionEvent::Connected) {
return;
}
if (drainer_ == nullptr) {
parent_->onUpstreamEvent(event);
} else {
Expand Down Expand Up @@ -505,8 +508,7 @@ void Filter::onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host,
getStreamInfo().onUpstreamHostSelected(host);
getStreamInfo().setUpstreamLocalAddress(local_address);
getStreamInfo().setUpstreamSslConnection(ssl_info);
// Simulate the event that onPoolReady represents.
upstream_callbacks_->onEvent(Network::ConnectionEvent::Connected);
onUpstreamConnection();
read_callbacks_->continueReading();
}

Expand Down Expand Up @@ -637,31 +639,34 @@ void Filter::onUpstreamEvent(Network::ConnectionEvent event) {
read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
}
}
} else if (event == Network::ConnectionEvent::Connected) {
// Re-enable downstream reads now that the upstream connection is established
// so we have a place to send downstream data to.
read_callbacks_->connection().readDisable(false);

read_callbacks_->upstreamHost()->outlierDetector().putResult(
Upstream::Outlier::Result::LocalOriginConnectSuccessFinal);

getStreamInfo().setRequestedServerName(read_callbacks_->connection().requestedServerName());
ENVOY_LOG(debug, "TCP:onUpstreamEvent(), requestedServerName: {}",
getStreamInfo().requestedServerName());

if (config_->idleTimeout()) {
// The idle_timer_ can be moved to a Drainer, so related callbacks call into
// the UpstreamCallbacks, which has the same lifetime as the timer, and can dispatch
// the call to either TcpProxy or to Drainer, depending on the current state.
idle_timer_ = read_callbacks_->connection().dispatcher().createTimer(
[upstream_callbacks = upstream_callbacks_]() { upstream_callbacks->onIdleTimeout(); });
resetIdleTimer();
read_callbacks_->connection().addBytesSentCallback([this](uint64_t) { resetIdleTimer(); });
if (upstream_) {
upstream_->addBytesSentCallback([upstream_callbacks = upstream_callbacks_](uint64_t) {
upstream_callbacks->onBytesSent();
});
}
}
}

void Filter::onUpstreamConnection() {
connecting_ = false;
// Re-enable downstream reads now that the upstream connection is established
// so we have a place to send downstream data to.
read_callbacks_->connection().readDisable(false);

read_callbacks_->upstreamHost()->outlierDetector().putResult(
Upstream::Outlier::Result::LocalOriginConnectSuccessFinal);

getStreamInfo().setRequestedServerName(read_callbacks_->connection().requestedServerName());
ENVOY_LOG(debug, "TCP:onUpstreamEvent(), requestedServerName: {}",
getStreamInfo().requestedServerName());

if (config_->idleTimeout()) {
// The idle_timer_ can be moved to a Drainer, so related callbacks call into
// the UpstreamCallbacks, which has the same lifetime as the timer, and can dispatch
// the call to either TcpProxy or to Drainer, depending on the current state.
idle_timer_ = read_callbacks_->connection().dispatcher().createTimer(
[upstream_callbacks = upstream_callbacks_]() { upstream_callbacks->onIdleTimeout(); });
resetIdleTimer();
read_callbacks_->connection().addBytesSentCallback([this](uint64_t) { resetIdleTimer(); });
if (upstream_) {
upstream_->addBytesSentCallback([upstream_callbacks = upstream_callbacks_](uint64_t) {
upstream_callbacks->onBytesSent();
});
}
}
}
Expand Down
1 change: 1 addition & 0 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ class Filter : public Network::ReadFilter,
void onDownstreamEvent(Network::ConnectionEvent event);
void onUpstreamData(Buffer::Instance& data, bool end_stream);
void onUpstreamEvent(Network::ConnectionEvent event);
void onUpstreamConnection();
void onIdleTimeout();
void resetIdleTimer();
void disableIdleTimer();
Expand Down
9 changes: 9 additions & 0 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,15 @@ TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(ConnectAttemptsLimit)) {
EXPECT_EQ(access_log_data_, "UF,URX");
}

TEST_F(TcpProxyTest, ConnectedNoOp) {
setup(1);
raiseEventUpstreamConnected(0);

upstream_callbacks_->onEvent(Network::ConnectionEvent::Connected);

filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
}

// Test that the tcp proxy sends the correct notifications to the outlier detector
TEST_F(TcpProxyTest, OutlierDetection) {
envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy config = defaultConfig();
Expand Down