diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 338ad66c73..eb8ea8a9d9 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1194,6 +1194,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { wopt.pipelined_count = _pipelined_count; wopt.auth_flags = _auth_flags; wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED); + wopt.write_in_background = write_to_socket_in_background(); int rc; size_t packet_size = 0; if (user_packet_guard) { diff --git a/src/brpc/controller.h b/src/brpc/controller.h index f27dd54f86..144aa118dc 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -144,6 +144,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19); static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20); static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21); + static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22); public: struct Inheritable { @@ -353,6 +354,17 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); bool is_done_allowed_to_run_in_place() const { return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); } + // Create a background KEEPWRITE bthread to write to socket when issuing + // RPCs, instead of trying to write to socket once in calling thread (see + // `Socket::StartWrite` in socket.cpp). + // The socket write could take some time (several microseconds maybe), if + // you cares about it and don't want the calling thread to be blocked, you + // can set this flag. + // Should provides better batch effect in situations like when you are + // continually issuing lots of async RPC calls in only one thread. + void set_write_to_socket_in_background(bool f) { set_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND, f); } + bool write_to_socket_in_background() const { return has_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND); } + // ------------------------------------------------------------------------ // Server-side methods. // These calls shall be made from the server side only. Their results are diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index c49ca08358..a634dd376c 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -1687,7 +1687,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) { // in some protocols(namely RTMP). req->Setup(this); - if (ssl_state() != SSL_OFF) { + if (opt.write_in_background || ssl_state() != SSL_OFF) { // Writing into SSL may block the current bthread, always write // in the background. goto KEEPWRITE_IN_BACKGROUND; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index eff9474ce5..74f7360ea2 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -289,10 +289,20 @@ friend class policy::H2GlobalStreamCreator; // Default: false bool ignore_eovercrowded; + // The calling thread directly creates KeepWrite thread to write into + // this socket, skipping writing once. + // In situations like when you are continually issuing lots of + // StreamWrite or async RPC calls in only one thread, directly creating + // KeepWrite thread at first provides batch write effect and better + // performance. Otherwise, each write only writes one `msg` into socket + // and no KeepWrite thread can be created, which brings poor + // performance. + bool write_in_background; + WriteOptions() : id_wait(INVALID_BTHREAD_ID), abstime(NULL) , pipelined_count(0), auth_flags(0) - , ignore_eovercrowded(false) {} + , ignore_eovercrowded(false), write_in_background(false) {} }; int Write(butil::IOBuf *msg, const WriteOptions* options = NULL); diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index d8466d2a81..8b346bd628 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -271,7 +271,8 @@ void Stream::TriggerOnConnectIfNeed() { bthread_mutex_unlock(&_connect_mutex); } -int Stream::AppendIfNotFull(const butil::IOBuf &data) { +int Stream::AppendIfNotFull(const butil::IOBuf &data, + const StreamWriteOptions* options) { if (_cur_buf_size > 0) { std::unique_lock lck(_congestion_control_mutex); if (_produced >= _remote_consumed + _cur_buf_size) { @@ -290,7 +291,9 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) { size_t data_length = data.length(); butil::IOBuf copied_data(data); - const int rc = _fake_socket_weak_ref->Write(&copied_data); + Socket::WriteOptions wopt; + wopt.write_in_background = options != NULL && options->write_in_background; + const int rc = _fake_socket_weak_ref->Write(&copied_data, &wopt); if (rc != 0) { // Stream may be closed by peer before LOG(WARNING) << "Fail to write to _fake_socket, " << berror(); @@ -679,7 +682,8 @@ void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) { policy::ProcessRpcResponse(msg); } -int StreamWrite(StreamId stream_id, const butil::IOBuf &message) { +int StreamWrite(StreamId stream_id, const butil::IOBuf &message, + const StreamWriteOptions* options) { SocketUniquePtr ptr; if (Socket::Address(stream_id, &ptr) != 0) { return EINVAL; diff --git a/src/brpc/stream.h b/src/brpc/stream.h index fbf2d51d0e..410a5a097c 100644 --- a/src/brpc/stream.h +++ b/src/brpc/stream.h @@ -82,6 +82,18 @@ struct StreamOptions { StreamInputHandler* handler; }; +struct StreamWriteOptions +{ + StreamWriteOptions() : write_in_background(false) {} + + // Write message to socket in background thread. + // Provides batch write effect and better performance in situations when + // you are continually issuing lots of StreamWrite or async RPC calls in + // only one thread. Otherwise, each StreamWrite directly writes message into + // socket and brings poor performance. + bool write_in_background; +}; + // [Called at the client side] // Create a stream at client-side along with the |cntl|, which will be connected // when receiving the response with a stream from server-side. If |options| is @@ -104,7 +116,8 @@ int StreamAccept(StreamId* response_stream, Controller &cntl, // - EAGAIN: |stream_id| is created with positive |max_buf_size| and buf size // which the remote side hasn't consumed yet excceeds the number. // - EINVAL: |stream_id| is invalied or has been closed -int StreamWrite(StreamId stream_id, const butil::IOBuf &message); +int StreamWrite(StreamId stream_id, const butil::IOBuf &message, + const StreamWriteOptions* options = NULL); // Write util the pending buffer size is less than |max_buf_size| or orrur // occurs diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h index 259f0b77f7..f24b75a352 100644 --- a/src/brpc/stream_impl.h +++ b/src/brpc/stream_impl.h @@ -42,7 +42,8 @@ class BAIDU_CACHELINE_ALIGNMENT Stream : public SocketConnection { // --------------------- SocketConnection -------------- - int AppendIfNotFull(const butil::IOBuf& msg); + int AppendIfNotFull(const butil::IOBuf& msg, + const StreamWriteOptions* options = NULL); static int Create(const StreamOptions& options, const StreamSettings *remote_settings, StreamId *id);