Skip to content

Commit

Permalink
set _is_in_socket_map after insert successfully
Browse files Browse the repository at this point in the history
  • Loading branch information
chenguangming committed Jul 19, 2022
1 parent d54c1b8 commit b87795d
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 25 deletions.
1 change: 0 additions & 1 deletion src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ int InputMessenger::Create(const butil::EndPoint& remote_side,
options.user = this;
options.on_edge_triggered_events = OnNewMessages;
options.health_check_interval_s = health_check_interval_s;
options.is_in_socket_map = true;
return Socket::Create(options, id);
}

Expand Down
30 changes: 15 additions & 15 deletions src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class ChannelBalancer : public SharedLoadBalancer {
private:
butil::Mutex _mutex;
// Find out duplicated sub channels.
ChannelToIdMap _chan_map;
ChannelToIdMap _chan_socket_map;
};

class SubDone;
Expand Down Expand Up @@ -157,11 +157,11 @@ friend class SubDone;

ChannelBalancer::~ChannelBalancer() {
for (ChannelToIdMap::iterator
it = _chan_map.begin(); it != _chan_map.end(); ++it) {
it = _chan_socket_map.begin(); it != _chan_socket_map.end(); ++it) {
SocketUniquePtr ptr(it->second); // Dereference
it->second->ReleaseAdditionalReference();
}
_chan_map.clear();
_chan_socket_map.clear();
}

