Skip to content

Commit

Permalink
modify bthread attribute with tag
Browse files Browse the repository at this point in the history
  • Loading branch information
Yang Liming committed Dec 21, 2023
1 parent 2098dd3 commit 4d7114a
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 158 deletions.
2 changes: 2 additions & 0 deletions src/brpc/acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
return -1;
}
if (idle_timeout_sec > 0) {
bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
tmp.tag = _bthread_tag;
if (bthread_start_background(&_close_idle_tid, NULL,
CloseIdleConnections, this) != 0) {
LOG(FATAL) << "Fail to start bthread";
Expand Down
1 change: 1 addition & 0 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
bthread_t bt;
bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
attr.tag = bthread_self_tag();
_tmp_completion_info = info;
if (bthread_start_background(&bt, &attr, RunEndRPC, this) != 0) {
LOG(FATAL) << "Fail to start bthread";
Expand Down
1 change: 1 addition & 0 deletions src/brpc/details/http_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define BRPC_HTTP_MESSAGE_H

#include <string> // std::string
#include <memory> // std::unique_ptr
#include "butil/macros.h"
#include "butil/iobuf.h" // butil::IOBuf
#include "butil/scoped_lock.h" // butil::unique_lock
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/parallel_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ class ParallelChannelDone : public google::protobuf::Closure {
bthread_t bh;
bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
attr.tag = bthread_self_tag();
if (bthread_start_background(&bh, &attr, RunOnComplete, this) != 0) {
LOG(FATAL) << "Fail to start bthread";
OnComplete();
Expand Down Expand Up @@ -708,6 +709,7 @@ void ParallelChannel::CallMethod(
bthread_t bh;
bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
attr.tag = bthread_self_tag();
// Hack: save done in cntl->_done to remove a malloc of args.
cntl->_done = done;
if (bthread_start_background(&bh, &attr, RunDoneAndDestroy, cntl) == 0) {
Expand Down
3 changes: 1 addition & 2 deletions src/brpc/periodic_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ static void* PeriodicTaskThread(void* arg) {

static void RunPeriodicTaskThread(void* arg) {
bthread_t th = 0;
int rc = bthread_start_background(
&th, &BTHREAD_ATTR_NORMAL, PeriodicTaskThread, arg);
int rc = bthread_start_background(&th, NULL, PeriodicTaskThread, arg);
if (rc != 0) {
LOG(ERROR) << "Fail to start PeriodicTaskThread";
static_cast<PeriodicTask*>(arg)->OnDestroyingTask();
Expand Down
7 changes: 3 additions & 4 deletions src/brpc/rdma/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ void RdmaConnect::StartConnect(const Socket* socket,
_done = done;
_data = data;
bthread_t tid;
if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL,
RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0) {
if (bthread_start_background(&tid, NULL, RdmaEndpoint::ProcessHandshakeAtClient,
socket->_rdma_ep) < 0) {
LOG(FATAL) << "Fail to start handshake bthread";
} else {
s.release();
Expand Down Expand Up @@ -305,8 +305,7 @@ void RdmaEndpoint::OnNewDataFromTcp(Socket* m) {
ep->_state = S_HELLO_WAIT;
SocketUniquePtr s;
m->ReAddress(&s);
if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL,
ProcessHandshakeAtServer, ep) < 0) {
if (bthread_start_background(&tid, NULL, ProcessHandshakeAtServer, ep) < 0) {
ep->_state = UNINIT;
LOG(FATAL) << "Fail to start handshake bthread";
} else {
Expand Down
5 changes: 4 additions & 1 deletion src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
init_args[i].done = false;
init_args[i].stop = false;
bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
tmp.tag = _options.bthread_tag;
tmp.keytable_pool = _keytable_pool;
if (bthread_start_background(
&init_args[i].th, &tmp, BthreadInitEntry, &init_args[i]) != 0) {
Expand Down Expand Up @@ -1144,7 +1145,9 @@ int Server::StartInternal(const butil::EndPoint& endpoint,

// Launch _derivative_thread.
CHECK_EQ(INVALID_BTHREAD, _derivative_thread);
if (bthread_start_background(&_derivative_thread, NULL,
bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
tmp.tag = _options.bthread_tag;
if (bthread_start_background(&_derivative_thread, &tmp,
UpdateDerivedVars, this) != 0) {
LOG(ERROR) << "Fail to create _derivative_thread";
return -1;
Expand Down
9 changes: 3 additions & 6 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1464,8 +1464,7 @@ void Socket::AfterAppConnected(int err, void* data) {
// requests are not setup yet. check the comment on Setup() in Write()
req->Setup(s);
bthread_t th;
if (bthread_start_background(
&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) != 0) {
if (bthread_start_background(&th, NULL, KeepWrite, req) != 0) {
PLOG(WARNING) << "Fail to start KeepWrite";
KeepWrite(req);
}
Expand Down Expand Up @@ -1505,8 +1504,7 @@ int Socket::KeepWriteIfConnected(int fd, int err, void* data) {
bthread_t th;
std::unique_ptr<google::protobuf::Closure> thrd_func(brpc::NewCallback(
Socket::CheckConnectedAndKeepWrite, fd, err, data));
if ((err = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
RunClosure, thrd_func.get())) == 0) {
if ((err = bthread_start_background(&th, NULL, RunClosure, thrd_func.get())) == 0) {
thrd_func.release();
return 0;
} else {
Expand Down Expand Up @@ -1736,8 +1734,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
KEEPWRITE_IN_BACKGROUND:
ReAddress(&ptr_for_keep_write);
req->socket = ptr_for_keep_write.release();
if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
KeepWrite, req) != 0) {
if (bthread_start_background(&th, NULL, KeepWrite, req) != 0) {
LOG(FATAL) << "Fail to start KeepWrite";
KeepWrite(req);
}
Expand Down
Loading

0 comments on commit 4d7114a

Please sign in to comment.