Skip to content

Commit

Permalink
Add options to control whether write to socket in background thread (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
MrGuin authored Aug 16, 2023
1 parent d72d320 commit 7d1df9f
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
wopt.pipelined_count = _pipelined_count;
wopt.auth_flags = _auth_flags;
wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED);
wopt.write_in_background = write_to_socket_in_background();
int rc;
size_t packet_size = 0;
if (user_packet_guard) {
Expand Down
12 changes: 12 additions & 0 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19);
static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20);
static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21);
static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22);

public:
struct Inheritable {
Expand Down Expand Up @@ -353,6 +354,17 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
bool is_done_allowed_to_run_in_place() const
{ return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); }

// Create a background KEEPWRITE bthread to write to socket when issuing
// RPCs, instead of trying to write to socket once in calling thread (see
// `Socket::StartWrite` in socket.cpp).
// The socket write could take some time (several microseconds maybe), if
// you cares about it and don't want the calling thread to be blocked, you
// can set this flag.
// Should provides better batch effect in situations like when you are
// continually issuing lots of async RPC calls in only one thread.
void set_write_to_socket_in_background(bool f) { set_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND, f); }
bool write_to_socket_in_background() const { return has_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND); }

// ------------------------------------------------------------------------
// Server-side methods.
// These calls shall be made from the server side only. Their results are
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1687,7 +1687,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
// in some protocols(namely RTMP).
req->Setup(this);

if (ssl_state() != SSL_OFF) {
if (opt.write_in_background || ssl_state() != SSL_OFF) {
// Writing into SSL may block the current bthread, always write
// in the background.
goto KEEPWRITE_IN_BACKGROUND;
Expand Down
12 changes: 11 additions & 1 deletion src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,20 @@ friend class policy::H2GlobalStreamCreator;
// Default: false
bool ignore_eovercrowded;

// The calling thread directly creates KeepWrite thread to write into
// this socket, skipping writing once.
// In situations like when you are continually issuing lots of
// StreamWrite or async RPC calls in only one thread, directly creating
// KeepWrite thread at first provides batch write effect and better
// performance. Otherwise, each write only writes one `msg` into socket
// and no KeepWrite thread can be created, which brings poor
// performance.
bool write_in_background;

WriteOptions()
: id_wait(INVALID_BTHREAD_ID), abstime(NULL)
, pipelined_count(0), auth_flags(0)
, ignore_eovercrowded(false) {}
, ignore_eovercrowded(false), write_in_background(false) {}
};
int Write(butil::IOBuf *msg, const WriteOptions* options = NULL);

Expand Down
10 changes: 7 additions & 3 deletions src/brpc/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ void Stream::TriggerOnConnectIfNeed() {
bthread_mutex_unlock(&_connect_mutex);
}

int Stream::AppendIfNotFull(const butil::IOBuf &data) {
int Stream::AppendIfNotFull(const butil::IOBuf &data,
const StreamWriteOptions* options) {
if (_cur_buf_size > 0) {
std::unique_lock<bthread_mutex_t> lck(_congestion_control_mutex);
if (_produced >= _remote_consumed + _cur_buf_size) {
Expand All @@ -290,7 +291,9 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) {

size_t data_length = data.length();
butil::IOBuf copied_data(data);
const int rc = _fake_socket_weak_ref->Write(&copied_data);
Socket::WriteOptions wopt;
wopt.write_in_background = options != NULL && options->write_in_background;
const int rc = _fake_socket_weak_ref->Write(&copied_data, &wopt);
if (rc != 0) {
// Stream may be closed by peer before
LOG(WARNING) << "Fail to write to _fake_socket, " << berror();
Expand Down Expand Up @@ -679,7 +682,8 @@ void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) {
policy::ProcessRpcResponse(msg);
}

int StreamWrite(StreamId stream_id, const butil::IOBuf &message) {
int StreamWrite(StreamId stream_id, const butil::IOBuf &message,
const StreamWriteOptions* options) {
SocketUniquePtr ptr;
if (Socket::Address(stream_id, &ptr) != 0) {
return EINVAL;
Expand Down
15 changes: 14 additions & 1 deletion src/brpc/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ struct StreamOptions {
StreamInputHandler* handler;
};

struct StreamWriteOptions
{
StreamWriteOptions() : write_in_background(false) {}

// Write message to socket in background thread.
// Provides batch write effect and better performance in situations when
// you are continually issuing lots of StreamWrite or async RPC calls in
// only one thread. Otherwise, each StreamWrite directly writes message into
// socket and brings poor performance.
bool write_in_background;
};

// [Called at the client side]
// Create a stream at client-side along with the |cntl|, which will be connected
// when receiving the response with a stream from server-side. If |options| is
Expand All @@ -104,7 +116,8 @@ int StreamAccept(StreamId* response_stream, Controller &cntl,
// - EAGAIN: |stream_id| is created with positive |max_buf_size| and buf size
// which the remote side hasn't consumed yet excceeds the number.
// - EINVAL: |stream_id| is invalied or has been closed
int StreamWrite(StreamId stream_id, const butil::IOBuf &message);
int StreamWrite(StreamId stream_id, const butil::IOBuf &message,
const StreamWriteOptions* options = NULL);

// Write util the pending buffer size is less than |max_buf_size| or orrur
// occurs
Expand Down
3 changes: 2 additions & 1 deletion src/brpc/stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class BAIDU_CACHELINE_ALIGNMENT Stream : public SocketConnection {

// --------------------- SocketConnection --------------

int AppendIfNotFull(const butil::IOBuf& msg);
int AppendIfNotFull(const butil::IOBuf& msg,
const StreamWriteOptions* options = NULL);
static int Create(const StreamOptions& options,
const StreamSettings *remote_settings,
StreamId *id);
Expand Down

0 comments on commit 7d1df9f

Please sign in to comment.