From b87795d9c5bea8816117472fe04261f41ce75d11 Mon Sep 17 00:00:00 2001 From: chenguangming Date: Tue, 19 Jul 2022 11:45:55 +0800 Subject: [PATCH] set _is_in_socket_map after insert successfully --- src/brpc/input_messenger.cpp | 1 - src/brpc/selective_channel.cpp | 30 +++++++++++++++--------------- src/brpc/socket.cpp | 2 +- src/brpc/socket.h | 8 +++++--- src/brpc/socket_inl.h | 1 - src/brpc/socket_map.cpp | 2 +- test/brpc_socket_unittest.cpp | 6 +++--- 7 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index 4d36a72349..d9b1a3a9ad 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -439,7 +439,6 @@ int InputMessenger::Create(const butil::EndPoint& remote_side, options.user = this; options.on_edge_triggered_events = OnNewMessages; options.health_check_interval_s = health_check_interval_s; - options.is_in_socket_map = true; return Socket::Create(options, id); } diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index e078d45550..658f433a49 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -93,7 +93,7 @@ class ChannelBalancer : public SharedLoadBalancer { private: butil::Mutex _mutex; // Find out duplicated sub channels. - ChannelToIdMap _chan_map; + ChannelToIdMap _chan_socket_map; }; class SubDone; @@ -157,11 +157,11 @@ friend class SubDone; ChannelBalancer::~ChannelBalancer() { for (ChannelToIdMap::iterator - it = _chan_map.begin(); it != _chan_map.end(); ++it) { + it = _chan_socket_map.begin(); it != _chan_socket_map.end(); ++it) { SocketUniquePtr ptr(it->second); // Dereference it->second->ReleaseAdditionalReference(); } - _chan_map.clear(); + _chan_socket_map.clear(); } int ChannelBalancer::Init(const char* lb_name) { @@ -175,7 +175,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, return -1; } BAIDU_SCOPED_LOCK(_mutex); - if (_chan_map.find(sub_channel) != _chan_map.end()) { + if (_chan_socket_map.find(sub_channel) != _chan_socket_map.end()) { LOG(ERROR) << "Duplicated sub_channel=" << sub_channel; return -1; } @@ -189,8 +189,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, SocketOptions options; options.user = sub_chan; options.health_check_interval_s = FLAGS_channel_check_interval; - options.is_in_socket_map = true; - + if (Socket::Create(options, &sock_id) != 0) { delete sub_chan; LOG(ERROR) << "Fail to create fake socket for sub channel"; @@ -204,7 +203,8 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, ptr->SetFailed(); return -1; } - _chan_map[sub_channel]= ptr.release(); // Add reference. + ptr->SetInsertedIntoSocketMap(); // set inserted status + _chan_socket_map[sub_channel]= ptr.release(); // Add reference. if (handle) { *handle = sock_id; } @@ -221,10 +221,10 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha SubChannel* sub = static_cast(ptr->user()); { BAIDU_SCOPED_LOCK(_mutex); - CHECK_EQ(1UL, _chan_map.erase(sub->chan)); + CHECK_EQ(1UL, _chan_socket_map.erase(sub->chan)); + ptr->SetRemovedFromSocketMap(); // set removed status } { - ptr->SetRemovedFromSocketMap(); // set removed status SocketUniquePtr ptr2(ptr.get()); // Dereference. } if (rc == 0) { @@ -246,8 +246,8 @@ inline int ChannelBalancer::SelectChannel(const LoadBalancer::SelectIn& in, int ChannelBalancer::CheckHealth() { BAIDU_SCOPED_LOCK(_mutex); - for (ChannelToIdMap::const_iterator it = _chan_map.begin(); - it != _chan_map.end(); ++it) { + for (ChannelToIdMap::const_iterator it = _chan_socket_map.begin(); + it != _chan_socket_map.end(); ++it) { if (!it->second->Failed() && it->first->CheckHealth() == 0) { return 0; @@ -260,12 +260,12 @@ void ChannelBalancer::Describe(std::ostream& os, const DescribeOptions& options) { BAIDU_SCOPED_LOCK(_mutex); if (!options.verbose) { - os << _chan_map.size(); + os << _chan_socket_map.size(); return; } - for (ChannelToIdMap::const_iterator it = _chan_map.begin(); - it != _chan_map.end(); ++it) { - if (it != _chan_map.begin()) { + for (ChannelToIdMap::const_iterator it = _chan_socket_map.begin(); + it != _chan_socket_map.end(); ++it) { + if (it != _chan_socket_map.begin()) { os << ' '; } it->first->Describe(os, options); diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 01ee536a82..99c0798848 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -615,7 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { m->reset_parsing_context(options.initial_parsing_context); m->_correlation_id = 0; m->_health_check_interval_s = options.health_check_interval_s; - m->_is_in_socket_map = options.is_in_socket_map; + m->_is_in_socket_map = false; m->_ninprocess.store(1, butil::memory_order_relaxed); m->_auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 831a48cc6a..a2d247fefd 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -187,8 +187,6 @@ struct SocketOptions { // one thread at any time. void (*on_edge_triggered_events)(Socket*); int health_check_interval_s; - // When socket is inserted into socket map, it needs to be set to true. - bool is_in_socket_map; std::shared_ptr initial_ssl_ctx; bthread_keytable_pool_t* keytable_pool; SocketConnection* conn; @@ -289,8 +287,12 @@ friend class policy::H2GlobalStreamCreator; int health_check_interval() const { return _health_check_interval_s; } // Only for SocketMap and ChannelBalancer. - // When socket is removed from SocketMap, set _is_in_socket_map to false. + // When socket is removed from socket map, set _is_in_socket_map to false. void SetRemovedFromSocketMap() { _is_in_socket_map = false; } + // Only for SocketMap and ChannelBalancer. + // When socket is inserted into socket map, set _is_in_socket_map to true. + void SetInsertedIntoSocketMap() { _is_in_socket_map = true; } + bool IsInSocketMap() const { return _is_in_socket_map; } // The unique identifier. SocketId id() const { return _this_id; } diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h index 510f88fac6..31ce6a907e 100644 --- a/src/brpc/socket_inl.h +++ b/src/brpc/socket_inl.h @@ -57,7 +57,6 @@ inline SocketOptions::SocketOptions() , user(NULL) , on_edge_triggered_events(NULL) , health_check_interval_s(-1) - , is_in_socket_map(false) , keytable_pool(NULL) , conn(NULL) , app_connect(NULL) diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 80847453a1..abbf4ca757 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -58,7 +58,6 @@ class GlobalSocketCreator : public SocketCreator { int CreateSocket(const SocketOptions& opt, SocketId* id) { SocketOptions sock_opt = opt; sock_opt.health_check_interval_s = FLAGS_health_check_interval; - sock_opt.is_in_socket_map = true; return get_client_side_messenger()->Create(sock_opt, id); } }; @@ -245,6 +244,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, LOG(FATAL) << "Fail to address SocketId=" << tmp_id; return -1; } + ptr->SetInsertedIntoSocketMap(); // set inserted status SingleConnection new_sc = { 1, ptr.release(), 0 }; _map[key] = new_sc; *id = tmp_id; diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index f88b39b4f2..9db69112a5 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -494,11 +494,11 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) { options.remote_side = point; options.user = new CheckRecycle; options.health_check_interval_s = 1/*s*/; - options.is_in_socket_map = true; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); { brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); + s->SetInsertedIntoSocketMap(); global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd()); @@ -643,11 +643,11 @@ TEST_F(SocketTest, health_check) { options.remote_side = point; options.user = new CheckRecycle; options.health_check_interval_s = kCheckInteval/*s*/; - options.is_in_socket_map = true; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); - + + s->SetInsertedIntoSocketMap(); global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd());