diff --git a/src/brpc/details/health_check.cpp b/src/brpc/details/health_check.cpp index 6cb5f47da2..4c88604eb3 100644 --- a/src/brpc/details/health_check.cpp +++ b/src/brpc/details/health_check.cpp @@ -179,11 +179,12 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { // one is addressing the Socket(except here). Because the Socket // is not addressable, the reference count will not increase // again. This solution is not perfect because the `expected_nref' - // is implementation specific. In our case, one reference comes - // from SocketMapInsert(socket_map.cpp), one reference is here. - // Although WaitAndReset() could hang when someone is addressing - // the failed Socket forever (also indicating bug), this is not an - // issue in current code. + // is implementation specific. In our case, one reference comes + // from someone who holds a reference related to health checking, + // e.g. SocketMapInsert(socket_map.cpp) or ChannelBalancer::AddChannel + // (selective_channel.cpp), one reference is here. Although WaitAndReset() + // could hang when someone is addressing the failed Socket forever + // (also indicating bug), this is not an issue in current code. if (_first_time) { // Only check at first time. _first_time = false; if (ptr->WaitAndReset(2/*note*/) != 0) { diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index 89f52415c8..e3410409f9 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -189,7 +189,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, SocketOptions options; options.user = sub_chan; options.health_check_interval_s = FLAGS_channel_check_interval; - + if (Socket::Create(options, &sock_id) != 0) { delete sub_chan; LOG(ERROR) << "Fail to create fake socket for sub channel"; @@ -203,6 +203,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, ptr->SetFailed(); return -1; } + ptr->SetHCRelatedRefHeld(); // set held status _chan_map[sub_channel]= ptr.release(); // Add reference. if (handle) { *handle = sock_id; @@ -223,6 +224,7 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha CHECK_EQ(1UL, _chan_map.erase(sub->chan)); } { + ptr->SetHCRelatedRefReleased(); // set released status to cancel health checking SocketUniquePtr ptr2(ptr.get()); // Dereference. } if (rc == 0) { diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index de61f4ad38..6f941c9a43 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -434,6 +434,7 @@ Socket::Socket(Forbidden) , _parsing_context(NULL) , _correlation_id(0) , _health_check_interval_s(-1) + , _is_hc_related_ref_held(false) , _ninprocess(1) , _auth_flag_error(0) , _auth_id(INVALID_BTHREAD_ID) @@ -614,6 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { m->reset_parsing_context(options.initial_parsing_context); m->_correlation_id = 0; m->_health_check_interval_s = options.health_check_interval_s; + m->_is_hc_related_ref_held = false; m->_ninprocess.store(1, butil::memory_order_relaxed); m->_auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); @@ -682,6 +684,13 @@ int Socket::WaitAndReset(int32_t expected_nref) { << " was abandoned during health checking"; return -1; } else { + // nobody holds a health-checking-related reference, + // so no need to do health checking. + if (!_is_hc_related_ref_held) { + LOG(WARNING) << "nobody holds a health-checking-related reference"; + return -1; + } + break; } } diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 5ca0970039..e1b3f9ef18 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -286,6 +286,14 @@ friend class policy::H2GlobalStreamCreator; // Initialized by SocketOptions.health_check_interval_s. int health_check_interval() const { return _health_check_interval_s; } + // When someone holds a health-checking-related reference, + // this function need to be called to make health checking run normally. + void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; } + // When someone releases the health-checking-related reference, + // this function need to be called to cancel health checking. + void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; } + bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; } + // The unique identifier. SocketId id() const { return _this_id; } @@ -747,6 +755,11 @@ friend void DereferenceSocket(Socket*); // Non-zero when health-checking is on. int _health_check_interval_s; + // The variable indicates whether the reference related + // to the health checking is held by someone. It can be + // synchronized via _versioned_ref atomic variable. + bool _is_hc_related_ref_held; + // +-1 bit-+---31 bit---+ // | flag | counter | // +-------+------------+ diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 613f333a98..a08f50b299 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -215,12 +215,14 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, SingleConnection* sc = _map.seek(key); if (sc) { if (!sc->socket->Failed() || - sc->socket->health_check_interval() > 0/*HC enabled*/) { + (sc->socket->health_check_interval() > 0 && + sc->socket->IsHCRelatedRefHeld())/*HC enabled*/) { ++sc->ref_count; *id = sc->socket->id(); return 0; } // A socket w/o HC is failed (permanently), replace it. + sc->socket->SetHCRelatedRefReleased(); // set released status to cancel health checking SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion. _map.erase(key); // in principle, we can override the entry in map w/o // removing and inserting it again. But this would make error branches @@ -244,6 +246,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, LOG(FATAL) << "Fail to address SocketId=" << tmp_id; return -1; } + ptr->SetHCRelatedRefHeld(); // set held status SingleConnection new_sc = { 1, ptr.release(), 0 }; _map[key] = new_sc; *id = tmp_id; @@ -302,6 +305,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key, butil::StringPiece(namebuf, len), PrintSocketMap, this); } s->ReleaseAdditionalReference(); // release extra ref + s->SetHCRelatedRefReleased(); // set released status to cancel health checking SocketUniquePtr ptr(s); // Dereference } } diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index f1c45286b1..c607dc3e72 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -498,6 +498,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) { { brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); + s->SetHCRelatedRefHeld(); // set held status global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd()); @@ -645,7 +646,8 @@ TEST_F(SocketTest, health_check) { ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); - + + s->SetHCRelatedRefHeld(); // set held status global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd());