Skip to content

Commit

Permalink
BugFix: fix the issue of the client-side socket fd cannot be recycled…
Browse files Browse the repository at this point in the history
… in a timely manner when using fiber connection pool, causing the number of close-wait to increase. (#108)
  • Loading branch information
liucf3995 authored Jan 30, 2024
1 parent b1a67e0 commit 2fdb559
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
6 changes: 6 additions & 0 deletions trpc/runtime/iomodel/reactor/fiber/fiber_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@ void FiberConnection::QueueCleanupCallbackCheck() {
read_mostly_.seldomly_used->error_events.load(std::memory_order_relaxed) == 0) {
// Consider queue a call to `OnCleanup()` then.
if (!read_mostly_.seldomly_used->cleanup_queued.exchange(true, std::memory_order_release)) {
{
std::scoped_lock<std::mutex> _(mutex_);
// Set the connection unavailability status flag in advance to make read/write tasks retreat in advance,
// reducing the conflict of Cleanup callback locks.
conn_unavailable_ = true;
}
// No need to take a reference to us, `OnCleanup()` has not been called.
GetReactor()->SubmitTask([this] {
// The load below acts as a fence (paired with `exchange` above). (But
Expand Down
6 changes: 6 additions & 0 deletions trpc/runtime/iomodel/reactor/fiber/fiber_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ class alignas(hardware_destructive_interference_size) FiberConnection : public C

std::atomic<std::size_t> restart_read_count_{0}, restart_write_count_{0};

// Connection closing cleanup and connection sending/receiving mutex lock, to protect connection cleanup and network
// data transmission/reception thread safety.
std::mutex mutex_;
// Connection unavailability status flag.
bool conn_unavailable_{false};

private:
void SuppressReadAndClearReadEvent();
void SuppressAndClearWriteEvent();
Expand Down
20 changes: 15 additions & 5 deletions trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ FiberTcpConnection::FiberTcpConnection(Reactor* reactor, const Socket& socket)
}

FiberTcpConnection::~FiberTcpConnection() {
// Requirements: destroy IO-handler before close socket.
GetIoHandler()->Destroy();
socket_.Close();

TRPC_LOG_DEBUG("~FiberTcpConnection fd:" << socket_.GetFd() << ", conn_id:" << this->GetConnId());
TRPC_ASSERT(!socket_.IsValid());
}
Expand Down Expand Up @@ -116,7 +112,15 @@ int FiberTcpConnection::Send(IoMessage&& msg) {
}

constexpr auto kMaximumBytesPerCall = 1048576;
// External calls to the Send method may conflict with Socket closing concurrently, so a lock is added here for
// protection.
std::unique_lock<std::mutex> lock(mutex_);
// If the connection is unavailable, return an error directly.
if (conn_unavailable_) {
return -1;
}
auto flush_status = FlushWritingBuffer(kMaximumBytesPerCall);
lock.unlock();
if (TRPC_LIKELY(flush_status == FlushStatus::kFlushed)) {
return 0;
} else if (flush_status == FlushStatus::kSystemBufferSaturated || flush_status == FlushStatus::kQuotaExceeded) {
Expand Down Expand Up @@ -370,7 +374,13 @@ void FiberTcpConnection::OnCleanup(CleanupReason reason) {

writing_buffers_.Stop();

// For multi-threads-safety, move "socket_.Close()" to ~FiberTcpConnection();
{
std::scoped_lock<std::mutex> _(mutex_);
conn_unavailable_ = true;
// Requirements: destroy IO-handler before close socket.
GetIoHandler()->Destroy();
socket_.Close();
}
}

IoHandler::HandshakeStatus FiberTcpConnection::DoHandshake(bool from_on_readable) {
Expand Down

0 comments on commit 2fdb559

Please sign in to comment.