From 4d7114a81ec0e6ec56b7c88eb6017495927f6f49 Mon Sep 17 00:00:00 2001 From: Yang Liming Date: Thu, 21 Dec 2023 10:40:43 +0800 Subject: [PATCH] modify bthread attribute with tag --- src/brpc/acceptor.cpp | 2 + src/brpc/controller.cpp | 1 + src/brpc/details/http_message.h | 1 + src/brpc/parallel_channel.cpp | 2 + src/brpc/periodic_task.cpp | 3 +- src/brpc/rdma/rdma_endpoint.cpp | 7 +- src/brpc/server.cpp | 5 +- src/brpc/socket.cpp | 9 +- src/brpc/stream.cpp | 262 ++++++++++++++------------------ 9 files changed, 134 insertions(+), 158 deletions(-) diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp index 68d77082b7..54cc4908cf 100644 --- a/src/brpc/acceptor.cpp +++ b/src/brpc/acceptor.cpp @@ -76,6 +76,8 @@ int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec, return -1; } if (idle_timeout_sec > 0) { + bthread_attr_t tmp = BTHREAD_ATTR_NORMAL; + tmp.tag = _bthread_tag; if (bthread_start_background(&_close_idle_tid, NULL, CloseIdleConnections, this) != 0) { LOG(FATAL) << "Fail to start bthread"; diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index f49a27a92f..fc7625c7ac 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -718,6 +718,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, bthread_t bt; bthread_attr_t attr = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); + attr.tag = bthread_self_tag(); _tmp_completion_info = info; if (bthread_start_background(&bt, &attr, RunEndRPC, this) != 0) { LOG(FATAL) << "Fail to start bthread"; diff --git a/src/brpc/details/http_message.h b/src/brpc/details/http_message.h index dc999cfa91..c55cc8bb03 100644 --- a/src/brpc/details/http_message.h +++ b/src/brpc/details/http_message.h @@ -20,6 +20,7 @@ #define BRPC_HTTP_MESSAGE_H #include // std::string +#include // std::unique_ptr #include "butil/macros.h" #include "butil/iobuf.h" // butil::IOBuf #include "butil/scoped_lock.h" // butil::unique_lock diff --git a/src/brpc/parallel_channel.cpp b/src/brpc/parallel_channel.cpp index ca71bedcc1..fe6f117fed 100644 --- a/src/brpc/parallel_channel.cpp +++ b/src/brpc/parallel_channel.cpp @@ -288,6 +288,7 @@ class ParallelChannelDone : public google::protobuf::Closure { bthread_t bh; bthread_attr_t attr = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); + attr.tag = bthread_self_tag(); if (bthread_start_background(&bh, &attr, RunOnComplete, this) != 0) { LOG(FATAL) << "Fail to start bthread"; OnComplete(); @@ -708,6 +709,7 @@ void ParallelChannel::CallMethod( bthread_t bh; bthread_attr_t attr = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); + attr.tag = bthread_self_tag(); // Hack: save done in cntl->_done to remove a malloc of args. cntl->_done = done; if (bthread_start_background(&bh, &attr, RunDoneAndDestroy, cntl) == 0) { diff --git a/src/brpc/periodic_task.cpp b/src/brpc/periodic_task.cpp index 27ea3ec310..af89a6052b 100644 --- a/src/brpc/periodic_task.cpp +++ b/src/brpc/periodic_task.cpp @@ -38,8 +38,7 @@ static void* PeriodicTaskThread(void* arg) { static void RunPeriodicTaskThread(void* arg) { bthread_t th = 0; - int rc = bthread_start_background( - &th, &BTHREAD_ATTR_NORMAL, PeriodicTaskThread, arg); + int rc = bthread_start_background(&th, NULL, PeriodicTaskThread, arg); if (rc != 0) { LOG(ERROR) << "Fail to start PeriodicTaskThread"; static_cast(arg)->OnDestroyingTask(); diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 40b52a806f..9e336fc049 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -246,8 +246,8 @@ void RdmaConnect::StartConnect(const Socket* socket, _done = done; _data = data; bthread_t tid; - if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL, - RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0) { + if (bthread_start_background(&tid, NULL, RdmaEndpoint::ProcessHandshakeAtClient, + socket->_rdma_ep) < 0) { LOG(FATAL) << "Fail to start handshake bthread"; } else { s.release(); @@ -305,8 +305,7 @@ void RdmaEndpoint::OnNewDataFromTcp(Socket* m) { ep->_state = S_HELLO_WAIT; SocketUniquePtr s; m->ReAddress(&s); - if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL, - ProcessHandshakeAtServer, ep) < 0) { + if (bthread_start_background(&tid, NULL, ProcessHandshakeAtServer, ep) < 0) { ep->_state = UNINIT; LOG(FATAL) << "Fail to start handshake bthread"; } else { diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index ac8f29c9f3..6145e7d444 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -906,6 +906,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint, init_args[i].done = false; init_args[i].stop = false; bthread_attr_t tmp = BTHREAD_ATTR_NORMAL; + tmp.tag = _options.bthread_tag; tmp.keytable_pool = _keytable_pool; if (bthread_start_background( &init_args[i].th, &tmp, BthreadInitEntry, &init_args[i]) != 0) { @@ -1144,7 +1145,9 @@ int Server::StartInternal(const butil::EndPoint& endpoint, // Launch _derivative_thread. CHECK_EQ(INVALID_BTHREAD, _derivative_thread); - if (bthread_start_background(&_derivative_thread, NULL, + bthread_attr_t tmp = BTHREAD_ATTR_NORMAL; + tmp.tag = _options.bthread_tag; + if (bthread_start_background(&_derivative_thread, &tmp, UpdateDerivedVars, this) != 0) { LOG(ERROR) << "Fail to create _derivative_thread"; return -1; diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index acd1b54d8c..bbe9710092 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -1464,8 +1464,7 @@ void Socket::AfterAppConnected(int err, void* data) { // requests are not setup yet. check the comment on Setup() in Write() req->Setup(s); bthread_t th; - if (bthread_start_background( - &th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) != 0) { + if (bthread_start_background(&th, NULL, KeepWrite, req) != 0) { PLOG(WARNING) << "Fail to start KeepWrite"; KeepWrite(req); } @@ -1505,8 +1504,7 @@ int Socket::KeepWriteIfConnected(int fd, int err, void* data) { bthread_t th; std::unique_ptr thrd_func(brpc::NewCallback( Socket::CheckConnectedAndKeepWrite, fd, err, data)); - if ((err = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, - RunClosure, thrd_func.get())) == 0) { + if ((err = bthread_start_background(&th, NULL, RunClosure, thrd_func.get())) == 0) { thrd_func.release(); return 0; } else { @@ -1736,8 +1734,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) { KEEPWRITE_IN_BACKGROUND: ReAddress(&ptr_for_keep_write); req->socket = ptr_for_keep_write.release(); - if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, - KeepWrite, req) != 0) { + if (bthread_start_background(&th, NULL, KeepWrite, req) != 0) { LOG(FATAL) << "Fail to start KeepWrite"; KeepWrite(req); } diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index a9126537fd..40e7d1e40a 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. - #include "brpc/stream.h" #include @@ -31,28 +30,26 @@ #include "brpc/policy/baidu_rpc_protocol.h" #include "brpc/stream_impl.h" - namespace brpc { DECLARE_bool(usercode_in_pthread); DECLARE_int64(socket_max_streams_unconsumed_bytes); -const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L; - -Stream::Stream() - : _host_socket(NULL) - , _fake_socket_weak_ref(NULL) - , _connected(false) - , _closed(false) - , _produced(0) - , _remote_consumed(0) - , _cur_buf_size(0) - , _local_consumed(0) - , _parse_rpc_response(false) - , _pending_buf(NULL) - , _start_idle_timer_us(0) - , _idle_timer(0) -{ +const static butil::IOBuf* TIMEOUT_TASK = (butil::IOBuf*)-1L; + +Stream::Stream() + : _host_socket(NULL), + _fake_socket_weak_ref(NULL), + _connected(false), + _closed(false), + _produced(0), + _remote_consumed(0), + _cur_buf_size(0), + _local_consumed(0), + _parse_rpc_response(false), + _pending_buf(NULL), + _start_idle_timer_us(0), + _idle_timer(0) { _connect_meta.on_connect = NULL; CHECK_EQ(0, bthread_mutex_init(&_connect_mutex, NULL)); CHECK_EQ(0, bthread_mutex_init(&_congestion_control_mutex, NULL)); @@ -65,9 +62,8 @@ Stream::~Stream() { bthread_id_list_destroy(&_writable_wait_list); } -int Stream::Create(const StreamOptions &options, - const StreamSettings *remote_settings, - StreamId *id) { +int Stream::Create(const StreamOptions& options, const StreamSettings* remote_settings, + StreamId* id) { Stream* s = new Stream(); s->_host_socket = NULL; s->_fake_socket_weak_ref = NULL; @@ -78,7 +74,8 @@ int Stream::Create(const StreamOptions &options, if (options.max_buf_size > 0 && options.min_buf_size > options.max_buf_size) { // set 0 if min_buf_size is invalid. s->_options.min_buf_size = 0; - LOG(WARNING) << "options.min_buf_size is larger than options.max_buf_size, it will be set to 0."; + LOG(WARNING) + << "options.min_buf_size is larger than options.max_buf_size, it will be set to 0."; } if (FLAGS_socket_max_streams_unconsumed_bytes > 0 && s->_options.min_buf_size > 0) { s->_cur_buf_size = s->_options.min_buf_size; @@ -90,13 +87,12 @@ int Stream::Create(const StreamOptions &options, } else { s->_parse_rpc_response = true; } - if (bthread_id_list_init(&s->_writable_wait_list, 8, 8/*FIXME*/)) { + if (bthread_id_list_init(&s->_writable_wait_list, 8, 8 /*FIXME*/)) { delete s; return -1; } bthread::ExecutionQueueOptions q_opt; - q_opt.bthread_attr - = FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; + q_opt.bthread_attr = FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; if (bthread::execution_queue_start(&s->_consumer_queue, &q_opt, Consume, s) != 0) { LOG(FATAL) << "Fail to create ExecutionQueue"; delete s; @@ -117,38 +113,33 @@ int Stream::Create(const StreamOptions &options, return 0; } -void Stream::BeforeRecycle(Socket *) { +void Stream::BeforeRecycle(Socket*) { // No one holds reference now, so we don't need lock here bthread_id_list_reset(&_writable_wait_list, ECONNRESET); if (_connected) { // Send CLOSE frame RPC_VLOG << "Send close frame"; CHECK(_host_socket != NULL); - policy::SendStreamClose(_host_socket, - _remote_settings.stream_id(), id()); + policy::SendStreamClose(_host_socket, _remote_settings.stream_id(), id()); } if (_host_socket) { _host_socket->RemoveStream(id()); } - + // The instance is to be deleted in the consumer thread bthread::execution_queue_stop(_consumer_queue); } -ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/, - butil::IOBuf **data_list, - size_t size) { +ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/, butil::IOBuf** data_list, size_t size) { if (_host_socket == NULL) { CHECK(false) << "Not connected"; errno = EBADF; return -1; } if (!_remote_settings.writable()) { - LOG(WARNING) << "The remote side of Stream=" << id() - << "->" << _remote_settings.stream_id() - << "@" << _host_socket->remote_side() - << " doesn't have a handler"; + LOG(WARNING) << "The remote side of Stream=" << id() << "->" << _remote_settings.stream_id() + << "@" << _host_socket->remote_side() << " doesn't have a handler"; errno = EBADF; return -1; } @@ -179,7 +170,7 @@ ssize_t Stream::CutMessageIntoSSLChannel(SSL*, butil::IOBuf**, size_t) { return -1; } -void* Stream::RunOnConnect(void *arg) { +void* Stream::RunOnConnect(void* arg) { ConnectMeta* meta = (ConnectMeta*)arg; if (meta->ec == 0) { meta->on_connect(Socket::STREAM_FAKE_FD, 0, meta->arg); @@ -190,8 +181,7 @@ void* Stream::RunOnConnect(void *arg) { return NULL; } -int Stream::Connect(Socket* ptr, const timespec*, - int (*on_connect)(int, int, void *), void *data) { +int Stream::Connect(Socket* ptr, const timespec*, int (*on_connect)(int, int, void*), void* data) { CHECK_EQ(ptr->id(), _id); bthread_mutex_lock(&_connect_mutex); if (_connect_meta.on_connect != NULL) { @@ -208,7 +198,7 @@ int Stream::Connect(Socket* ptr, const timespec*, meta->ec = _connect_meta.ec; bthread_mutex_unlock(&_connect_mutex); bthread_t tid; - if (bthread_start_urgent(&tid, &BTHREAD_ATTR_NORMAL, RunOnConnect, meta) != 0) { + if (bthread_start_urgent(&tid, NULL, RunOnConnect, meta) != 0) { LOG(FATAL) << "Fail to start bthread, " << berror(); RunOnConnect(meta); } @@ -218,9 +208,7 @@ int Stream::Connect(Socket* ptr, const timespec*, return 0; } -void Stream::SetConnected() { - return SetConnected(NULL); -} +void Stream::SetConnected() { return SetConnected(NULL); } void Stream::SetConnected(const StreamSettings* remote_settings) { bthread_mutex_lock(&_connect_mutex); @@ -241,8 +229,8 @@ void Stream::SetConnected(const StreamSettings* remote_settings) { CHECK(_remote_settings.IsInitialized()); } CHECK(_host_socket != NULL); - RPC_VLOG << "stream=" << id() << " is connected to stream_id=" - << _remote_settings.stream_id() << " at host_socket=" << *_host_socket; + RPC_VLOG << "stream=" << id() << " is connected to stream_id=" << _remote_settings.stream_id() + << " at host_socket=" << *_host_socket; _connected = true; _connect_meta.ec = 0; TriggerOnConnectIfNeed(); @@ -262,7 +250,7 @@ void Stream::TriggerOnConnectIfNeed() { meta->ec = _connect_meta.ec; bthread_mutex_unlock(&_connect_mutex); bthread_t tid; - if (bthread_start_urgent(&tid, &BTHREAD_ATTR_NORMAL, RunOnConnect, meta) != 0) { + if (bthread_start_urgent(&tid, NULL, RunOnConnect, meta) != 0) { LOG(FATAL) << "Fail to start bthread, " << berror(); RunOnConnect(meta); } @@ -271,15 +259,14 @@ void Stream::TriggerOnConnectIfNeed() { bthread_mutex_unlock(&_connect_mutex); } -int Stream::AppendIfNotFull(const butil::IOBuf &data, - const StreamWriteOptions* options) { +int Stream::AppendIfNotFull(const butil::IOBuf& data, const StreamWriteOptions* options) { if (_cur_buf_size > 0) { std::unique_lock lck(_congestion_control_mutex); if (_produced >= _remote_consumed + _cur_buf_size) { const size_t saved_produced = _produced; const size_t saved_remote_consumed = _remote_consumed; lck.unlock(); - RPC_VLOG << "Stream=" << _id << " is full" + RPC_VLOG << "Stream=" << _id << " is full" << "_produced=" << saved_produced << " _remote_consumed=" << saved_remote_consumed << " gap=" << saved_produced - saved_remote_consumed @@ -320,14 +307,17 @@ void Stream::SetRemoteConsumed(size_t new_remote_consumed) { if (FLAGS_socket_max_streams_unconsumed_bytes > 0) { _host_socket->_total_streams_unconsumed_size -= new_remote_consumed - _remote_consumed; - if (_host_socket->_total_streams_unconsumed_size > FLAGS_socket_max_streams_unconsumed_bytes) { + if (_host_socket->_total_streams_unconsumed_size > + FLAGS_socket_max_streams_unconsumed_bytes) { if (_options.min_buf_size > 0) { _cur_buf_size = _options.min_buf_size; } else { _cur_buf_size /= 2; } - LOG(INFO) << "stream consumers on socket " << _host_socket->id() << " is crowded, " << "cut stream " << id() << " buffer to " << _cur_buf_size; - } else if (_produced >= new_remote_consumed + _cur_buf_size && (_options.max_buf_size <= 0 || _cur_buf_size < (size_t)_options.max_buf_size)) { + LOG(INFO) << "stream consumers on socket " << _host_socket->id() << " is crowded, " + << "cut stream " << id() << " buffer to " << _cur_buf_size; + } else if (_produced >= new_remote_consumed + _cur_buf_size && + (_options.max_buf_size <= 0 || _cur_buf_size < (size_t)_options.max_buf_size)) { if (_options.max_buf_size > 0 && _cur_buf_size * 2 > (size_t)_options.max_buf_size) { _cur_buf_size = _options.max_buf_size; } else { @@ -349,23 +339,22 @@ void Stream::SetRemoteConsumed(size_t new_remote_consumed) { } void* Stream::RunOnWritable(void* arg) { - WritableMeta *wm = (WritableMeta*)arg; + WritableMeta* wm = (WritableMeta*)arg; wm->on_writable(wm->id, wm->arg, wm->error_code); delete wm; return NULL; } -int Stream::TriggerOnWritable(bthread_id_t id, void *data, int error_code) { - WritableMeta *wm = (WritableMeta*)data; - +int Stream::TriggerOnWritable(bthread_id_t id, void* data, int error_code) { + WritableMeta* wm = (WritableMeta*)data; + if (wm->has_timer) { bthread_timer_del(wm->timer); } wm->error_code = error_code; if (wm->new_thread) { - const bthread_attr_t* attr = - FLAGS_usercode_in_pthread ? &BTHREAD_ATTR_PTHREAD - : &BTHREAD_ATTR_NORMAL; + const bthread_attr_t* attr = + FLAGS_usercode_in_pthread ? &BTHREAD_ATTR_PTHREAD : &BTHREAD_ATTR_NORMAL; bthread_t tid; if (bthread_start_background(&tid, attr, RunOnWritable, wm) != 0) { LOG(FATAL) << "Fail to start bthread" << berror(); @@ -377,14 +366,14 @@ int Stream::TriggerOnWritable(bthread_id_t id, void *data, int error_code) { return bthread_id_unlock_and_destroy(id); } -void OnTimedOut(void *arg) { - bthread_id_t id = { reinterpret_cast(arg) }; +void OnTimedOut(void* arg) { + bthread_id_t id = {reinterpret_cast(arg)}; bthread_id_error(id, ETIMEDOUT); } -void Stream::Wait(void (*on_writable)(StreamId, void*, int), void* arg, - const timespec* due_time, bool new_thread, bthread_id_t *join_id) { - WritableMeta *wm = new WritableMeta; +void Stream::Wait(void (*on_writable)(StreamId, void*, int), void* arg, const timespec* due_time, + bool new_thread, bthread_id_t* join_id) { + WritableMeta* wm = new WritableMeta; wm->on_writable = on_writable; wm->id = id(); wm->arg = arg; @@ -404,8 +393,7 @@ void Stream::Wait(void (*on_writable)(StreamId, void*, int), void* arg, CHECK_EQ(0, bthread_id_lock(wait_id, NULL)); if (due_time != NULL) { wm->has_timer = true; - const int rc = bthread_timer_add(&wm->timer, *due_time, - OnTimedOut, + const int rc = bthread_timer_add(&wm->timer, *due_time, OnTimedOut, reinterpret_cast(wait_id.value)); if (rc != 0) { LOG(ERROR) << "Fail to add timer, " << berror(rc); @@ -413,8 +401,7 @@ void Stream::Wait(void (*on_writable)(StreamId, void*, int), void* arg, } } bthread_mutex_lock(&_congestion_control_mutex); - if (_cur_buf_size <= 0 - || _produced < _remote_consumed + _cur_buf_size) { + if (_cur_buf_size <= 0 || _produced < _remote_consumed + _cur_buf_size) { bthread_mutex_unlock(&_congestion_control_mutex); CHECK_EQ(0, TriggerOnWritable(wait_id, wm, 0)); return; @@ -425,14 +412,11 @@ void Stream::Wait(void (*on_writable)(StreamId, void*, int), void* arg, CHECK_EQ(0, bthread_id_unlock(wait_id)); } -void Stream::Wait(void (*on_writable)(StreamId, void *, int), void *arg, - const timespec* due_time) { +void Stream::Wait(void (*on_writable)(StreamId, void*, int), void* arg, const timespec* due_time) { return Wait(on_writable, arg, due_time, true, NULL); } -void OnWritable(StreamId, void *arg, int error_code) { - *(int*)arg = error_code; -} +void OnWritable(StreamId, void* arg, int error_code) { *(int*)arg = error_code; } int Stream::Wait(const timespec* due_time) { int rc; @@ -444,65 +428,59 @@ int Stream::Wait(const timespec* due_time) { return rc; } -int Stream::OnReceived(const StreamFrameMeta& fm, butil::IOBuf *buf, Socket* sock) { +int Stream::OnReceived(const StreamFrameMeta& fm, butil::IOBuf* buf, Socket* sock) { if (_host_socket == NULL) { if (SetHostSocket(sock) != 0) { return -1; } } switch (fm.frame_type()) { - case FRAME_TYPE_FEEDBACK: - SetRemoteConsumed(fm.feedback().consumed_size()); - CHECK(buf->empty()); - break; - case FRAME_TYPE_DATA: - if (_pending_buf != NULL) { - _pending_buf->append(*buf); - buf->clear(); - } else { - _pending_buf = new butil::IOBuf; - _pending_buf->swap(*buf); - } - if (!fm.has_continuation()) { - butil::IOBuf *tmp = _pending_buf; - _pending_buf = NULL; - if (bthread::execution_queue_execute(_consumer_queue, tmp) != 0) { - CHECK(false) << "Fail to push into channel"; - delete tmp; - Close(); + case FRAME_TYPE_FEEDBACK: + SetRemoteConsumed(fm.feedback().consumed_size()); + CHECK(buf->empty()); + break; + case FRAME_TYPE_DATA: + if (_pending_buf != NULL) { + _pending_buf->append(*buf); + buf->clear(); + } else { + _pending_buf = new butil::IOBuf; + _pending_buf->swap(*buf); } - } - break; - case FRAME_TYPE_RST: - RPC_VLOG << "stream=" << id() << " received rst frame"; - Close(); - break; - case FRAME_TYPE_CLOSE: - RPC_VLOG << "stream=" << id() << " received close frame"; - // TODO:: See the comments in Consume - Close(); - break; - case FRAME_TYPE_UNKNOWN: - RPC_VLOG << "Received unknown frame"; - return -1; + if (!fm.has_continuation()) { + butil::IOBuf* tmp = _pending_buf; + _pending_buf = NULL; + if (bthread::execution_queue_execute(_consumer_queue, tmp) != 0) { + CHECK(false) << "Fail to push into channel"; + delete tmp; + Close(); + } + } + break; + case FRAME_TYPE_RST: + RPC_VLOG << "stream=" << id() << " received rst frame"; + Close(); + break; + case FRAME_TYPE_CLOSE: + RPC_VLOG << "stream=" << id() << " received close frame"; + // TODO:: See the comments in Consume + Close(); + break; + case FRAME_TYPE_UNKNOWN: + RPC_VLOG << "Received unknown frame"; + return -1; } return 0; } class MessageBatcher { public: - MessageBatcher(butil::IOBuf* storage[], size_t cap, Stream* s) - : _storage(storage) - , _cap(cap) - , _size(0) - , _total_length(0) - , _s(s) - {} + MessageBatcher(butil::IOBuf* storage[], size_t cap, Stream* s) + : _storage(storage), _cap(cap), _size(0), _total_length(0), _s(s) {} ~MessageBatcher() { flush(); } void flush() { if (_size > 0 && _s->_options.handler != NULL) { - _s->_options.handler->on_received_messages( - _s->id(), _storage, _size); + _s->_options.handler->on_received_messages(_s->id(), _storage, _size); } for (size_t i = 0; i < _size; ++i) { delete _storage[i]; @@ -515,9 +493,9 @@ class MessageBatcher { } _storage[_size++] = buf; _total_length += buf->length(); - } size_t total_length() { return _total_length; } + private: butil::IOBuf** _storage; size_t _cap; @@ -526,7 +504,7 @@ class MessageBatcher { Stream* _s; }; -int Stream::Consume(void *meta, bthread::TaskIterator& iter) { +int Stream::Consume(void* meta, bthread::TaskIterator& iter) { Stream* s = (Stream*)meta; s->StopIdleTimer(); if (iter.is_queue_stopped()) { @@ -545,7 +523,7 @@ int Stream::Consume(void *meta, bthread::TaskIterator& iter) { MessageBatcher mb(buf_list, s->_options.messages_in_batch, s); bool has_timeout_task = false; for (; iter; ++iter) { - butil::IOBuf* t= *iter; + butil::IOBuf* t = *iter; if (t == TIMEOUT_TASK) { has_timeout_task = true; } else { @@ -583,7 +561,7 @@ void Stream::SendFeedback() { WriteToHostSocket(&out); } -int Stream::SetHostSocket(Socket *host_socket) { +int Stream::SetHostSocket(Socket* host_socket) { if (_host_socket != NULL) { CHECK(false) << "SetHostSocket has already been called"; return -1; @@ -598,14 +576,14 @@ int Stream::SetHostSocket(Socket *host_socket) { return 0; } -void Stream::FillSettings(StreamSettings *settings) { +void Stream::FillSettings(StreamSettings* settings) { settings->set_stream_id(id()); settings->set_need_feedback(_cur_buf_size > 0); settings->set_writable(_options.handler != NULL); } -void OnIdleTimeout(void *arg) { - bthread::ExecutionQueueId q = { (uint64_t)arg }; +void OnIdleTimeout(void* arg) { + bthread::ExecutionQueueId q = {(uint64_t)arg}; bthread::execution_queue_execute(q, (butil::IOBuf*)TIMEOUT_TASK); } @@ -614,10 +592,10 @@ void Stream::StartIdleTimer() { return; } _start_idle_timer_us = butil::gettimeofday_us(); - timespec due_time = butil::microseconds_to_timespec( - _start_idle_timer_us + _options.idle_timeout_ms * 1000); - const int rc = bthread_timer_add(&_idle_timer, due_time, OnIdleTimeout, - (void*)(_consumer_queue.value)); + timespec due_time = + butil::microseconds_to_timespec(_start_idle_timer_us + _options.idle_timeout_ms * 1000); + const int rc = + bthread_timer_add(&_idle_timer, due_time, OnIdleTimeout, (void*)(_consumer_queue.value)); LOG_IF(WARNING, rc != 0) << "Fail to add timer"; } @@ -676,13 +654,13 @@ void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) { } _host_socket->PostponeEOF(); _host_socket->ReAddress(&msg->_socket); - msg->_received_us = butil::gettimeofday_us(); + msg->_received_us = butil::gettimeofday_us(); msg->_base_real_us = butil::gettimeofday_us(); - msg->_arg = NULL; // ProcessRpcResponse() don't need arg + msg->_arg = NULL; // ProcessRpcResponse() don't need arg policy::ProcessRpcResponse(msg); } -int StreamWrite(StreamId stream_id, const butil::IOBuf &message, +int StreamWrite(StreamId stream_id, const butil::IOBuf& message, const StreamWriteOptions* options) { SocketUniquePtr ptr; if (Socket::Address(stream_id, &ptr) != 0) { @@ -696,19 +674,18 @@ int StreamWrite(StreamId stream_id, const butil::IOBuf &message, return (rc == 1) ? EAGAIN : errno; } -void StreamWait(StreamId stream_id, const timespec *due_time, - void (*on_writable)(StreamId, void*, int), void *arg) { +void StreamWait(StreamId stream_id, const timespec* due_time, + void (*on_writable)(StreamId, void*, int), void* arg) { SocketUniquePtr ptr; if (Socket::Address(stream_id, &ptr) != 0) { Stream::WritableMeta* wm = new Stream::WritableMeta; wm->id = stream_id; - wm->arg= arg; + wm->arg = arg; wm->has_timer = false; wm->on_writable = on_writable; wm->error_code = EINVAL; - const bthread_attr_t* attr = - FLAGS_usercode_in_pthread ? &BTHREAD_ATTR_PTHREAD - : &BTHREAD_ATTR_NORMAL; + const bthread_attr_t* attr = + FLAGS_usercode_in_pthread ? &BTHREAD_ATTR_PTHREAD : &BTHREAD_ATTR_NORMAL; bthread_t tid; if (bthread_start_background(&tid, attr, Stream::RunOnWritable, wm) != 0) { PLOG(FATAL) << "Fail to start bthread"; @@ -729,12 +706,9 @@ int StreamWait(StreamId stream_id, const timespec* due_time) { return s->Wait(due_time); } -int StreamClose(StreamId stream_id) { - return Stream::SetFailed(stream_id); -} +int StreamClose(StreamId stream_id) { return Stream::SetFailed(stream_id); } -int StreamCreate(StreamId *request_stream, Controller &cntl, - const StreamOptions* options) { +int StreamCreate(StreamId* request_stream, Controller& cntl, const StreamOptions* options) { if (cntl._request_stream != INVALID_STREAM_ID) { LOG(ERROR) << "Can't create request stream more than once"; return -1; @@ -757,9 +731,7 @@ int StreamCreate(StreamId *request_stream, Controller &cntl, return 0; } -int StreamAccept(StreamId* response_stream, Controller &cntl, - const StreamOptions* options) { - +int StreamAccept(StreamId* response_stream, Controller& cntl, const StreamOptions* options) { if (cntl._response_stream != INVALID_STREAM_ID) { LOG(ERROR) << "Can't create reponse stream more than once"; return -1; @@ -786,4 +758,4 @@ int StreamAccept(StreamId* response_stream, Controller &cntl, return 0; } -} // namespace brpc +} // namespace brpc