From 79c239c1f8a2b54f1efb427454c8421d1b425d43 Mon Sep 17 00:00:00 2001 From: chenguangming Date: Tue, 21 Jun 2022 19:49:03 +0800 Subject: [PATCH 1/7] fix ssl state exception coredump --- src/brpc/socket.cpp | 10 +++++++++- src/brpc/socket.h | 6 ++++++ src/brpc/socket_map.cpp | 1 + 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 59a570ac71..9c80cc02d6 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -434,6 +434,7 @@ Socket::Socket(Forbidden) , _parsing_context(NULL) , _correlation_id(0) , _health_check_interval_s(-1) + , _stop_health_check(false) , _ninprocess(1) , _auth_flag_error(0) , _auth_id(INVALID_BTHREAD_ID) @@ -614,6 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { m->reset_parsing_context(options.initial_parsing_context); m->_correlation_id = 0; m->_health_check_interval_s = options.health_check_interval_s; + m->_stop_health_check.store(false, butil::memory_order_relaxed); m->_ninprocess.store(1, butil::memory_order_relaxed); m->_auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); @@ -686,6 +688,11 @@ int Socket::WaitAndReset(int32_t expected_nref) { } } + if (_stop_health_check.load(butil::memory_order_relaxed)) { + LOG(WARNING) << "stop health check thread"; + return -1; + } + // It's safe to close previous fd (provided expected_nref is correct). const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed); if (ValidFileDescriptor(prev_fd)) { @@ -837,7 +844,8 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { // Do health-checking even if we're not connected before, needed // by Channel to revive never-connected socket when server side // comes online. - if (_health_check_interval_s > 0) { + if (_health_check_interval_s > 0 && + !_stop_health_check.load(butil::memory_order_relaxed)) { GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); StartHealthCheck(id(), GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 4be6a73165..01d1e52ecd 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -286,6 +286,9 @@ friend class policy::H2GlobalStreamCreator; // Initialized by SocketOptions.health_check_interval_s. int health_check_interval() const { return _health_check_interval_s; } + // only for SocketMap + void StopHealthCheck() { _stop_health_check.store(true, butil::memory_order_relaxed); } + // The unique identifier. SocketId id() const { return _this_id; } @@ -747,6 +750,9 @@ friend void DereferenceSocket(Socket*); // Non-zero when health-checking is on. int _health_check_interval_s; + // true when client SocketMap has removed socket + butil::atomic _stop_health_check; + // +-1 bit-+---31 bit---+ // | flag | counter | // +-------+------------+ diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 613f333a98..d3c34fefe5 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -301,6 +301,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key, _this_map_bvar = new bvar::PassiveStatus( butil::StringPiece(namebuf, len), PrintSocketMap, this); } + s->StopHealthCheck(); // stop health check s->ReleaseAdditionalReference(); // release extra ref SocketUniquePtr ptr(s); // Dereference } From 64645e53fecfa5207059544d72f2f1cb3ab18262 Mon Sep 17 00:00:00 2001 From: chenguangming Date: Thu, 23 Jun 2022 14:58:10 +0800 Subject: [PATCH 2/7] fix remove atomic --- src/brpc/socket.cpp | 6 +++--- src/brpc/socket.h | 7 ++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 9c80cc02d6..b5c2a990c4 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -615,7 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { m->reset_parsing_context(options.initial_parsing_context); m->_correlation_id = 0; m->_health_check_interval_s = options.health_check_interval_s; - m->_stop_health_check.store(false, butil::memory_order_relaxed); + m->_stop_health_check = false; m->_ninprocess.store(1, butil::memory_order_relaxed); m->_auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); @@ -688,7 +688,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { } } - if (_stop_health_check.load(butil::memory_order_relaxed)) { + if (_stop_health_check) { LOG(WARNING) << "stop health check thread"; return -1; } @@ -845,7 +845,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { // by Channel to revive never-connected socket when server side // comes online. if (_health_check_interval_s > 0 && - !_stop_health_check.load(butil::memory_order_relaxed)) { + !_stop_health_check) { GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); StartHealthCheck(id(), GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 01d1e52ecd..2c69927b7d 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -287,7 +287,7 @@ friend class policy::H2GlobalStreamCreator; int health_check_interval() const { return _health_check_interval_s; } // only for SocketMap - void StopHealthCheck() { _stop_health_check.store(true, butil::memory_order_relaxed); } + void StopHealthCheck() { _stop_health_check; } // The unique identifier. SocketId id() const { return _this_id; } @@ -750,8 +750,9 @@ friend void DereferenceSocket(Socket*); // Non-zero when health-checking is on. int _health_check_interval_s; - // true when client SocketMap has removed socket - butil::atomic _stop_health_check; + // True when client SocketMap has removed socket. + // It can be synchronized via _versioned_ref atomic variable + bool _stop_health_check; // +-1 bit-+---31 bit---+ // | flag | counter | From 79097daf8e9c8f23dac0ea4cf9464883e2beabb2 Mon Sep 17 00:00:00 2001 From: chenguangming Date: Thu, 23 Jun 2022 14:59:28 +0800 Subject: [PATCH 3/7] fix set _stop_health_check to true --- src/brpc/socket.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 2c69927b7d..4c14a3e5ee 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -287,7 +287,7 @@ friend class policy::H2GlobalStreamCreator; int health_check_interval() const { return _health_check_interval_s; } // only for SocketMap - void StopHealthCheck() { _stop_health_check; } + void StopHealthCheck() { _stop_health_check = true; } // The unique identifier. SocketId id() const { return _this_id; } From 01f1909a5dd4d84dd48421af3d0295ff7acb2802 Mon Sep 17 00:00:00 2001 From: chenguangming Date: Thu, 30 Jun 2022 16:10:18 +0800 Subject: [PATCH 4/7] fix rename _stop_health_check to _enalbe_health_check --- src/brpc/socket.cpp | 8 ++++---- src/brpc/socket.h | 7 ++++--- src/brpc/socket_map.cpp | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index b5c2a990c4..9574411ff1 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -434,7 +434,7 @@ Socket::Socket(Forbidden) , _parsing_context(NULL) , _correlation_id(0) , _health_check_interval_s(-1) - , _stop_health_check(false) + , _enalbe_health_check(true) , _ninprocess(1) , _auth_flag_error(0) , _auth_id(INVALID_BTHREAD_ID) @@ -615,7 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { m->reset_parsing_context(options.initial_parsing_context); m->_correlation_id = 0; m->_health_check_interval_s = options.health_check_interval_s; - m->_stop_health_check = false; + m->_enalbe_health_check = true; m->_ninprocess.store(1, butil::memory_order_relaxed); m->_auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); @@ -688,7 +688,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { } } - if (_stop_health_check) { + if (!_enalbe_health_check) { LOG(WARNING) << "stop health check thread"; return -1; } @@ -845,7 +845,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { // by Channel to revive never-connected socket when server side // comes online. if (_health_check_interval_s > 0 && - !_stop_health_check) { + _enalbe_health_check) { GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); StartHealthCheck(id(), GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 4c14a3e5ee..ba54f72bf8 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -287,7 +287,7 @@ friend class policy::H2GlobalStreamCreator; int health_check_interval() const { return _health_check_interval_s; } // only for SocketMap - void StopHealthCheck() { _stop_health_check = true; } + void DisableHealthCheck() { _enalbe_health_check = false; } // The unique identifier. SocketId id() const { return _this_id; } @@ -750,9 +750,10 @@ friend void DereferenceSocket(Socket*); // Non-zero when health-checking is on. int _health_check_interval_s; - // True when client SocketMap has removed socket. + // Default: true, + // false when client SocketMap has removed socket. // It can be synchronized via _versioned_ref atomic variable - bool _stop_health_check; + bool _enalbe_health_check; // +-1 bit-+---31 bit---+ // | flag | counter | diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index d3c34fefe5..54dd58440b 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -301,7 +301,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key, _this_map_bvar = new bvar::PassiveStatus( butil::StringPiece(namebuf, len), PrintSocketMap, this); } - s->StopHealthCheck(); // stop health check + s->DisableHealthCheck(); // disable health check s->ReleaseAdditionalReference(); // release extra ref SocketUniquePtr ptr(s); // Dereference } From d54c1b8a1191253bc4f2483e6088fcdb70c34965 Mon Sep 17 00:00:00 2001 From: chenguangming Date: Mon, 18 Jul 2022 12:10:37 +0800 Subject: [PATCH 5/7] rename _enalbe_health_check to _is_in_socket_map --- src/brpc/details/health_check.cpp | 8 ++++---- src/brpc/input_messenger.cpp | 1 + src/brpc/selective_channel.cpp | 2 ++ src/brpc/socket.cpp | 22 +++++++++++++--------- src/brpc/socket.h | 15 +++++++++------ src/brpc/socket_inl.h | 1 + src/brpc/socket_map.cpp | 3 ++- test/brpc_socket_unittest.cpp | 2 ++ 8 files changed, 34 insertions(+), 20 deletions(-) diff --git a/src/brpc/details/health_check.cpp b/src/brpc/details/health_check.cpp index 6cb5f47da2..8e17c214a3 100644 --- a/src/brpc/details/health_check.cpp +++ b/src/brpc/details/health_check.cpp @@ -180,10 +180,10 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { // is not addressable, the reference count will not increase // again. This solution is not perfect because the `expected_nref' // is implementation specific. In our case, one reference comes - // from SocketMapInsert(socket_map.cpp), one reference is here. - // Although WaitAndReset() could hang when someone is addressing - // the failed Socket forever (also indicating bug), this is not an - // issue in current code. + // from SocketMapInsert(socket_map.cpp) or ChannelBalancer::AddChannel + // (selective_channel.cpp), one reference is here. Although WaitAndReset() + // could hang when someone is addressing the failed Socket forever + // (also indicating bug), this is not an issue in current code. if (_first_time) { // Only check at first time. _first_time = false; if (ptr->WaitAndReset(2/*note*/) != 0) { diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index d9b1a3a9ad..4d36a72349 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -439,6 +439,7 @@ int InputMessenger::Create(const butil::EndPoint& remote_side, options.user = this; options.on_edge_triggered_events = OnNewMessages; options.health_check_interval_s = health_check_interval_s; + options.is_in_socket_map = true; return Socket::Create(options, id); } diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index 89f52415c8..e078d45550 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -189,6 +189,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, SocketOptions options; options.user = sub_chan; options.health_check_interval_s = FLAGS_channel_check_interval; + options.is_in_socket_map = true; if (Socket::Create(options, &sock_id) != 0) { delete sub_chan; @@ -223,6 +224,7 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha CHECK_EQ(1UL, _chan_map.erase(sub->chan)); } { + ptr->SetRemovedFromSocketMap(); // set removed status SocketUniquePtr ptr2(ptr.get()); // Dereference. } if (rc == 0) { diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 9574411ff1..01ee536a82 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -434,7 +434,7 @@ Socket::Socket(Forbidden) , _parsing_context(NULL) , _correlation_id(0) , _health_check_interval_s(-1) - , _enalbe_health_check(true) + , _is_in_socket_map(false) , _ninprocess(1) , _auth_flag_error(0) , _auth_id(INVALID_BTHREAD_ID) @@ -615,7 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { m->reset_parsing_context(options.initial_parsing_context); m->_correlation_id = 0; m->_health_check_interval_s = options.health_check_interval_s; - m->_enalbe_health_check = true; + m->_is_in_socket_map = options.is_in_socket_map; m->_ninprocess.store(1, butil::memory_order_relaxed); m->_auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); @@ -684,15 +684,20 @@ int Socket::WaitAndReset(int32_t expected_nref) { << " was abandoned during health checking"; return -1; } else { + // The health checking expects two references, one reference is here + // and another reference comes from SocketMapInsert(socket_map.cpp) + // or ChannelBalancer::AddChannel(selective_channel.cpp). However, + // when socket has been remove from SocketMap, another reference is + // not from SocketMap or ChannelBalancer, so no need to do health checking. + if (!_is_in_socket_map) { + LOG(WARNING) << "socket has been removed from SocketMap before health check task"; + return -1; + } + break; } } - if (!_enalbe_health_check) { - LOG(WARNING) << "stop health check thread"; - return -1; - } - // It's safe to close previous fd (provided expected_nref is correct). const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed); if (ValidFileDescriptor(prev_fd)) { @@ -844,8 +849,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { // Do health-checking even if we're not connected before, needed // by Channel to revive never-connected socket when server side // comes online. - if (_health_check_interval_s > 0 && - _enalbe_health_check) { + if (_health_check_interval_s > 0) { GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); StartHealthCheck(id(), GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); diff --git a/src/brpc/socket.h b/src/brpc/socket.h index ba54f72bf8..831a48cc6a 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -187,6 +187,8 @@ struct SocketOptions { // one thread at any time. void (*on_edge_triggered_events)(Socket*); int health_check_interval_s; + // When socket is inserted into socket map, it needs to be set to true. + bool is_in_socket_map; std::shared_ptr initial_ssl_ctx; bthread_keytable_pool_t* keytable_pool; SocketConnection* conn; @@ -286,8 +288,9 @@ friend class policy::H2GlobalStreamCreator; // Initialized by SocketOptions.health_check_interval_s. int health_check_interval() const { return _health_check_interval_s; } - // only for SocketMap - void DisableHealthCheck() { _enalbe_health_check = false; } + // Only for SocketMap and ChannelBalancer. + // When socket is removed from SocketMap, set _is_in_socket_map to false. + void SetRemovedFromSocketMap() { _is_in_socket_map = false; } // The unique identifier. SocketId id() const { return _this_id; } @@ -750,10 +753,10 @@ friend void DereferenceSocket(Socket*); // Non-zero when health-checking is on. int _health_check_interval_s; - // Default: true, - // false when client SocketMap has removed socket. - // It can be synchronized via _versioned_ref atomic variable - bool _enalbe_health_check; + // When socket is inserted into socket map, it needs to be set to true. + // When socket is removed from socket map, it needs to be set to false. + // It can be synchronized via _versioned_ref atomic variable. + bool _is_in_socket_map; // +-1 bit-+---31 bit---+ // | flag | counter | diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h index 31ce6a907e..510f88fac6 100644 --- a/src/brpc/socket_inl.h +++ b/src/brpc/socket_inl.h @@ -57,6 +57,7 @@ inline SocketOptions::SocketOptions() , user(NULL) , on_edge_triggered_events(NULL) , health_check_interval_s(-1) + , is_in_socket_map(false) , keytable_pool(NULL) , conn(NULL) , app_connect(NULL) diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 54dd58440b..80847453a1 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -58,6 +58,7 @@ class GlobalSocketCreator : public SocketCreator { int CreateSocket(const SocketOptions& opt, SocketId* id) { SocketOptions sock_opt = opt; sock_opt.health_check_interval_s = FLAGS_health_check_interval; + sock_opt.is_in_socket_map = true; return get_client_side_messenger()->Create(sock_opt, id); } }; @@ -301,7 +302,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key, _this_map_bvar = new bvar::PassiveStatus( butil::StringPiece(namebuf, len), PrintSocketMap, this); } - s->DisableHealthCheck(); // disable health check + s->SetRemovedFromSocketMap(); // set removed status s->ReleaseAdditionalReference(); // release extra ref SocketUniquePtr ptr(s); // Dereference } diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index f1c45286b1..f88b39b4f2 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -494,6 +494,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) { options.remote_side = point; options.user = new CheckRecycle; options.health_check_interval_s = 1/*s*/; + options.is_in_socket_map = true; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); { brpc::SocketUniquePtr s; @@ -642,6 +643,7 @@ TEST_F(SocketTest, health_check) { options.remote_side = point; options.user = new CheckRecycle; options.health_check_interval_s = kCheckInteval/*s*/; + options.is_in_socket_map = true; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); From b87795d9c5bea8816117472fe04261f41ce75d11 Mon Sep 17 00:00:00 2001 From: chenguangming Date: Tue, 19 Jul 2022 11:45:55 +0800 Subject: [PATCH 6/7] set _is_in_socket_map after insert successfully --- src/brpc/input_messenger.cpp | 1 - src/brpc/selective_channel.cpp | 30 +++++++++++++++--------------- src/brpc/socket.cpp | 2 +- src/brpc/socket.h | 8 +++++--- src/brpc/socket_inl.h | 1 - src/brpc/socket_map.cpp | 2 +- test/brpc_socket_unittest.cpp | 6 +++--- 7 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index 4d36a72349..d9b1a3a9ad 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -439,7 +439,6 @@ int InputMessenger::Create(const butil::EndPoint& remote_side, options.user = this; options.on_edge_triggered_events = OnNewMessages; options.health_check_interval_s = health_check_interval_s; - options.is_in_socket_map = true; return Socket::Create(options, id); } diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index e078d45550..658f433a49 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -93,7 +93,7 @@ class ChannelBalancer : public SharedLoadBalancer { private: butil::Mutex _mutex; // Find out duplicated sub channels. - ChannelToIdMap _chan_map; + ChannelToIdMap _chan_socket_map; }; class SubDone; @@ -157,11 +157,11 @@ friend class SubDone; ChannelBalancer::~ChannelBalancer() { for (ChannelToIdMap::iterator - it = _chan_map.begin(); it != _chan_map.end(); ++it) { + it = _chan_socket_map.begin(); it != _chan_socket_map.end(); ++it) { SocketUniquePtr ptr(it->second); // Dereference it->second->ReleaseAdditionalReference(); } - _chan_map.clear(); + _chan_socket_map.clear(); } int ChannelBalancer::Init(const char* lb_name) { @@ -175,7 +175,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, return -1; } BAIDU_SCOPED_LOCK(_mutex); - if (_chan_map.find(sub_channel) != _chan_map.end()) { + if (_chan_socket_map.find(sub_channel) != _chan_socket_map.end()) { LOG(ERROR) << "Duplicated sub_channel=" << sub_channel; return -1; } @@ -189,8 +189,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, SocketOptions options; options.user = sub_chan; options.health_check_interval_s = FLAGS_channel_check_interval; - options.is_in_socket_map = true; - + if (Socket::Create(options, &sock_id) != 0) { delete sub_chan; LOG(ERROR) << "Fail to create fake socket for sub channel"; @@ -204,7 +203,8 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, ptr->SetFailed(); return -1; } - _chan_map[sub_channel]= ptr.release(); // Add reference. + ptr->SetInsertedIntoSocketMap(); // set inserted status + _chan_socket_map[sub_channel]= ptr.release(); // Add reference. if (handle) { *handle = sock_id; } @@ -221,10 +221,10 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha SubChannel* sub = static_cast(ptr->user()); { BAIDU_SCOPED_LOCK(_mutex); - CHECK_EQ(1UL, _chan_map.erase(sub->chan)); + CHECK_EQ(1UL, _chan_socket_map.erase(sub->chan)); + ptr->SetRemovedFromSocketMap(); // set removed status } { - ptr->SetRemovedFromSocketMap(); // set removed status SocketUniquePtr ptr2(ptr.get()); // Dereference. } if (rc == 0) { @@ -246,8 +246,8 @@ inline int ChannelBalancer::SelectChannel(const LoadBalancer::SelectIn& in, int ChannelBalancer::CheckHealth() { BAIDU_SCOPED_LOCK(_mutex); - for (ChannelToIdMap::const_iterator it = _chan_map.begin(); - it != _chan_map.end(); ++it) { + for (ChannelToIdMap::const_iterator it = _chan_socket_map.begin(); + it != _chan_socket_map.end(); ++it) { if (!it->second->Failed() && it->first->CheckHealth() == 0) { return 0; @@ -260,12 +260,12 @@ void ChannelBalancer::Describe(std::ostream& os, const DescribeOptions& options) { BAIDU_SCOPED_LOCK(_mutex); if (!options.verbose) { - os << _chan_map.size(); + os << _chan_socket_map.size(); return; } - for (ChannelToIdMap::const_iterator it = _chan_map.begin(); - it != _chan_map.end(); ++it) { - if (it != _chan_map.begin()) { + for (ChannelToIdMap::const_iterator it = _chan_socket_map.begin(); + it != _chan_socket_map.end(); ++it) { + if (it != _chan_socket_map.begin()) { os << ' '; } it->first->Describe(os, options); diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 01ee536a82..99c0798848 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -615,7 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { m->reset_parsing_context(options.initial_parsing_context); m->_correlation_id = 0; m->_health_check_interval_s = options.health_check_interval_s; - m->_is_in_socket_map = options.is_in_socket_map; + m->_is_in_socket_map = false; m->_ninprocess.store(1, butil::memory_order_relaxed); m->_auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 831a48cc6a..a2d247fefd 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -187,8 +187,6 @@ struct SocketOptions { // one thread at any time. void (*on_edge_triggered_events)(Socket*); int health_check_interval_s; - // When socket is inserted into socket map, it needs to be set to true. - bool is_in_socket_map; std::shared_ptr initial_ssl_ctx; bthread_keytable_pool_t* keytable_pool; SocketConnection* conn; @@ -289,8 +287,12 @@ friend class policy::H2GlobalStreamCreator; int health_check_interval() const { return _health_check_interval_s; } // Only for SocketMap and ChannelBalancer. - // When socket is removed from SocketMap, set _is_in_socket_map to false. + // When socket is removed from socket map, set _is_in_socket_map to false. void SetRemovedFromSocketMap() { _is_in_socket_map = false; } + // Only for SocketMap and ChannelBalancer. + // When socket is inserted into socket map, set _is_in_socket_map to true. + void SetInsertedIntoSocketMap() { _is_in_socket_map = true; } + bool IsInSocketMap() const { return _is_in_socket_map; } // The unique identifier. SocketId id() const { return _this_id; } diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h index 510f88fac6..31ce6a907e 100644 --- a/src/brpc/socket_inl.h +++ b/src/brpc/socket_inl.h @@ -57,7 +57,6 @@ inline SocketOptions::SocketOptions() , user(NULL) , on_edge_triggered_events(NULL) , health_check_interval_s(-1) - , is_in_socket_map(false) , keytable_pool(NULL) , conn(NULL) , app_connect(NULL) diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 80847453a1..abbf4ca757 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -58,7 +58,6 @@ class GlobalSocketCreator : public SocketCreator { int CreateSocket(const SocketOptions& opt, SocketId* id) { SocketOptions sock_opt = opt; sock_opt.health_check_interval_s = FLAGS_health_check_interval; - sock_opt.is_in_socket_map = true; return get_client_side_messenger()->Create(sock_opt, id); } }; @@ -245,6 +244,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, LOG(FATAL) << "Fail to address SocketId=" << tmp_id; return -1; } + ptr->SetInsertedIntoSocketMap(); // set inserted status SingleConnection new_sc = { 1, ptr.release(), 0 }; _map[key] = new_sc; *id = tmp_id; diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index f88b39b4f2..9db69112a5 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -494,11 +494,11 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) { options.remote_side = point; options.user = new CheckRecycle; options.health_check_interval_s = 1/*s*/; - options.is_in_socket_map = true; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); { brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); + s->SetInsertedIntoSocketMap(); global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd()); @@ -643,11 +643,11 @@ TEST_F(SocketTest, health_check) { options.remote_side = point; options.user = new CheckRecycle; options.health_check_interval_s = kCheckInteval/*s*/; - options.is_in_socket_map = true; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); - + + s->SetInsertedIntoSocketMap(); global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd()); From 9ca45137bdc5b31866aee983c854c256320fd2b2 Mon Sep 17 00:00:00 2001 From: chenguangming Date: Thu, 21 Jul 2022 12:14:45 +0800 Subject: [PATCH 7/7] rename _is_in_socket_map to _is_hc_related_ref_held --- src/brpc/details/health_check.cpp | 5 +++-- src/brpc/selective_channel.cpp | 28 ++++++++++++++-------------- src/brpc/socket.cpp | 15 ++++++--------- src/brpc/socket.h | 22 +++++++++++----------- src/brpc/socket_map.cpp | 8 +++++--- test/brpc_socket_unittest.cpp | 4 ++-- 6 files changed, 41 insertions(+), 41 deletions(-) diff --git a/src/brpc/details/health_check.cpp b/src/brpc/details/health_check.cpp index 8e17c214a3..4c88604eb3 100644 --- a/src/brpc/details/health_check.cpp +++ b/src/brpc/details/health_check.cpp @@ -179,8 +179,9 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { // one is addressing the Socket(except here). Because the Socket // is not addressable, the reference count will not increase // again. This solution is not perfect because the `expected_nref' - // is implementation specific. In our case, one reference comes - // from SocketMapInsert(socket_map.cpp) or ChannelBalancer::AddChannel + // is implementation specific. In our case, one reference comes + // from someone who holds a reference related to health checking, + // e.g. SocketMapInsert(socket_map.cpp) or ChannelBalancer::AddChannel // (selective_channel.cpp), one reference is here. Although WaitAndReset() // could hang when someone is addressing the failed Socket forever // (also indicating bug), this is not an issue in current code. diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index 658f433a49..e3410409f9 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -93,7 +93,7 @@ class ChannelBalancer : public SharedLoadBalancer { private: butil::Mutex _mutex; // Find out duplicated sub channels. - ChannelToIdMap _chan_socket_map; + ChannelToIdMap _chan_map; }; class SubDone; @@ -157,11 +157,11 @@ friend class SubDone; ChannelBalancer::~ChannelBalancer() { for (ChannelToIdMap::iterator - it = _chan_socket_map.begin(); it != _chan_socket_map.end(); ++it) { + it = _chan_map.begin(); it != _chan_map.end(); ++it) { SocketUniquePtr ptr(it->second); // Dereference it->second->ReleaseAdditionalReference(); } - _chan_socket_map.clear(); + _chan_map.clear(); } int ChannelBalancer::Init(const char* lb_name) { @@ -175,7 +175,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, return -1; } BAIDU_SCOPED_LOCK(_mutex); - if (_chan_socket_map.find(sub_channel) != _chan_socket_map.end()) { + if (_chan_map.find(sub_channel) != _chan_map.end()) { LOG(ERROR) << "Duplicated sub_channel=" << sub_channel; return -1; } @@ -203,8 +203,8 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, ptr->SetFailed(); return -1; } - ptr->SetInsertedIntoSocketMap(); // set inserted status - _chan_socket_map[sub_channel]= ptr.release(); // Add reference. + ptr->SetHCRelatedRefHeld(); // set held status + _chan_map[sub_channel]= ptr.release(); // Add reference. if (handle) { *handle = sock_id; } @@ -221,10 +221,10 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha SubChannel* sub = static_cast(ptr->user()); { BAIDU_SCOPED_LOCK(_mutex); - CHECK_EQ(1UL, _chan_socket_map.erase(sub->chan)); - ptr->SetRemovedFromSocketMap(); // set removed status + CHECK_EQ(1UL, _chan_map.erase(sub->chan)); } { + ptr->SetHCRelatedRefReleased(); // set released status to cancel health checking SocketUniquePtr ptr2(ptr.get()); // Dereference. } if (rc == 0) { @@ -246,8 +246,8 @@ inline int ChannelBalancer::SelectChannel(const LoadBalancer::SelectIn& in, int ChannelBalancer::CheckHealth() { BAIDU_SCOPED_LOCK(_mutex); - for (ChannelToIdMap::const_iterator it = _chan_socket_map.begin(); - it != _chan_socket_map.end(); ++it) { + for (ChannelToIdMap::const_iterator it = _chan_map.begin(); + it != _chan_map.end(); ++it) { if (!it->second->Failed() && it->first->CheckHealth() == 0) { return 0; @@ -260,12 +260,12 @@ void ChannelBalancer::Describe(std::ostream& os, const DescribeOptions& options) { BAIDU_SCOPED_LOCK(_mutex); if (!options.verbose) { - os << _chan_socket_map.size(); + os << _chan_map.size(); return; } - for (ChannelToIdMap::const_iterator it = _chan_socket_map.begin(); - it != _chan_socket_map.end(); ++it) { - if (it != _chan_socket_map.begin()) { + for (ChannelToIdMap::const_iterator it = _chan_map.begin(); + it != _chan_map.end(); ++it) { + if (it != _chan_map.begin()) { os << ' '; } it->first->Describe(os, options); diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 99c0798848..45a19bef0f 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -434,7 +434,7 @@ Socket::Socket(Forbidden) , _parsing_context(NULL) , _correlation_id(0) , _health_check_interval_s(-1) - , _is_in_socket_map(false) + , _is_hc_related_ref_held(false) , _ninprocess(1) , _auth_flag_error(0) , _auth_id(INVALID_BTHREAD_ID) @@ -615,7 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { m->reset_parsing_context(options.initial_parsing_context); m->_correlation_id = 0; m->_health_check_interval_s = options.health_check_interval_s; - m->_is_in_socket_map = false; + m->_is_hc_related_ref_held = false; m->_ninprocess.store(1, butil::memory_order_relaxed); m->_auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); @@ -684,13 +684,10 @@ int Socket::WaitAndReset(int32_t expected_nref) { << " was abandoned during health checking"; return -1; } else { - // The health checking expects two references, one reference is here - // and another reference comes from SocketMapInsert(socket_map.cpp) - // or ChannelBalancer::AddChannel(selective_channel.cpp). However, - // when socket has been remove from SocketMap, another reference is - // not from SocketMap or ChannelBalancer, so no need to do health checking. - if (!_is_in_socket_map) { - LOG(WARNING) << "socket has been removed from SocketMap before health check task"; + // nobody holds a health-checking-related reference, + // so no need to do health checking. + if (!_is_hc_related_ref_held) { + LOG(WARNING) << "nobody holds a health-checking-related reference"; return -1; } diff --git a/src/brpc/socket.h b/src/brpc/socket.h index a2d247fefd..6e4c34f248 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -286,13 +286,13 @@ friend class policy::H2GlobalStreamCreator; // Initialized by SocketOptions.health_check_interval_s. int health_check_interval() const { return _health_check_interval_s; } - // Only for SocketMap and ChannelBalancer. - // When socket is removed from socket map, set _is_in_socket_map to false. - void SetRemovedFromSocketMap() { _is_in_socket_map = false; } - // Only for SocketMap and ChannelBalancer. - // When socket is inserted into socket map, set _is_in_socket_map to true. - void SetInsertedIntoSocketMap() { _is_in_socket_map = true; } - bool IsInSocketMap() const { return _is_in_socket_map; } + // When someone holds a health-checking-related reference, + // this function need to be called to make health checking run normally. + void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; } + // When someone releases the health-checking-related reference, + // this function need to be called to cancel health checking. + void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; } + bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; } // The unique identifier. SocketId id() const { return _this_id; } @@ -755,10 +755,10 @@ friend void DereferenceSocket(Socket*); // Non-zero when health-checking is on. int _health_check_interval_s; - // When socket is inserted into socket map, it needs to be set to true. - // When socket is removed from socket map, it needs to be set to false. - // It can be synchronized via _versioned_ref atomic variable. - bool _is_in_socket_map; + // The variable indicates whether the reference related + // to the health checking is held by someone. It can be + // synchronized via _versioned_ref atomic variable. + bool _is_hc_related_ref_held; // +-1 bit-+---31 bit---+ // | flag | counter | diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index abbf4ca757..a08f50b299 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -215,12 +215,14 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, SingleConnection* sc = _map.seek(key); if (sc) { if (!sc->socket->Failed() || - sc->socket->health_check_interval() > 0/*HC enabled*/) { + (sc->socket->health_check_interval() > 0 && + sc->socket->IsHCRelatedRefHeld())/*HC enabled*/) { ++sc->ref_count; *id = sc->socket->id(); return 0; } // A socket w/o HC is failed (permanently), replace it. + sc->socket->SetHCRelatedRefReleased(); // set released status to cancel health checking SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion. _map.erase(key); // in principle, we can override the entry in map w/o // removing and inserting it again. But this would make error branches @@ -244,7 +246,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, LOG(FATAL) << "Fail to address SocketId=" << tmp_id; return -1; } - ptr->SetInsertedIntoSocketMap(); // set inserted status + ptr->SetHCRelatedRefHeld(); // set held status SingleConnection new_sc = { 1, ptr.release(), 0 }; _map[key] = new_sc; *id = tmp_id; @@ -302,8 +304,8 @@ void SocketMap::RemoveInternal(const SocketMapKey& key, _this_map_bvar = new bvar::PassiveStatus( butil::StringPiece(namebuf, len), PrintSocketMap, this); } - s->SetRemovedFromSocketMap(); // set removed status s->ReleaseAdditionalReference(); // release extra ref + s->SetHCRelatedRefReleased(); // set released status to cancel health checking SocketUniquePtr ptr(s); // Dereference } } diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index 9db69112a5..c607dc3e72 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -498,7 +498,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) { { brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); - s->SetInsertedIntoSocketMap(); + s->SetHCRelatedRefHeld(); // set held status global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd()); @@ -647,7 +647,7 @@ TEST_F(SocketTest, health_check) { brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); - s->SetInsertedIntoSocketMap(); + s->SetHCRelatedRefHeld(); // set held status global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd());