Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix ssl state exception coredump #1814

Merged
merged 7 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/brpc/details/health_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,12 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
// one is addressing the Socket(except here). Because the Socket
// is not addressable, the reference count will not increase
// again. This solution is not perfect because the `expected_nref'
// is implementation specific. In our case, one reference comes
// from SocketMapInsert(socket_map.cpp), one reference is here.
// Although WaitAndReset() could hang when someone is addressing
// the failed Socket forever (also indicating bug), this is not an
// issue in current code.
// is implementation specific. In our case, one reference comes
// from someone who holds a reference related to health checking,
// e.g. SocketMapInsert(socket_map.cpp) or ChannelBalancer::AddChannel
// (selective_channel.cpp), one reference is here. Although WaitAndReset()
// could hang when someone is addressing the failed Socket forever
// (also indicating bug), this is not an issue in current code.
if (_first_time) { // Only check at first time.
_first_time = false;
if (ptr->WaitAndReset(2/*note*/) != 0) {
Expand Down
4 changes: 3 additions & 1 deletion src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
SocketOptions options;
options.user = sub_chan;
options.health_check_interval_s = FLAGS_channel_check_interval;

if (Socket::Create(options, &sock_id) != 0) {
delete sub_chan;
LOG(ERROR) << "Fail to create fake socket for sub channel";
Expand All @@ -203,6 +203,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
ptr->SetFailed();
return -1;
}
ptr->SetHCRelatedRefHeld(); // set held status
_chan_map[sub_channel]= ptr.release(); // Add reference.
if (handle) {
*handle = sock_id;
Expand All @@ -223,6 +224,7 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha
CHECK_EQ(1UL, _chan_map.erase(sub->chan));
}
{
ptr->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr2(ptr.get()); // Dereference.
}
if (rc == 0) {
Expand Down
9 changes: 9 additions & 0 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ Socket::Socket(Forbidden)
, _parsing_context(NULL)
, _correlation_id(0)
, _health_check_interval_s(-1)
, _is_hc_related_ref_held(false)
, _ninprocess(1)
, _auth_flag_error(0)
, _auth_id(INVALID_BTHREAD_ID)
Expand Down Expand Up @@ -614,6 +615,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->reset_parsing_context(options.initial_parsing_context);
m->_correlation_id = 0;
m->_health_check_interval_s = options.health_check_interval_s;
m->_is_hc_related_ref_held = false;
m->_ninprocess.store(1, butil::memory_order_relaxed);
m->_auth_flag_error.store(0, butil::memory_order_relaxed);
const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
Expand Down Expand Up @@ -682,6 +684,13 @@ int Socket::WaitAndReset(int32_t expected_nref) {
<< " was abandoned during health checking";
return -1;
} else {
// nobody holds a health-checking-related reference,
// so no need to do health checking.
if (!_is_hc_related_ref_held) {
LOG(WARNING) << "nobody holds a health-checking-related reference";
return -1;
}

break;
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ friend class policy::H2GlobalStreamCreator;
// Initialized by SocketOptions.health_check_interval_s.
int health_check_interval() const { return _health_check_interval_s; }

// When someone holds a health-checking-related reference,
// this function need to be called to make health checking run normally.
void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; }
// When someone releases the health-checking-related reference,
// this function need to be called to cancel health checking.
void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; }
bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; }

// The unique identifier.
SocketId id() const { return _this_id; }

Expand Down Expand Up @@ -747,6 +755,11 @@ friend void DereferenceSocket(Socket*);
// Non-zero when health-checking is on.
int _health_check_interval_s;

// The variable indicates whether the reference related
// to the health checking is held by someone. It can be
// synchronized via _versioned_ref atomic variable.
bool _is_hc_related_ref_held;

// +-1 bit-+---31 bit---+
// | flag | counter |
// +-------+------------+
Expand Down
6 changes: 5 additions & 1 deletion src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,14 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
SingleConnection* sc = _map.seek(key);
if (sc) {
if (!sc->socket->Failed() ||
sc->socket->health_check_interval() > 0/*HC enabled*/) {
(sc->socket->health_check_interval() > 0 &&
sc->socket->IsHCRelatedRefHeld())/*HC enabled*/) {
++sc->ref_count;
*id = sc->socket->id();
return 0;
}
// A socket w/o HC is failed (permanently), replace it.
sc->socket->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion.
_map.erase(key); // in principle, we can override the entry in map w/o
// removing and inserting it again. But this would make error branches
Expand All @@ -244,6 +246,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
return -1;
}
ptr->SetHCRelatedRefHeld(); // set held status
SingleConnection new_sc = { 1, ptr.release(), 0 };
_map[key] = new_sc;
*id = tmp_id;
Expand Down Expand Up @@ -302,6 +305,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
butil::StringPiece(namebuf, len), PrintSocketMap, this);
}
s->ReleaseAdditionalReference(); // release extra ref
s->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(s); // Dereference
}
}
Expand Down
4 changes: 3 additions & 1 deletion test/brpc_socket_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
{
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));
s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
Expand Down Expand Up @@ -645,7 +646,8 @@ TEST_F(SocketTest, health_check) {
ASSERT_EQ(0, brpc::Socket::Create(options, &id));
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));


s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
Expand Down