From d8d255f817d01c7ef97f1c57badff86546ef6a10 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Sun, 28 Jan 2024 15:58:31 +0800 Subject: [PATCH 1/2] Fix: set connected before send stream data Co-authored-by: Xin Liao --- src/brpc/policy/baidu_rpc_protocol.cpp | 30 ++++++++++++++++++++------ 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index b19fbb379b..05464ae9a4 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -25,6 +25,7 @@ #include "butil/iobuf.h" // butil::IOBuf #include "butil/raw_pack.h" // RawPacker RawUnpacker #include "brpc/controller.h" // Controller +#include "brpc/errno.pb.h" #include "brpc/socket.h" // Socket #include "brpc/server.h" // Server #include "brpc/span.h" @@ -216,7 +217,9 @@ void SendRpcResponse(int64_t correlation_id, if (Socket::Address(response_stream_id, &stream_ptr) == 0) { Stream* s = (Stream*)stream_ptr->conn(); s->FillSettings(meta.mutable_stream_settings()); - s->SetHostSocket(sock); + if (s->SetHostSocket(sock) != 0) { + LOG(WARNING) << "SetHostSocket failed"; + } } else { LOG(WARNING) << "Stream=" << response_stream_id << " was closed before sending response"; @@ -247,6 +250,25 @@ void SendRpcResponse(int64_t correlation_id, // Send rpc response over stream even if server side failed to create // stream for some reason. if(cntl->has_remote_stream()){ + // We have to check sock->Failed() before calling SetConnected(). + // Otherwise stream->_host_socket may be nullptr and cause CHECK failures. + if (sock->Failed()) { + LOG(WARNING) << "Fail to write into " << *sock; + cntl->SetFailed(EFAILEDSOCKET, "Fail to write into %s", + sock->description().c_str()); + if (stream_ptr) { + ((Stream *)stream_ptr->conn())->Close(); + } + return; + } + // If we don't set connected here before send the response, + // client-side may close the stream before server-side set connected. + // This will cause missing on_closed message on the client-side. + if (stream_ptr) { + // Now it's ok the mark this server-side stream as connectted as all the + // written user data would follower the RPC response. + ((Stream*)stream_ptr->conn())->SetConnected(); + } // Send the response over stream to notify that this stream connection // is successfully built. // Response_stream can be INVALID_STREAM_ID when error occurs. @@ -262,12 +284,6 @@ void SendRpcResponse(int64_t correlation_id, } return; } - - if(stream_ptr) { - // Now it's ok the mark this server-side stream as connected as all the - // written user data would follower the RPC response. - ((Stream*)stream_ptr->conn())->SetConnected(); - } } else{ // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency. From 3a6198ec335118c54981c08fe8012df72eced4b6 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Mon, 25 Mar 2024 23:32:26 +0800 Subject: [PATCH 2/2] improve --- src/brpc/policy/baidu_rpc_protocol.cpp | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 05464ae9a4..a01c5cc0b7 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -217,8 +217,14 @@ void SendRpcResponse(int64_t correlation_id, if (Socket::Address(response_stream_id, &stream_ptr) == 0) { Stream* s = (Stream*)stream_ptr->conn(); s->FillSettings(meta.mutable_stream_settings()); + // If failed to set host socket here, + // s->SetConnected will fail at CHECK(_host_socket != NULL) if (s->SetHostSocket(sock) != 0) { - LOG(WARNING) << "SetHostSocket failed"; + LOG(WARNING) << "Failed to set host socket " << *sock; + cntl->SetFailed(EFAILEDSOCKET, "Fail to set host socket %s", + sock->description().c_str()); + ((Stream *)stream_ptr->conn())->Close(); + return; } } else { LOG(WARNING) << "Stream=" << response_stream_id @@ -250,17 +256,6 @@ void SendRpcResponse(int64_t correlation_id, // Send rpc response over stream even if server side failed to create // stream for some reason. if(cntl->has_remote_stream()){ - // We have to check sock->Failed() before calling SetConnected(). - // Otherwise stream->_host_socket may be nullptr and cause CHECK failures. - if (sock->Failed()) { - LOG(WARNING) << "Fail to write into " << *sock; - cntl->SetFailed(EFAILEDSOCKET, "Fail to write into %s", - sock->description().c_str()); - if (stream_ptr) { - ((Stream *)stream_ptr->conn())->Close(); - } - return; - } // If we don't set connected here before send the response, // client-side may close the stream before server-side set connected. // This will cause missing on_closed message on the client-side.