Skip to content

Commit

Permalink
Merge pull request #1814 from chenBright/fix_socket_ssl_state
Browse files Browse the repository at this point in the history
Fix ssl state exception coredump when Dowrite and HealthCheck happen simultaneously
  • Loading branch information
zyearn authored Jul 21, 2022
2 parents 53cbd1a + 9ca4513 commit 0676bbc
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 8 deletions.
11 changes: 6 additions & 5 deletions src/brpc/details/health_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,12 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
// one is addressing the Socket(except here). Because the Socket
// is not addressable, the reference count will not increase
// again. This solution is not perfect because the `expected_nref'
// is implementation specific. In our case, one reference comes
// from SocketMapInsert(socket_map.cpp), one reference is here.
// Although WaitAndReset() could hang when someone is addressing
// the failed Socket forever (also indicating bug), this is not an
// issue in current code.
// is implementation specific. In our case, one reference comes
// from someone who holds a reference related to health checking,
// e.g. SocketMapInsert(socket_map.cpp) or ChannelBalancer::AddChannel
// (selective_channel.cpp), one reference is here. Although WaitAndReset()
// could hang when someone is addressing the failed Socket forever
// (also indicating bug), this is not an issue in current code.
if (_first_time) { // Only check at first time.
_first_time = false;
if (ptr->WaitAndReset(2/*note*/) != 0) {
Expand Down
4 changes: 3 additions & 1 deletion src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
SocketOptions options;
options.user = sub_chan;
options.health_check_interval_s = FLAGS_channel_check_interval;

if (Socket::Create(options, &sock_id) != 0) {
delete sub_chan;
LOG(ERROR) << "Fail to create fake socket for sub channel";
Expand All @@ -203,6 +203,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
ptr->SetFailed();
return -1;
}
ptr->SetHCRelatedRefHeld(); // set held status
_chan_map[sub_channel]= ptr.release(); // Add reference.
if (handle) {
*handle = sock_id;
Expand All @@ -223,6 +224,7 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha
CHECK_EQ(1UL, _chan_map.erase(sub->chan));
}
{
ptr->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr2(ptr.get()); // Dereference.
}
if (rc == 0) {
Expand Down
9 changes: 9 additions & 0 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ Socket::Socket(Forbidden)
, _parsing_context(NULL)
, _correlation_id(0)
, _health_check_interval_s(-1)
, _is_hc_related_ref_held(false)
, _ninprocess(1)
, _auth_flag_error(0)
, _auth_id(INVALID_BTHREAD_ID)
Expand Down Expand Up @@ -614,6 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->reset_parsing_context(options.initial_parsing_context);
m->_correlation_id = 0;
m->_health_check_interval_s = options.health_check_interval_s;
m->_is_hc_related_ref_held = false;
m->_ninprocess.store(1, butil::memory_order_relaxed);
m->_auth_flag_error.store(0, butil::memory_order_relaxed);
const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
Expand Down Expand Up @@ -682,6 +684,13 @@ int Socket::WaitAndReset(int32_t expected_nref) {
<< " was abandoned during health checking";
return -1;
} else {
// nobody holds a health-checking-related reference,
// so no need to do health checking.
if (!_is_hc_related_ref_held) {
LOG(WARNING) << "nobody holds a health-checking-related reference";
return -1;
}

break;
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ friend class policy::H2GlobalStreamCreator;
// Initialized by SocketOptions.health_check_interval_s.
int health_check_interval() const { return _health_check_interval_s; }

// When someone holds a health-checking-related reference,
// this function need to be called to make health checking run normally.
void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; }
// When someone releases the health-checking-related reference,
// this function need to be called to cancel health checking.
void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; }
bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; }

// The unique identifier.
SocketId id() const { return _this_id; }

Expand Down Expand Up @@ -747,6 +755,11 @@ friend void DereferenceSocket(Socket*);
// Non-zero when health-checking is on.
int _health_check_interval_s;

// The variable indicates whether the reference related
// to the health checking is held by someone. It can be
// synchronized via _versioned_ref atomic variable.
bool _is_hc_related_ref_held;

// +-1 bit-+---31 bit---+
// | flag | counter |
// +-------+------------+
Expand Down
6 changes: 5 additions & 1 deletion src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,14 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
SingleConnection* sc = _map.seek(key);
if (sc) {
if (!sc->socket->Failed() ||
sc->socket->health_check_interval() > 0/*HC enabled*/) {
(sc->socket->health_check_interval() > 0 &&
sc->socket->IsHCRelatedRefHeld())/*HC enabled*/) {
++sc->ref_count;
*id = sc->socket->id();
return 0;
}
// A socket w/o HC is failed (permanently), replace it.
sc->socket->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion.
_map.erase(key); // in principle, we can override the entry in map w/o
// removing and inserting it again. But this would make error branches
Expand All @@ -244,6 +246,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
return -1;
}
ptr->SetHCRelatedRefHeld(); // set held status
SingleConnection new_sc = { 1, ptr.release(), 0 };
_map[key] = new_sc;
*id = tmp_id;
Expand Down Expand Up @@ -302,6 +305,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
butil::StringPiece(namebuf, len), PrintSocketMap, this);
}
s->ReleaseAdditionalReference(); // release extra ref
s->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(s); // Dereference
}
}
Expand Down
4 changes: 3 additions & 1 deletion test/brpc_socket_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
{
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));
s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
Expand Down Expand Up @@ -645,7 +646,8 @@ TEST_F(SocketTest, health_check) {
ASSERT_EQ(0, brpc::Socket::Create(options, &id));
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));


s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
Expand Down

0 comments on commit 0676bbc

Please sign in to comment.