From e030196b013e56d464b25d70a10f3b6828d20671 Mon Sep 17 00:00:00 2001 From: chenguangming Date: Thu, 21 Jul 2022 12:14:45 +0800 Subject: [PATCH] rename _is_in_socket_map to _is_hc_related_ref_held --- src/brpc/details/health_check.cpp | 5 +++-- src/brpc/selective_channel.cpp | 4 ++-- src/brpc/socket.cpp | 13 +++++-------- src/brpc/socket.h | 22 +++++++++++----------- src/brpc/socket_map.cpp | 8 +++++--- test/brpc_socket_unittest.cpp | 4 ++-- 6 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/brpc/details/health_check.cpp b/src/brpc/details/health_check.cpp index 8e17c214a3..4c88604eb3 100644 --- a/src/brpc/details/health_check.cpp +++ b/src/brpc/details/health_check.cpp @@ -179,8 +179,9 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { // one is addressing the Socket(except here). Because the Socket // is not addressable, the reference count will not increase // again. This solution is not perfect because the `expected_nref' - // is implementation specific. In our case, one reference comes - // from SocketMapInsert(socket_map.cpp) or ChannelBalancer::AddChannel + // is implementation specific. In our case, one reference comes + // from someone who holds a reference related to health checking, + // e.g. SocketMapInsert(socket_map.cpp) or ChannelBalancer::AddChannel // (selective_channel.cpp), one reference is here. Although WaitAndReset() // could hang when someone is addressing the failed Socket forever // (also indicating bug), this is not an issue in current code. diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index 658f433a49..e58274ce03 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -203,7 +203,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, ptr->SetFailed(); return -1; } - ptr->SetInsertedIntoSocketMap(); // set inserted status + ptr->SetHCRelatedRefHeld(); // set held status _chan_socket_map[sub_channel]= ptr.release(); // Add reference. if (handle) { *handle = sock_id; @@ -222,9 +222,9 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha { BAIDU_SCOPED_LOCK(_mutex); CHECK_EQ(1UL, _chan_socket_map.erase(sub->chan)); - ptr->SetRemovedFromSocketMap(); // set removed status } { + ptr->SetHCRelatedRefReleased(); // set released status to cancel health checking SocketUniquePtr ptr2(ptr.get()); // Dereference. } if (rc == 0) { diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 99c0798848..25ed0976d1 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -434,7 +434,7 @@ Socket::Socket(Forbidden) , _parsing_context(NULL) , _correlation_id(0) , _health_check_interval_s(-1) - , _is_in_socket_map(false) + , _is_hc_related_ref_held(false) , _ninprocess(1) , _auth_flag_error(0) , _auth_id(INVALID_BTHREAD_ID) @@ -615,7 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { m->reset_parsing_context(options.initial_parsing_context); m->_correlation_id = 0; m->_health_check_interval_s = options.health_check_interval_s; - m->_is_in_socket_map = false; + m->_is_hc_related_ref_held = false; m->_ninprocess.store(1, butil::memory_order_relaxed); m->_auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); @@ -684,12 +684,9 @@ int Socket::WaitAndReset(int32_t expected_nref) { << " was abandoned during health checking"; return -1; } else { - // The health checking expects two references, one reference is here - // and another reference comes from SocketMapInsert(socket_map.cpp) - // or ChannelBalancer::AddChannel(selective_channel.cpp). However, - // when socket has been remove from SocketMap, another reference is - // not from SocketMap or ChannelBalancer, so no need to do health checking. - if (!_is_in_socket_map) { + // nobody holds a health-checking-related reference, + // so no need to do health checking. + if (!_is_hc_related_ref_held) { LOG(WARNING) << "socket has been removed from SocketMap before health check task"; return -1; } diff --git a/src/brpc/socket.h b/src/brpc/socket.h index a2d247fefd..867b6cafb7 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -286,13 +286,13 @@ friend class policy::H2GlobalStreamCreator; // Initialized by SocketOptions.health_check_interval_s. int health_check_interval() const { return _health_check_interval_s; } - // Only for SocketMap and ChannelBalancer. - // When socket is removed from socket map, set _is_in_socket_map to false. - void SetRemovedFromSocketMap() { _is_in_socket_map = false; } - // Only for SocketMap and ChannelBalancer. - // When socket is inserted into socket map, set _is_in_socket_map to true. - void SetInsertedIntoSocketMap() { _is_in_socket_map = true; } - bool IsInSocketMap() const { return _is_in_socket_map; } + // When someone holds a health-checking-related reference, + // this function need to be called to make health checking run normally. + void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; } + // When someone releases the health-checking-related reference, + // this function need to be called to cancel health checking. + void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; } + bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; } // The unique identifier. SocketId id() const { return _this_id; } @@ -755,10 +755,10 @@ friend void DereferenceSocket(Socket*); // Non-zero when health-checking is on. int _health_check_interval_s; - // When socket is inserted into socket map, it needs to be set to true. - // When socket is removed from socket map, it needs to be set to false. - // It can be synchronized via _versioned_ref atomic variable. - bool _is_in_socket_map; + // The variable indicates whether the reference related to the + // health check is held by someone else. It can be synchronized + // via _versioned_ref atomic variable. + bool _is_hc_related_ref_held; // +-1 bit-+---31 bit---+ // | flag | counter | diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index abbf4ca757..a08f50b299 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -215,12 +215,14 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, SingleConnection* sc = _map.seek(key); if (sc) { if (!sc->socket->Failed() || - sc->socket->health_check_interval() > 0/*HC enabled*/) { + (sc->socket->health_check_interval() > 0 && + sc->socket->IsHCRelatedRefHeld())/*HC enabled*/) { ++sc->ref_count; *id = sc->socket->id(); return 0; } // A socket w/o HC is failed (permanently), replace it. + sc->socket->SetHCRelatedRefReleased(); // set released status to cancel health checking SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion. _map.erase(key); // in principle, we can override the entry in map w/o // removing and inserting it again. But this would make error branches @@ -244,7 +246,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, LOG(FATAL) << "Fail to address SocketId=" << tmp_id; return -1; } - ptr->SetInsertedIntoSocketMap(); // set inserted status + ptr->SetHCRelatedRefHeld(); // set held status SingleConnection new_sc = { 1, ptr.release(), 0 }; _map[key] = new_sc; *id = tmp_id; @@ -302,8 +304,8 @@ void SocketMap::RemoveInternal(const SocketMapKey& key, _this_map_bvar = new bvar::PassiveStatus( butil::StringPiece(namebuf, len), PrintSocketMap, this); } - s->SetRemovedFromSocketMap(); // set removed status s->ReleaseAdditionalReference(); // release extra ref + s->SetHCRelatedRefReleased(); // set released status to cancel health checking SocketUniquePtr ptr(s); // Dereference } } diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index 9db69112a5..c607dc3e72 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -498,7 +498,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) { { brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); - s->SetInsertedIntoSocketMap(); + s->SetHCRelatedRefHeld(); // set held status global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd()); @@ -647,7 +647,7 @@ TEST_F(SocketTest, health_check) { brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); - s->SetInsertedIntoSocketMap(); + s->SetHCRelatedRefHeld(); // set held status global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd());