Skip to content

Commit

Permalink
rename _is_in_socket_map to _is_hc_related_ref_held
Browse files Browse the repository at this point in the history
  • Loading branch information
chenguangming committed Jul 21, 2022
1 parent b87795d commit e030196
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 28 deletions.
5 changes: 3 additions & 2 deletions src/brpc/details/health_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
// one is addressing the Socket(except here). Because the Socket
// is not addressable, the reference count will not increase
// again. This solution is not perfect because the `expected_nref'
// is implementation specific. In our case, one reference comes
// from SocketMapInsert(socket_map.cpp) or ChannelBalancer::AddChannel
// is implementation specific. In our case, one reference comes
// from someone who holds a reference related to health checking,
// e.g. SocketMapInsert(socket_map.cpp) or ChannelBalancer::AddChannel
// (selective_channel.cpp), one reference is here. Although WaitAndReset()
// could hang when someone is addressing the failed Socket forever
// (also indicating bug), this is not an issue in current code.
Expand Down
4 changes: 2 additions & 2 deletions src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
ptr->SetFailed();
return -1;
}
ptr->SetInsertedIntoSocketMap(); // set inserted status
ptr->SetHCRelatedRefHeld(); // set held status
_chan_socket_map[sub_channel]= ptr.release(); // Add reference.
if (handle) {
*handle = sock_id;
Expand All @@ -222,9 +222,9 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha
{
BAIDU_SCOPED_LOCK(_mutex);
CHECK_EQ(1UL, _chan_socket_map.erase(sub->chan));
ptr->SetRemovedFromSocketMap(); // set removed status
}
{
ptr->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr2(ptr.get()); // Dereference.
}
if (rc == 0) {
Expand Down
13 changes: 5 additions & 8 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ Socket::Socket(Forbidden)
, _parsing_context(NULL)
, _correlation_id(0)
, _health_check_interval_s(-1)
, _is_in_socket_map(false)
, _is_hc_related_ref_held(false)
, _ninprocess(1)
, _auth_flag_error(0)
, _auth_id(INVALID_BTHREAD_ID)
Expand Down Expand Up @@ -615,7 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->reset_parsing_context(options.initial_parsing_context);
m->_correlation_id = 0;
m->_health_check_interval_s = options.health_check_interval_s;
m->_is_in_socket_map = false;
m->_is_hc_related_ref_held = false;
m->_ninprocess.store(1, butil::memory_order_relaxed);
m->_auth_flag_error.store(0, butil::memory_order_relaxed);
const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
Expand Down Expand Up @@ -684,12 +684,9 @@ int Socket::WaitAndReset(int32_t expected_nref) {
<< " was abandoned during health checking";
return -1;
} else {
// The health checking expects two references, one reference is here
// and another reference comes from SocketMapInsert(socket_map.cpp)
// or ChannelBalancer::AddChannel(selective_channel.cpp). However,
// when socket has been remove from SocketMap, another reference is
// not from SocketMap or ChannelBalancer, so no need to do health checking.
if (!_is_in_socket_map) {
// nobody holds a health-checking-related reference,
// so no need to do health checking.
if (!_is_hc_related_ref_held) {
LOG(WARNING) << "socket has been removed from SocketMap before health check task";
return -1;
}
Expand Down
22 changes: 11 additions & 11 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,13 @@ friend class policy::H2GlobalStreamCreator;
// Initialized by SocketOptions.health_check_interval_s.
int health_check_interval() const { return _health_check_interval_s; }

// Only for SocketMap and ChannelBalancer.
// When socket is removed from socket map, set _is_in_socket_map to false.
void SetRemovedFromSocketMap() { _is_in_socket_map = false; }
// Only for SocketMap and ChannelBalancer.
// When socket is inserted into socket map, set _is_in_socket_map to true.
void SetInsertedIntoSocketMap() { _is_in_socket_map = true; }
bool IsInSocketMap() const { return _is_in_socket_map; }
// When someone holds a health-checking-related reference,
// this function need to be called to make health checking run normally.
void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; }
// When someone releases the health-checking-related reference,
// this function need to be called to cancel health checking.
void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; }
bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; }

// The unique identifier.
SocketId id() const { return _this_id; }
Expand Down Expand Up @@ -755,10 +755,10 @@ friend void DereferenceSocket(Socket*);
// Non-zero when health-checking is on.
int _health_check_interval_s;

// When socket is inserted into socket map, it needs to be set to true.
// When socket is removed from socket map, it needs to be set to false.
// It can be synchronized via _versioned_ref atomic variable.
bool _is_in_socket_map;
// The variable indicates whether the reference related to the
// health check is held by someone else. It can be synchronized
// via _versioned_ref atomic variable.
bool _is_hc_related_ref_held;

// +-1 bit-+---31 bit---+
// | flag | counter |
Expand Down
8 changes: 5 additions & 3 deletions src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,14 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
SingleConnection* sc = _map.seek(key);
if (sc) {
if (!sc->socket->Failed() ||
sc->socket->health_check_interval() > 0/*HC enabled*/) {
(sc->socket->health_check_interval() > 0 &&
sc->socket->IsHCRelatedRefHeld())/*HC enabled*/) {
++sc->ref_count;
*id = sc->socket->id();
return 0;
}
// A socket w/o HC is failed (permanently), replace it.
sc->socket->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion.
_map.erase(key); // in principle, we can override the entry in map w/o
// removing and inserting it again. But this would make error branches
Expand All @@ -244,7 +246,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
return -1;
}
ptr->SetInsertedIntoSocketMap(); // set inserted status
ptr->SetHCRelatedRefHeld(); // set held status
SingleConnection new_sc = { 1, ptr.release(), 0 };
_map[key] = new_sc;
*id = tmp_id;
Expand Down Expand Up @@ -302,8 +304,8 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
_this_map_bvar = new bvar::PassiveStatus<std::string>(
butil::StringPiece(namebuf, len), PrintSocketMap, this);
}
s->SetRemovedFromSocketMap(); // set removed status
s->ReleaseAdditionalReference(); // release extra ref
s->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(s); // Dereference
}
}
Expand Down
4 changes: 2 additions & 2 deletions test/brpc_socket_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
{
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));
s->SetInsertedIntoSocketMap();
s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
Expand Down Expand Up @@ -647,7 +647,7 @@ TEST_F(SocketTest, health_check) {
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));

s->SetInsertedIntoSocketMap();
s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
Expand Down

0 comments on commit e030196

Please sign in to comment.