int ChannelBalancer::Init(const char* lb_name) {
Expand All @@ -175,7 +175,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
return -1;
}
BAIDU_SCOPED_LOCK(_mutex);
if (_chan_map.find(sub_channel) != _chan_map.end()) {
if (_chan_socket_map.find(sub_channel) != _chan_socket_map.end()) {
LOG(ERROR) << "Duplicated sub_channel=" << sub_channel;
return -1;
}
Expand All @@ -189,8 +189,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
SocketOptions options;
options.user = sub_chan;
options.health_check_interval_s = FLAGS_channel_check_interval;
options.is_in_socket_map = true;


if (Socket::Create(options, &sock_id) != 0) {
delete sub_chan;
LOG(ERROR) << "Fail to create fake socket for sub channel";
Expand All @@ -204,7 +203,8 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
ptr->SetFailed();
return -1;
}
_chan_map[sub_channel]= ptr.release(); // Add reference.
ptr->SetInsertedIntoSocketMap(); // set inserted status
_chan_socket_map[sub_channel]= ptr.release(); // Add reference.
if (handle) {
*handle = sock_id;
}
Expand All @@ -221,10 +221,10 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha
SubChannel* sub = static_cast<SubChannel*>(ptr->user());
{
BAIDU_SCOPED_LOCK(_mutex);
CHECK_EQ(1UL, _chan_map.erase(sub->chan));
CHECK_EQ(1UL, _chan_socket_map.erase(sub->chan));
ptr->SetRemovedFromSocketMap(); // set removed status
}
{
ptr->SetRemovedFromSocketMap(); // set removed status
SocketUniquePtr ptr2(ptr.get()); // Dereference.
}
if (rc == 0) {
Expand All @@ -246,8 +246,8 @@ inline int ChannelBalancer::SelectChannel(const LoadBalancer::SelectIn& in,

int ChannelBalancer::CheckHealth() {
BAIDU_SCOPED_LOCK(_mutex);
for (ChannelToIdMap::const_iterator it = _chan_map.begin();
it != _chan_map.end(); ++it) {
for (ChannelToIdMap::const_iterator it = _chan_socket_map.begin();
it != _chan_socket_map.end(); ++it) {
if (!it->second->Failed() &&
it->first->CheckHealth() == 0) {
return 0;
Expand All @@ -260,12 +260,12 @@ void ChannelBalancer::Describe(std::ostream& os,
const DescribeOptions& options) {
BAIDU_SCOPED_LOCK(_mutex);
if (!options.verbose) {
os << _chan_map.size();
os << _chan_socket_map.size();
return;
}
for (ChannelToIdMap::const_iterator it = _chan_map.begin();
it != _chan_map.end(); ++it) {
if (it != _chan_map.begin()) {
for (ChannelToIdMap::const_iterator it = _chan_socket_map.begin();
it != _chan_socket_map.end(); ++it) {
if (it != _chan_socket_map.begin()) {
os << ' ';
}
it->first->Describe(os, options);
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->reset_parsing_context(options.initial_parsing_context);
m->_correlation_id = 0;
m->_health_check_interval_s = options.health_check_interval_s;
m->_is_in_socket_map = options.is_in_socket_map;
m->_is_in_socket_map = false;
m->_ninprocess.store(1, butil::memory_order_relaxed);
m->_auth_flag_error.store(0, butil::memory_order_relaxed);
const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
Expand Down
8 changes: 5 additions & 3 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ struct SocketOptions {
// one thread at any time.
void (*on_edge_triggered_events)(Socket*);
int health_check_interval_s;
// When socket is inserted into socket map, it needs to be set to true.
bool is_in_socket_map;
std::shared_ptr<SocketSSLContext> initial_ssl_ctx;
bthread_keytable_pool_t* keytable_pool;
SocketConnection* conn;
Expand Down Expand Up @@ -289,8 +287,12 @@ friend class policy::H2GlobalStreamCreator;
int health_check_interval() const { return _health_check_interval_s; }

// Only for SocketMap and ChannelBalancer.
// When socket is removed from SocketMap, set _is_in_socket_map to false.
// When socket is removed from socket map, set _is_in_socket_map to false.
void SetRemovedFromSocketMap() { _is_in_socket_map = false; }
// Only for SocketMap and ChannelBalancer.
// When socket is inserted into socket map, set _is_in_socket_map to true.
void SetInsertedIntoSocketMap() { _is_in_socket_map = true; }
bool IsInSocketMap() const { return _is_in_socket_map; }

// The unique identifier.
SocketId id() const { return _this_id; }
Expand Down
1 change: 0 additions & 1 deletion src/brpc/socket_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ inline SocketOptions::SocketOptions()
, user(NULL)
, on_edge_triggered_events(NULL)
, health_check_interval_s(-1)
, is_in_socket_map(false)
, keytable_pool(NULL)
, conn(NULL)
, app_connect(NULL)
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class GlobalSocketCreator : public SocketCreator {
int CreateSocket(const SocketOptions& opt, SocketId* id) {
SocketOptions sock_opt = opt;
sock_opt.health_check_interval_s = FLAGS_health_check_interval;
sock_opt.is_in_socket_map = true;
return get_client_side_messenger()->Create(sock_opt, id);
}
};
Expand Down Expand Up @@ -245,6 +244,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
return -1;
}
ptr->SetInsertedIntoSocketMap(); // set inserted status
SingleConnection new_sc = { 1, ptr.release(), 0 };
_map[key] = new_sc;
*id = tmp_id;
Expand Down
6 changes: 3 additions & 3 deletions test/brpc_socket_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,11 +494,11 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
options.remote_side = point;
options.user = new CheckRecycle;
options.health_check_interval_s = 1/*s*/;
options.is_in_socket_map = true;
ASSERT_EQ(0, brpc::Socket::Create(options, &id));
{
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));
s->SetInsertedIntoSocketMap();
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
Expand Down Expand Up @@ -643,11 +643,11 @@ TEST_F(SocketTest, health_check) {
options.remote_side = point;
options.user = new CheckRecycle;
options.health_check_interval_s = kCheckInteval/*s*/;
options.is_in_socket_map = true;
ASSERT_EQ(0, brpc::Socket::Create(options, &id));
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));


s->SetInsertedIntoSocketMap();
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
Expand Down

0 comments on commit b87795d

Please sign in to comment.