Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TcpAsyncClient: enhance reconnect robustness #32578

Merged
merged 4 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions envoy/tcp/async_tcp_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ class AsyncTcpClient {

/**
* Connect to a remote host. Errors or connection events are reported via the
* event callback registered via setAsyncTcpClientCallbacks(). We need to set the
* callbacks again to call connect() after the connection is disconnected.
* event callback registered via setAsyncTcpClientCallbacks(). If the callbacks
* need to be changed before reconnecting, they must be set again before
* calling connect() to attempt the reconnect.
* @returns true if a new client has been created and the connection is in progress.
* @returns false if an underlying client exists and is connected or connecting.
*/
virtual bool connect() PURE;

Expand Down
29 changes: 16 additions & 13 deletions source/common/tcp/async_tcp_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ AsyncTcpClientImpl::AsyncTcpClientImpl(Event::Dispatcher& dispatcher,
: dispatcher_(dispatcher), thread_local_cluster_(thread_local_cluster),
cluster_info_(thread_local_cluster_.info()), context_(context),
connect_timer_(dispatcher.createTimer([this]() { onConnectTimeout(); })),
enable_half_close_(enable_half_close) {
cluster_info_->trafficStats()->upstream_cx_active_.inc();
cluster_info_->trafficStats()->upstream_cx_total_.inc();
}

AsyncTcpClientImpl::~AsyncTcpClientImpl() {
cluster_info_->trafficStats()->upstream_cx_active_.dec();
}
enable_half_close_(enable_half_close) {}

bool AsyncTcpClientImpl::connect() {
if (connection_) {
return false;
}

connection_ = std::move(thread_local_cluster_.tcpConn(context_).connection_);
if (!connection_) {
return false;
}

cluster_info_->trafficStats()->upstream_cx_total_.inc();
cluster_info_->trafficStats()->upstream_cx_active_.inc();
connection_->enableHalfClose(enable_half_close_);
connection_->addConnectionCallbacks(*this);
connection_->addReadFilter(std::make_shared<NetworkReadFilter>(*this));
Expand Down Expand Up @@ -109,28 +109,31 @@ void AsyncTcpClientImpl::reportConnectionDestroy(Network::ConnectionEvent event)
void AsyncTcpClientImpl::onEvent(Network::ConnectionEvent event) {
if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
if (disconnected_) {
if (connected_) {
cluster_info_->trafficStats()->upstream_cx_active_.dec();
ohadvano marked this conversation as resolved.
Show resolved Hide resolved
} else {
cluster_info_->trafficStats()->upstream_cx_connect_fail_.inc();
}

if (!disconnected_ && conn_length_ms_ != nullptr) {
if (connected_ && conn_length_ms_ != nullptr) {
conn_length_ms_->complete();
conn_length_ms_.reset();
}

disableConnectTimeout();
reportConnectionDestroy(event);
disconnected_ = true;

connected_ = false;
if (connection_) {
detected_close_ = connection_->detectedCloseType();
}

dispatcher_.deferredDelete(std::move(connection_));
if (callbacks_) {
callbacks_->onEvent(event);
callbacks_ = nullptr;
}
} else {
disconnected_ = false;
connected_ = true;
conn_connect_ms_->complete();
conn_connect_ms_.reset();
disableConnectTimeout();
Expand Down
6 changes: 2 additions & 4 deletions source/common/tcp/async_tcp_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ class AsyncTcpClientImpl : public AsyncTcpClient,
Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context, bool enable_half_close);

~AsyncTcpClientImpl() override;

void close(Network::ConnectionCloseType type) override;

Network::DetectedCloseType detectedCloseType() const override { return detected_close_; }
Expand All @@ -56,7 +54,7 @@ class AsyncTcpClientImpl : public AsyncTcpClient,
/**
* @return if the client connects to a peer host.
*/
bool connected() override { return !disconnected_; }
bool connected() override { return connected_; }

Event::Dispatcher& dispatcher() override { return dispatcher_; }

Expand Down Expand Up @@ -108,7 +106,7 @@ class AsyncTcpClientImpl : public AsyncTcpClient,
Event::TimerPtr connect_timer_;
AsyncTcpClientCallbacks* callbacks_{};
Network::DetectedCloseType detected_close_{Network::DetectedCloseType::Normal};
bool disconnected_{true};
bool connected_{false};
bool enable_half_close_{false};
};

Expand Down
28 changes: 27 additions & 1 deletion test/common/tcp/async_tcp_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,36 @@ TEST_F(AsyncTcpClientImplTest, TestActiveCx) {
expectCreateConnection();
EXPECT_EQ(1UL, cluster_manager_.thread_local_cluster_.cluster_.info_->traffic_stats_
->upstream_cx_active_.value());
client_.reset();
EXPECT_CALL(callbacks_, onEvent(Network::ConnectionEvent::LocalClose));
connection_->raiseEvent(Network::ConnectionEvent::LocalClose);
EXPECT_EQ(0UL, cluster_manager_.thread_local_cluster_.cluster_.info_->traffic_stats_
->upstream_cx_active_.value());
}

// A second connect() issued while the client still holds its first connection
// must be refused, i.e. return false.
// NOTE(review): presumably expectCreateConnection() performs the initial
// connect() and leaves the client connected - confirm against the fixture.
TEST_F(AsyncTcpClientImplTest, ReconnectWhileClientConnected) {
  setUpClient();
  expectCreateConnection();

  const bool reconnect_started = client_->connect();
  EXPECT_FALSE(reconnect_started);
}

// A second connect() issued while the first connection attempt is still
// outstanding must be refused, i.e. return false.
// NOTE(review): the `false` argument presumably leaves the connection in the
// connecting state (per the test name) - confirm against the fixture.
TEST_F(AsyncTcpClientImplTest, ReconnectWhileClientConnecting) {
  setUpClient();
  expectCreateConnection(false);

  const bool reconnect_started = client_->connect();
  EXPECT_FALSE(reconnect_started);
}

// Verifies that a new connection can be created after the previous one was
// closed, and that both attempts are counted in upstream_cx_total.
TEST_F(AsyncTcpClientImplTest, ReconnectAfterClientDisconnected) {
setUpClient();
expectCreateConnection();

// Close the first connection; the close event must be delivered to the callbacks.
EXPECT_CALL(callbacks_, onEvent(Network::ConnectionEvent::LocalClose));
connection_->raiseEvent(Network::ConnectionEvent::LocalClose);
// NOTE(review): presumably a fresh mock timer is required because the next
// connection attempt sets up its own connect timer - confirm against the fixture.
connect_timer_ = new NiceMock<Event::MockTimer>(&dispatcher_);
expectCreateConnection();

// One increment per connection attempt: the initial one and the reconnect.
EXPECT_EQ(2UL, cluster_manager_.thread_local_cluster_.cluster_.info_->traffic_stats_
->upstream_cx_total_.value());
}

} // namespace Tcp
} // namespace Envoy
13 changes: 13 additions & 0 deletions test/integration/filters/test_network_async_tcp_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter {
}

Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override {
if (require_reconnect_ && !client_->connect()) {
ENVOY_LOG_MISC(debug, "Unable to reconnect to cluster");
return Network::FilterStatus::StopIteration;
}

stats_.on_data_.inc();
ENVOY_LOG_MISC(debug, "Downstream onData: {}, length: {} sending to upstream", data.toString(),
data.length());
Expand All @@ -76,6 +81,8 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter {
read_callbacks_->connection().addConnectionCallbacks(*downstream_callbacks_);
}

bool require_reconnect_{false};

private:
struct DownstreamCallbacks : public Envoy::Network::ConnectionCallbacks {
explicit DownstreamCallbacks(TestNetworkAsyncTcpFilter& parent) : parent_(parent) {}
Expand Down Expand Up @@ -121,6 +128,12 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter {
void onEvent(Network::ConnectionEvent event) override {
ENVOY_LOG_MISC(debug, "tcp client test filter upstream callback onEvent: {}",
static_cast<int>(event));

if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
parent_.require_reconnect_ = true;
}

if (event != Network::ConnectionEvent::RemoteClose) {
return;
}
Expand Down
32 changes: 32 additions & 0 deletions test/integration/tcp_async_client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,38 @@ TEST_P(TcpAsyncClientIntegrationTest, MultipleResponseFrames) {
tcp_client->close();
}

// Verifies that, after the upstream closes its connection, a subsequent write on
// the same downstream connection results in a new upstream connection, and that
// the upstream_cx_total / upstream_cx_active stats follow each step.
TEST_P(TcpAsyncClientIntegrationTest, Reconnect) {
// The test body only runs for non-IPv6 parameterizations.
// NOTE(review): the reason for skipping IPv6 is not visible here - confirm.
if (GetParam() == Network::Address::IpVersion::v6) {
return;
}

enableHalfClose(true);
initialize();

// First request: expect exactly one upstream connection, which receives "hello1".
IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0"));
ASSERT_TRUE(tcp_client->write("hello1", false));
test_server_->waitForCounterEq("cluster.cluster_0.upstream_cx_total", 1);
test_server_->waitForGaugeEq("cluster.cluster_0.upstream_cx_active", 1);
FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
ASSERT_TRUE(fake_upstream_connection->waitForData(
[&](const std::string& data) -> bool { return data == "hello1"; }));
// Close from the upstream side and wait until the active gauge drops back to zero.
ASSERT_TRUE(fake_upstream_connection->close());
test_server_->waitForGaugeEq("cluster.cluster_0.upstream_cx_active", 0);

// We use the same tcp_client to ensure that a new upstream connection is created.
ASSERT_TRUE(tcp_client->write("hello2", false));
test_server_->waitForCounterEq("cluster.cluster_0.upstream_cx_total", 2);
test_server_->waitForGaugeEq("cluster.cluster_0.upstream_cx_active", 1);
FakeRawConnectionPtr fake_upstream_connection2;
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection2));
ASSERT_TRUE(fake_upstream_connection2->waitForData(
[&](const std::string& data) -> bool { return data == "hello2"; }));

// Closing the downstream connection must also release the second upstream connection.
tcp_client->close();
test_server_->waitForGaugeEq("cluster.cluster_0.upstream_cx_active", 0);
}

#if ENVOY_PLATFORM_ENABLE_SEND_RST
// Test if RST close can be detected from downstream and upstream is closed by RST.
TEST_P(TcpAsyncClientIntegrationTest, TestClientCloseRST) {
Expand Down
Loading