From dad8e92034f3e00d5354d765b4ef74155b8c0b65 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Sun, 10 Mar 2024 17:16:03 +0800 Subject: [PATCH 1/2] Support on_failed callback for streaming rpc --- .gitignore | 3 + src/brpc/controller.cpp | 3 +- src/brpc/policy/baidu_rpc_protocol.cpp | 10 +- src/brpc/socket.cpp | 10 +- src/brpc/socket.h | 2 +- src/brpc/stream.cpp | 61 ++++- src/brpc/stream.h | 16 +- src/brpc/stream_impl.h | 11 +- test/brpc_streaming_rpc_unittest.cpp | 356 ++++++++++++++++++++++--- 9 files changed, 402 insertions(+), 70 deletions(-) diff --git a/.gitignore b/.gitignore index bdd210096b..9a60889afa 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,6 @@ CTestTestfile.cmake /test/curl.out /test/out.txt /test/recordio_ref.io + +# Ignore protoc-gen-mcpack files +/protoc-gen-mcpack*/ \ No newline at end of file diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index bfe278ffb9..2336601523 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1387,7 +1387,8 @@ void Controller::HandleStreamConnection(Socket *host_socket) { } } if (FailedInline()) { - Stream::SetFailed(_request_stream); + Stream::SetFailed(_request_stream, _error_code, + "%s", _error_text.c_str()); if (_remote_stream_settings != NULL) { policy::SendStreamRst(host_socket, _remote_stream_settings->stream_id()); diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index b19fbb379b..6fa17d6ca2 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -254,11 +254,13 @@ void SendRpcResponse(int64_t correlation_id, accessor.remote_stream_settings()->stream_id(), accessor.response_stream()) != 0) { const int errcode = errno; - PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; - cntl->SetFailed(errcode, "Fail to write into %s", - sock->description().c_str()); + std::string error_text = butil::string_printf(64, "Fail to write into %s", + sock->description().c_str()); + PLOG_IF(WARNING, errcode != EPIPE) << error_text; + cntl->SetFailed(errcode, "%s", error_text.c_str()); if(stream_ptr) { - ((Stream*)stream_ptr->conn())->Close(); + ((Stream*)stream_ptr->conn())->Close(errcode, "%s", + error_text.c_str()); } return; } diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 1f9b2a2e9b..edc3dee0b0 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -986,7 +986,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { &_id_wait_list, error_code, error_text, &_id_wait_list_mutex)); - ResetAllStreams(); + ResetAllStreams(error_code, error_text); // _app_connect shouldn't be set to NULL in SetFailed otherwise // HC is always not supported. // FIXME: Design a better interface for AppConnect @@ -2541,7 +2541,7 @@ int Socket::RemoveStream(StreamId stream_id) { return 0; } -void Socket::ResetAllStreams() { +void Socket::ResetAllStreams(int error_code, const std::string& error_text) { DCHECK(Failed()); std::set saved_stream_set; _stream_mutex.lock(); @@ -2552,9 +2552,9 @@ void Socket::ResetAllStreams() { saved_stream_set.swap(*_stream_set); } _stream_mutex.unlock(); - for (std::set::const_iterator - it = saved_stream_set.begin(); it != saved_stream_set.end(); ++it) { - Stream::SetFailed(*it); + for (auto it = saved_stream_set.begin(); + it != saved_stream_set.end(); ++it) { + Stream::SetFailed(*it, error_code, "%s", error_text.c_str()); } } diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 9d85aafaff..faf6baac64 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -706,7 +706,7 @@ friend void DereferenceSocket(Socket*); // broken socket. int AddStream(StreamId stream_id); int RemoveStream(StreamId stream_id); - void ResetAllStreams(); + void ResetAllStreams(int error_code, const std::string& error_text); bool ValidFileDescriptor(int fd); diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index b311c4b780..36ae9546d5 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -39,11 +39,20 @@ DECLARE_int64(socket_max_streams_unconsumed_bytes); const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L; +void StreamInputHandler::on_failed(StreamId id, int error_code, + const std::string& error_text) { + LOG(ERROR) << "`on_failed' should be override by user when " + "`split_closed_and_failed' returns true. " + "id=" << id << ", [" << error_code + << "] " << error_text; +} + Stream::Stream() : _host_socket(NULL) , _fake_socket_weak_ref(NULL) , _connected(false) , _closed(false) + , _error_code(0) , _produced(0) , _remote_consumed(0) , _cur_buf_size(0) @@ -74,6 +83,7 @@ int Stream::Create(const StreamOptions &options, s->_connected = false; s->_options = options; s->_closed = false; + s->_error_code = 0; s->_cur_buf_size = options.max_buf_size > 0 ? options.max_buf_size : 0; if (options.max_buf_size > 0 && options.min_buf_size > options.max_buf_size) { // set 0 if min_buf_size is invalid. @@ -131,7 +141,7 @@ void Stream::BeforeRecycle(Socket *) { if (_host_socket) { _host_socket->RemoveStream(id()); } - + // The instance is to be deleted in the consumer thread bthread::execution_queue_stop(_consumer_queue); } @@ -466,21 +476,22 @@ int Stream::OnReceived(const StreamFrameMeta& fm, butil::IOBuf *buf, Socket* soc if (!fm.has_continuation()) { butil::IOBuf *tmp = _pending_buf; _pending_buf = NULL; - if (bthread::execution_queue_execute(_consumer_queue, tmp) != 0) { + int rc = bthread::execution_queue_execute(_consumer_queue, tmp); + if (rc != 0) { CHECK(false) << "Fail to push into channel"; delete tmp; - Close(); + Close(rc, "Fail to push into channel"); } } break; case FRAME_TYPE_RST: RPC_VLOG << "stream=" << id() << " received rst frame"; - Close(); + Close(ECONNRESET, "Received RST frame"); break; case FRAME_TYPE_CLOSE: RPC_VLOG << "stream=" << id() << " received close frame"; // TODO:: See the comments in Consume - Close(); + Close(0, "Received CLOSE frame"); break; case FRAME_TYPE_UNKNOWN: RPC_VLOG << "Received unknown frame"; @@ -530,15 +541,29 @@ int Stream::Consume(void *meta, bthread::TaskIterator& iter) { Stream* s = (Stream*)meta; s->StopIdleTimer(); if (iter.is_queue_stopped()) { - // indicating the queue was closed + scoped_ptr recycled_stream(s); + // Indicating the queue was closed. if (s->_host_socket) { DereferenceSocket(s->_host_socket); s->_host_socket = NULL; } if (s->_options.handler != NULL) { + if (s->_options.handler->split_closed_and_failed()) { + // Split closed and failed according to the error code. + int error_code; + std::string error_text; + { + BAIDU_SCOPED_LOCK(s->_connect_mutex); + error_code = s->_error_code; + error_text = s->_error_text; + } + if (error_code != 0) { + s->_options.handler->on_failed(s->id(), error_code, error_text); + return 0; + } + } s->_options.handler->on_closed(s->id()); } - delete s; return 0; } DEFINE_SMALL_ARRAY(butil::IOBuf*, buf_list, s->_options.messages_in_batch, 256); @@ -630,7 +655,7 @@ void Stream::StopIdleTimer() { } } -void Stream::Close() { +void Stream::Close(int error_code, const char* reason_fmt, ...) { _fake_socket_weak_ref->SetFailed(); bthread_mutex_lock(&_connect_mutex); if (_closed) { @@ -638,6 +663,13 @@ void Stream::Close() { return; } _closed = true; + _error_code = error_code; + + va_list ap; + va_start(ap, reason_fmt); + butil::string_vappendf(&_error_text, reason_fmt, ap); + va_end(ap); + if (_connected) { bthread_mutex_unlock(&_connect_mutex); return; @@ -647,14 +679,17 @@ void Stream::Close() { return TriggerOnConnectIfNeed(); } -int Stream::SetFailed(StreamId id) { +int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) { SocketUniquePtr ptr; if (Socket::AddressFailedAsWell(id, &ptr) == -1) { // Don't care recycled stream return 0; } Stream* s = (Stream*)ptr->conn(); - s->Close(); + va_list ap; + va_start(ap, reason_fmt); + s->Close(error_code, reason_fmt, ap); + va_end(ap); return 0; } @@ -665,13 +700,13 @@ void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) { ParseResult pr = policy::ParseRpcMessage(response_buffer, NULL, true, NULL); if (!pr.is_ok()) { CHECK(false); - Close(); + Close(EPROTO, "Fail to parse rpc response message"); return; } InputMessageBase* msg = pr.message(); if (msg == NULL) { CHECK(false); - Close(); + Close(ENOMEM, "Message is NULL"); return; } _host_socket->PostponeEOF(); @@ -730,7 +765,7 @@ int StreamWait(StreamId stream_id, const timespec* due_time) { } int StreamClose(StreamId stream_id) { - return Stream::SetFailed(stream_id); + return Stream::SetFailed(stream_id, 0, "Local close"); } int StreamCreate(StreamId *request_stream, Controller &cntl, diff --git a/src/brpc/stream.h b/src/brpc/stream.h index 90965f371f..3df3d77ce2 100644 --- a/src/brpc/stream.h +++ b/src/brpc/stream.h @@ -44,7 +44,18 @@ class StreamInputHandler { butil::IOBuf *const messages[], size_t size) = 0; virtual void on_idle_timeout(StreamId id) = 0; - virtual void on_closed(StreamId id) = 0; + // 1. If `split_closed_and_failed` returns false(default), + // only `on_closed` will be called. + // 2. If `split_closed_and_failed` returns true, + // 2.1 `on_closed` will be called Whether `StreamClose` + // is called by local side or remote side. + // 2.2 `on_failed` will be called when the stream is + // closed abnormally. + virtual void on_closed(StreamId id) = 0; + virtual void on_failed(StreamId id, int error_code, + const std::string& error_text); + + virtual bool split_closed_and_failed() const { return false; } }; struct StreamOptions { @@ -82,8 +93,7 @@ struct StreamOptions { StreamInputHandler* handler; }; -struct StreamWriteOptions -{ +struct StreamWriteOptions { StreamWriteOptions() : write_in_background(false) {} // Write message to socket in background thread. diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h index f24b75a352..db92dd63d6 100644 --- a/src/brpc/stream_impl.h +++ b/src/brpc/stream_impl.h @@ -61,13 +61,16 @@ class BAIDU_CACHELINE_ALIGNMENT Stream : public SocketConnection { const timespec *due_time); int Wait(const timespec* due_time); void FillSettings(StreamSettings *settings); - static int SetFailed(StreamId id); - void Close(); + static int SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) + __attribute__ ((__format__ (__printf__, 3, 4))); + void Close(int error_code, const char* reason_fmt, ...) + __attribute__ ((__format__ (__printf__, 3, 4))); private: friend void StreamWait(StreamId stream_id, const timespec *due_time, - void (*on_writable)(StreamId, void*, int), void *arg); + void (*on_writable)(StreamId, void*, int), void *arg); friend class MessageBatcher; +friend struct butil::DefaultDeleter; Stream(); ~Stream(); int Init(const StreamOptions options); @@ -111,6 +114,8 @@ friend class MessageBatcher; ConnectMeta _connect_meta; bool _connected; bool _closed; + int _error_code; + std::string _error_text; bthread_mutex_t _congestion_control_mutex; size_t _produced; diff --git a/test/brpc_streaming_rpc_unittest.cpp b/test/brpc_streaming_rpc_unittest.cpp index f7e62c8114..5ce0479035 100644 --- a/test/brpc_streaming_rpc_unittest.cpp +++ b/test/brpc_streaming_rpc_unittest.cpp @@ -20,11 +20,12 @@ // Date: 2015/10/22 16:28:44 #include - #include "brpc/server.h" + #include "brpc/controller.h" #include "brpc/channel.h" #include "brpc/stream_impl.h" +#include "brpc/policy/streaming_rpc_protocol.h" #include "echo.pb.h" class AfterAcceptStream { @@ -67,12 +68,45 @@ class MyServiceWithStream : public test::EchoService { AfterAcceptStream* _after_accept_stream; }; +struct HandlerControl { + HandlerControl() + : block(false) + {} + bool block; +}; + class StreamingRpcTest : public testing::Test { protected: - test::EchoRequest request; - test::EchoResponse response; void SetUp() { request.set_message("hello world"); } void TearDown() {} + + template + void TestReceivedInOrder(brpc::StreamOptions& opt, + Handler& handler); + + template + void TestBlock(brpc::StreamOptions& opt, + Handler& handler, HandlerControl& hc); + + template + void TestAutoClose(brpc::StreamOptions& opt, + Handler& handler, HandlerControl& hc); + + template + void TestIdleTimeout(brpc::StreamOptions& opt, + Handler& handler, HandlerControl& hc); + + template + void TestPingPong(brpc::StreamOptions& opt, + Handler& handler); + + template + void TestServerSendDataBeforeRunDone(brpc::StreamOptions& opt, + Handler& handler); + + + test::EchoRequest request; + test::EchoResponse response; }; TEST_F(StreamingRpcTest, sanity) { @@ -95,13 +129,6 @@ TEST_F(StreamingRpcTest, sanity) { server.Join(); } -struct HandlerControl { - HandlerControl() - : block(false) - {} - bool block; -}; - class OrderedInputHandler : public brpc::StreamInputHandler { public: explicit OrderedInputHandler(HandlerControl *cntl = NULL) @@ -110,11 +137,58 @@ class OrderedInputHandler : public brpc::StreamInputHandler { , _stopped(false) , _idle_times(0) , _cntl(cntl) - { + {} + + int on_received_messages(brpc::StreamId /*id*/, + butil::IOBuf *const messages[], + size_t size) override { + if (_cntl && _cntl->block) { + while (_cntl->block) { + usleep(100); + } + } + for (size_t i = 0; i < size; ++i) { + CHECK(messages[i]->length() == sizeof(int)); + int network = 0; + messages[i]->cutn(&network, sizeof(int)); + EXPECT_EQ((int)ntohl(network), _expected_next_value++); + } + return 0; + } + + void on_idle_timeout(brpc::StreamId /*id*/) override { + ++_idle_times; + } + + void on_closed(brpc::StreamId /*id*/) override { + ASSERT_FALSE(_stopped); + _stopped = true; } + + bool failed() const { return _failed; } + bool stopped() const { return _stopped; } + int idle_times() const { return _idle_times; } +private: + int _expected_next_value; + bool _failed; + bool _stopped; + int _idle_times; + HandlerControl* _cntl; +}; + +class OrderedInputHandler2 : public brpc::StreamInputHandler { +public: + explicit OrderedInputHandler2(HandlerControl *cntl = NULL) + : _expected_next_value(0) + , _failed(false) + , _stopped(false) + , _idle_times(0) + , _cntl(cntl) + {} + int on_received_messages(brpc::StreamId /*id*/, butil::IOBuf *const messages[], - size_t size) { + size_t size) override { if (_cntl && _cntl->block) { while (_cntl->block) { usleep(100); @@ -129,15 +203,27 @@ class OrderedInputHandler : public brpc::StreamInputHandler { return 0; } - void on_idle_timeout(brpc::StreamId /*id*/) { + void on_idle_timeout(brpc::StreamId /*id*/) override { ++_idle_times; } - void on_closed(brpc::StreamId /*id*/) { + void on_closed(brpc::StreamId /*id*/) override { ASSERT_FALSE(_stopped); _stopped = true; } + void on_failed(brpc::StreamId id, int error_code, + const std::string& /*error_text*/) override { + ASSERT_FALSE(_stopped || _failed); + if (error_code == 0) { + _stopped = true; + } else { + _failed = true; + } + } + + bool split_closed_and_failed() const override { return true; } + bool failed() const { return _failed; } bool stopped() const { return _stopped; } int idle_times() const { return _idle_times; } @@ -149,10 +235,9 @@ class OrderedInputHandler : public brpc::StreamInputHandler { HandlerControl* _cntl; }; -TEST_F(StreamingRpcTest, received_in_order) { - OrderedInputHandler handler; - brpc::StreamOptions opt; - opt.handler = &handler; +template +void StreamingRpcTest::TestReceivedInOrder(brpc::StreamOptions& opt, + Handler& handler) { opt.messages_in_batch = 100; brpc::Server server; MyServiceWithStream service(opt); @@ -187,19 +272,29 @@ TEST_F(StreamingRpcTest, received_in_order) { ASSERT_EQ(N, handler._expected_next_value); } +TEST_F(StreamingRpcTest, received_in_order) { + OrderedInputHandler handler; + brpc::StreamOptions opt; + opt.handler = &handler; + TestReceivedInOrder(opt, handler); + + OrderedInputHandler2 handler2; + brpc::StreamOptions opt2; + opt2.handler = &handler2; + TestReceivedInOrder(opt2, handler2); +} + void on_writable(brpc::StreamId, void* arg, int error_code) { std::pair* p = (std::pair*)arg; p->first = true; p->second = error_code; - LOG(INFO) << "error_code=" << error_code; + // LOG(INFO) << "error_code=" << error_code; } -TEST_F(StreamingRpcTest, block) { - HandlerControl hc; - OrderedInputHandler handler(&hc); +template +void StreamingRpcTest::TestBlock(brpc::StreamOptions& opt, + Handler& handler, HandlerControl& hc) { hc.block = true; - brpc::StreamOptions opt; - opt.handler = &handler; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; @@ -216,7 +311,7 @@ TEST_F(StreamingRpcTest, block) { ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); test::EchoService_Stub stub(&channel); stub.Echo(&cntl, &request, &response, NULL); - ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; for (int i = 0; i < N; ++i) { int network = htonl(i); @@ -293,12 +388,34 @@ TEST_F(StreamingRpcTest, block) { ASSERT_EQ(N + N + N, handler._expected_next_value); } -TEST_F(StreamingRpcTest, auto_close_if_host_socket_closed) { +TEST_F(StreamingRpcTest, block) { HandlerControl hc; OrderedInputHandler handler(&hc); - hc.block = true; brpc::StreamOptions opt; opt.handler = &handler; + TestBlock(opt, handler, hc); + + OrderedInputHandler2 handler2(&hc); + brpc::StreamOptions opt2; + opt2.handler = &handler2; + TestBlock(opt2, handler2, hc); +} + +void HandleFailed(OrderedInputHandler& handler) { + ASSERT_FALSE(handler.failed()); + ASSERT_TRUE(handler.stopped()); +} + +// brpc::StreamInputHandler2 is failed, not closed. +void HandleFailed(OrderedInputHandler2& handler) { + ASSERT_TRUE(handler.failed()); + ASSERT_FALSE(handler.stopped()); +} + +template +void StreamingRpcTest::TestAutoClose(brpc::StreamOptions& opt, + Handler& handler, HandlerControl& hc) { + hc.block = true; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; @@ -322,27 +439,86 @@ TEST_F(StreamingRpcTest, auto_close_if_host_socket_closed) { ASSERT_EQ(0, brpc::Socket::Address(request_stream, &ptr)); brpc::Stream* s = (brpc::Stream*)ptr->conn(); ASSERT_TRUE(s->_host_socket != NULL); - s->_host_socket->SetFailed(); + s->_host_socket->SetFailed(10000, ""); } usleep(100); butil::IOBuf out; out.append("test"); ASSERT_EQ(EINVAL, brpc::StreamWrite(request_stream, out)); - while (!handler.stopped()) { + while (!handler.stopped() && !handler.failed()) { usleep(100); } - ASSERT_FALSE(handler.failed()); + HandleFailed(handler); ASSERT_EQ(0, handler.idle_times()); ASSERT_EQ(0, handler._expected_next_value); } -TEST_F(StreamingRpcTest, idle_timeout) { +TEST_F(StreamingRpcTest, auto_close_if_host_socket_closed) { HandlerControl hc; OrderedInputHandler handler(&hc); - hc.block = true; brpc::StreamOptions opt; opt.handler = &handler; + TestAutoClose(opt, handler, hc); + + OrderedInputHandler2 handler2(&hc); + brpc::StreamOptions opt2; + opt2.handler = &handler2; + TestAutoClose(opt2, handler2, hc); +} + +TEST_F(StreamingRpcTest, failed_when_rst) { + OrderedInputHandler2 handler; + brpc::StreamOptions opt; + opt.handler = &handler; + opt.messages_in_batch = 100; + brpc::Server server; + MyServiceWithStream service(opt); + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + ASSERT_EQ(0, server.Start(9007, NULL)); + brpc::Channel channel; + ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); + brpc::Controller cntl; + brpc::StreamId request_stream; + brpc::StreamOptions request_stream_options; + request_stream_options.max_buf_size = 0; + ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); + brpc::ScopedStream stream_guard(request_stream); + test::EchoService_Stub stub(&channel); + stub.Echo(&cntl, &request, &response, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; + const int N = 10000; + for (int i = 0; i < N; ++i) { + int network = htonl(i); + butil::IOBuf out; + out.append(&network, sizeof(network)); + ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i; + } + + usleep(1000 * 10); + { + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(request_stream, &ptr)); + brpc::Stream* s = (brpc::Stream*)ptr->conn(); + ASSERT_TRUE(s->_host_socket != NULL); + brpc::policy::SendStreamRst(s->_host_socket, + s->_remote_settings.stream_id()); + } + // ASSERT_EQ(0, brpc::StreamClose(request_stream)); + server.Stop(0); + server.Join(); + while (!handler.stopped() && !handler.failed()) { + usleep(100); + } + HandleFailed(handler); + ASSERT_EQ(0, handler.idle_times()); + ASSERT_EQ(N, handler._expected_next_value); +} + +template +void StreamingRpcTest::TestIdleTimeout(brpc::StreamOptions& opt, + Handler& handler, HandlerControl& hc) { + hc.block = true; opt.idle_timeout_ms = 2; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; @@ -372,6 +548,19 @@ TEST_F(StreamingRpcTest, idle_timeout) { ASSERT_EQ(0, handler._expected_next_value); } +TEST_F(StreamingRpcTest, idle_timeout) { + HandlerControl hc; + OrderedInputHandler handler(&hc); + brpc::StreamOptions opt; + opt.handler = &handler; + TestIdleTimeout(opt, handler, hc); + + OrderedInputHandler2 handler2(&hc); + brpc::StreamOptions opt2; + opt2.handler = &handler2; + TestIdleTimeout(opt2, handler2, hc); +} + class PingPongHandler : public brpc::StreamInputHandler { public: explicit PingPongHandler() @@ -425,10 +614,74 @@ class PingPongHandler : public brpc::StreamInputHandler { int _idle_times; }; -TEST_F(StreamingRpcTest, ping_pong) { - PingPongHandler resh; - brpc::StreamOptions opt; - opt.handler = &resh; +class PingPongHandler2 : public brpc::StreamInputHandler { +public: + explicit PingPongHandler2() + : _expected_next_value(0) + , _failed(false) + , _stopped(false) + , _idle_times(0) + {} + + int on_received_messages(brpc::StreamId id, + butil::IOBuf *const messages[], + size_t size) override { + if (size != 1) { + _failed = true; + return 0; + } + for (size_t i = 0; i < size; ++i) { + CHECK(messages[i]->length() == sizeof(int)); + int network = 0; + messages[i]->cutn(&network, sizeof(int)); + if ((int)ntohl(network) != _expected_next_value) { + _failed = true; + } + int send_back = ntohl(network) + 1; + _expected_next_value = send_back + 1; + butil::IOBuf out; + network = htonl(send_back); + out.append(&network, sizeof(network)); + // don't care the return value + brpc::StreamWrite(id, out); + } + return 0; + } + + void on_idle_timeout(brpc::StreamId /*id*/) override { + ++_idle_times; + } + + void on_closed(brpc::StreamId /*id*/) override { + ASSERT_FALSE(_stopped); + _stopped = true; + } + + void on_failed(brpc::StreamId id, int error_code, + const std::string& /*error_text*/) override { + ASSERT_FALSE(_stopped || _failed); + if (error_code == 0) { + _stopped = true; + } else { + _failed = true; + } + } + + bool split_closed_and_failed() const override { return true; } + + bool failed() const { return _failed; } + bool stopped() const { return _stopped; } + int idle_times() const { return _idle_times; } +private: + int _expected_next_value; + bool _failed; + bool _stopped; + int _idle_times; +}; + +template +void StreamingRpcTest::TestPingPong(brpc::StreamOptions& opt, + Handler& resh) { const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; @@ -464,6 +717,18 @@ TEST_F(StreamingRpcTest, ping_pong) { ASSERT_EQ(0, reqh.idle_times()); } +TEST_F(StreamingRpcTest, ping_pong) { + PingPongHandler resh; + brpc::StreamOptions opt; + opt.handler = &resh; + TestPingPong(opt, resh); + + PingPongHandler2 resh2; + brpc::StreamOptions opt2; + opt2.handler = &resh2; + TestPingPong(opt2, resh2); +} + class SendNAfterAcceptStream : public AfterAcceptStream { public: explicit SendNAfterAcceptStream(int n) @@ -480,7 +745,9 @@ class SendNAfterAcceptStream : public AfterAcceptStream { int _n; }; -TEST_F(StreamingRpcTest, server_send_data_before_run_done) { +template +void StreamingRpcTest::TestServerSendDataBeforeRunDone( + brpc::StreamOptions& request_stream_options, Handler& handler) { const int N = 10000; SendNAfterAcceptStream after_accept(N); brpc::StreamOptions opt; @@ -491,11 +758,8 @@ TEST_F(StreamingRpcTest, server_send_data_before_run_done) { ASSERT_EQ(0, server.Start(9007, NULL)); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); - OrderedInputHandler handler; - brpc::StreamOptions request_stream_options; brpc::StreamId request_stream; brpc::Controller cntl; - request_stream_options.handler = &handler; ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); brpc::ScopedStream stream_guard(request_stream); test::EchoService_Stub stub(&channel); @@ -512,3 +776,15 @@ TEST_F(StreamingRpcTest, server_send_data_before_run_done) { ASSERT_FALSE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); } + +TEST_F(StreamingRpcTest, server_send_data_before_run_done) { + OrderedInputHandler handler; + brpc::StreamOptions request_stream_options; + request_stream_options.handler = &handler; + TestServerSendDataBeforeRunDone(request_stream_options, handler); + + OrderedInputHandler2 handler2; + brpc::StreamOptions request_stream_options2; + request_stream_options2.handler = &handler2; + TestServerSendDataBeforeRunDone(request_stream_options2, handler2); +} From 485ca103931bccda2c1e69dddfea310e1c23aee2 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Mon, 11 Mar 2024 23:16:33 +0800 Subject: [PATCH 2/2] Call on_failed before on_closed --- src/brpc/stream.cpp | 33 +-- src/brpc/stream.h | 13 +- test/brpc_streaming_rpc_unittest.cpp | 299 ++++----------------------- 3 files changed, 58 insertions(+), 287 deletions(-) diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index 36ae9546d5..27f87c6f49 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -39,14 +39,6 @@ DECLARE_int64(socket_max_streams_unconsumed_bytes); const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L; -void StreamInputHandler::on_failed(StreamId id, int error_code, - const std::string& error_text) { - LOG(ERROR) << "`on_failed' should be override by user when " - "`split_closed_and_failed' returns true. " - "id=" << id << ", [" << error_code - << "] " << error_text; -} - Stream::Stream() : _host_socket(NULL) , _fake_socket_weak_ref(NULL) @@ -528,7 +520,7 @@ class MessageBatcher { _total_length += buf->length(); } - size_t total_length() { return _total_length; } + size_t total_length() const { return _total_length; } private: butil::IOBuf** _storage; size_t _cap; @@ -548,19 +540,16 @@ int Stream::Consume(void *meta, bthread::TaskIterator& iter) { s->_host_socket = NULL; } if (s->_options.handler != NULL) { - if (s->_options.handler->split_closed_and_failed()) { - // Split closed and failed according to the error code. - int error_code; - std::string error_text; - { - BAIDU_SCOPED_LOCK(s->_connect_mutex); - error_code = s->_error_code; - error_text = s->_error_text; - } - if (error_code != 0) { - s->_options.handler->on_failed(s->id(), error_code, error_text); - return 0; - } + int error_code; + std::string error_text; + { + BAIDU_SCOPED_LOCK(s->_connect_mutex); + error_code = s->_error_code; + error_text = s->_error_text; + } + if (error_code != 0) { + // The stream is closed abnormally. + s->_options.handler->on_failed(s->id(), error_code, error_text); } s->_options.handler->on_closed(s->id()); } diff --git a/src/brpc/stream.h b/src/brpc/stream.h index 3df3d77ce2..f222ba0940 100644 --- a/src/brpc/stream.h +++ b/src/brpc/stream.h @@ -44,18 +44,11 @@ class StreamInputHandler { butil::IOBuf *const messages[], size_t size) = 0; virtual void on_idle_timeout(StreamId id) = 0; - // 1. If `split_closed_and_failed` returns false(default), - // only `on_closed` will be called. - // 2. If `split_closed_and_failed` returns true, - // 2.1 `on_closed` will be called Whether `StreamClose` - // is called by local side or remote side. - // 2.2 `on_failed` will be called when the stream is - // closed abnormally. virtual void on_closed(StreamId id) = 0; + // `on_failed` will be called before `on_closed` + // when the stream is closed abnormally. virtual void on_failed(StreamId id, int error_code, - const std::string& error_text); - - virtual bool split_closed_and_failed() const { return false; } + const std::string& error_text) {} }; struct StreamOptions { diff --git a/test/brpc_streaming_rpc_unittest.cpp b/test/brpc_streaming_rpc_unittest.cpp index 5ce0479035..df6a37d888 100644 --- a/test/brpc_streaming_rpc_unittest.cpp +++ b/test/brpc_streaming_rpc_unittest.cpp @@ -68,43 +68,11 @@ class MyServiceWithStream : public test::EchoService { AfterAcceptStream* _after_accept_stream; }; -struct HandlerControl { - HandlerControl() - : block(false) - {} - bool block; -}; - class StreamingRpcTest : public testing::Test { protected: void SetUp() { request.set_message("hello world"); } void TearDown() {} - template - void TestReceivedInOrder(brpc::StreamOptions& opt, - Handler& handler); - - template - void TestBlock(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc); - - template - void TestAutoClose(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc); - - template - void TestIdleTimeout(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc); - - template - void TestPingPong(brpc::StreamOptions& opt, - Handler& handler); - - template - void TestServerSendDataBeforeRunDone(brpc::StreamOptions& opt, - Handler& handler); - - test::EchoRequest request; test::EchoResponse response; }; @@ -129,56 +97,16 @@ TEST_F(StreamingRpcTest, sanity) { server.Join(); } -class OrderedInputHandler : public brpc::StreamInputHandler { -public: - explicit OrderedInputHandler(HandlerControl *cntl = NULL) - : _expected_next_value(0) - , _failed(false) - , _stopped(false) - , _idle_times(0) - , _cntl(cntl) +struct HandlerControl { + HandlerControl() + : block(false) {} - - int on_received_messages(brpc::StreamId /*id*/, - butil::IOBuf *const messages[], - size_t size) override { - if (_cntl && _cntl->block) { - while (_cntl->block) { - usleep(100); - } - } - for (size_t i = 0; i < size; ++i) { - CHECK(messages[i]->length() == sizeof(int)); - int network = 0; - messages[i]->cutn(&network, sizeof(int)); - EXPECT_EQ((int)ntohl(network), _expected_next_value++); - } - return 0; - } - - void on_idle_timeout(brpc::StreamId /*id*/) override { - ++_idle_times; - } - - void on_closed(brpc::StreamId /*id*/) override { - ASSERT_FALSE(_stopped); - _stopped = true; - } - - bool failed() const { return _failed; } - bool stopped() const { return _stopped; } - int idle_times() const { return _idle_times; } -private: - int _expected_next_value; - bool _failed; - bool _stopped; - int _idle_times; - HandlerControl* _cntl; + bool block; }; -class OrderedInputHandler2 : public brpc::StreamInputHandler { +class OrderedInputHandler : public brpc::StreamInputHandler { public: - explicit OrderedInputHandler2(HandlerControl *cntl = NULL) + explicit OrderedInputHandler(HandlerControl *cntl = NULL) : _expected_next_value(0) , _failed(false) , _stopped(false) @@ -214,16 +142,11 @@ class OrderedInputHandler2 : public brpc::StreamInputHandler { void on_failed(brpc::StreamId id, int error_code, const std::string& /*error_text*/) override { - ASSERT_FALSE(_stopped || _failed); - if (error_code == 0) { - _stopped = true; - } else { - _failed = true; - } + ASSERT_FALSE(_failed); + ASSERT_NE(0, error_code); + _failed = true; } - bool split_closed_and_failed() const override { return true; } - bool failed() const { return _failed; } bool stopped() const { return _stopped; } int idle_times() const { return _idle_times; } @@ -235,9 +158,10 @@ class OrderedInputHandler2 : public brpc::StreamInputHandler { HandlerControl* _cntl; }; -template -void StreamingRpcTest::TestReceivedInOrder(brpc::StreamOptions& opt, - Handler& handler) { +TEST_F(StreamingRpcTest, received_in_order) { + OrderedInputHandler handler; + brpc::StreamOptions opt; + opt.handler = &handler; opt.messages_in_batch = 100; brpc::Server server; MyServiceWithStream service(opt); @@ -272,29 +196,19 @@ void StreamingRpcTest::TestReceivedInOrder(brpc::StreamOptions& opt, ASSERT_EQ(N, handler._expected_next_value); } -TEST_F(StreamingRpcTest, received_in_order) { - OrderedInputHandler handler; - brpc::StreamOptions opt; - opt.handler = &handler; - TestReceivedInOrder(opt, handler); - - OrderedInputHandler2 handler2; - brpc::StreamOptions opt2; - opt2.handler = &handler2; - TestReceivedInOrder(opt2, handler2); -} - void on_writable(brpc::StreamId, void* arg, int error_code) { std::pair* p = (std::pair*)arg; p->first = true; p->second = error_code; - // LOG(INFO) << "error_code=" << error_code; + LOG(INFO) << "error_code=" << error_code; } -template -void StreamingRpcTest::TestBlock(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc) { +TEST_F(StreamingRpcTest, block) { + HandlerControl hc; hc.block = true; + OrderedInputHandler handler(&hc); + brpc::StreamOptions opt; + opt.handler = &handler; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; @@ -388,34 +302,12 @@ void StreamingRpcTest::TestBlock(brpc::StreamOptions& opt, ASSERT_EQ(N + N + N, handler._expected_next_value); } -TEST_F(StreamingRpcTest, block) { +TEST_F(StreamingRpcTest, auto_close_if_host_socket_closed) { HandlerControl hc; + hc.block = true; OrderedInputHandler handler(&hc); brpc::StreamOptions opt; opt.handler = &handler; - TestBlock(opt, handler, hc); - - OrderedInputHandler2 handler2(&hc); - brpc::StreamOptions opt2; - opt2.handler = &handler2; - TestBlock(opt2, handler2, hc); -} - -void HandleFailed(OrderedInputHandler& handler) { - ASSERT_FALSE(handler.failed()); - ASSERT_TRUE(handler.stopped()); -} - -// brpc::StreamInputHandler2 is failed, not closed. -void HandleFailed(OrderedInputHandler2& handler) { - ASSERT_TRUE(handler.failed()); - ASSERT_FALSE(handler.stopped()); -} - -template -void StreamingRpcTest::TestAutoClose(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc) { - hc.block = true; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; @@ -439,36 +331,23 @@ void StreamingRpcTest::TestAutoClose(brpc::StreamOptions& opt, ASSERT_EQ(0, brpc::Socket::Address(request_stream, &ptr)); brpc::Stream* s = (brpc::Stream*)ptr->conn(); ASSERT_TRUE(s->_host_socket != NULL); - s->_host_socket->SetFailed(10000, ""); + s->_host_socket->SetFailed(); } usleep(100); butil::IOBuf out; out.append("test"); ASSERT_EQ(EINVAL, brpc::StreamWrite(request_stream, out)); - while (!handler.stopped() && !handler.failed()) { + while (!handler.stopped()) { usleep(100); } - HandleFailed(handler); + ASSERT_TRUE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); ASSERT_EQ(0, handler._expected_next_value); } -TEST_F(StreamingRpcTest, auto_close_if_host_socket_closed) { - HandlerControl hc; - OrderedInputHandler handler(&hc); - brpc::StreamOptions opt; - opt.handler = &handler; - TestAutoClose(opt, handler, hc); - - OrderedInputHandler2 handler2(&hc); - brpc::StreamOptions opt2; - opt2.handler = &handler2; - TestAutoClose(opt2, handler2, hc); -} - TEST_F(StreamingRpcTest, failed_when_rst) { - OrderedInputHandler2 handler; + OrderedInputHandler handler; brpc::StreamOptions opt; opt.handler = &handler; opt.messages_in_batch = 100; @@ -510,15 +389,17 @@ TEST_F(StreamingRpcTest, failed_when_rst) { while (!handler.stopped() && !handler.failed()) { usleep(100); } - HandleFailed(handler); + ASSERT_TRUE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); ASSERT_EQ(N, handler._expected_next_value); } -template -void StreamingRpcTest::TestIdleTimeout(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc) { +TEST_F(StreamingRpcTest, idle_timeout) { + HandlerControl hc; hc.block = true; + OrderedInputHandler handler(&hc); + brpc::StreamOptions opt; + opt.handler = &handler; opt.idle_timeout_ms = 2; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; @@ -548,19 +429,6 @@ void StreamingRpcTest::TestIdleTimeout(brpc::StreamOptions& opt, ASSERT_EQ(0, handler._expected_next_value); } -TEST_F(StreamingRpcTest, idle_timeout) { - HandlerControl hc; - OrderedInputHandler handler(&hc); - brpc::StreamOptions opt; - opt.handler = &handler; - TestIdleTimeout(opt, handler, hc); - - OrderedInputHandler2 handler2(&hc); - brpc::StreamOptions opt2; - opt2.handler = &handler2; - TestIdleTimeout(opt2, handler2, hc); -} - class PingPongHandler : public brpc::StreamInputHandler { public: explicit PingPongHandler() @@ -570,59 +438,6 @@ class PingPongHandler : public brpc::StreamInputHandler { , _idle_times(0) { } - int on_received_messages(brpc::StreamId id, - butil::IOBuf *const messages[], - size_t size) { - if (size != 1) { - _failed = true; - return 0; - } - for (size_t i = 0; i < size; ++i) { - CHECK(messages[i]->length() == sizeof(int)); - int network = 0; - messages[i]->cutn(&network, sizeof(int)); - if ((int)ntohl(network) != _expected_next_value) { - _failed = true; - } - int send_back = ntohl(network) + 1; - _expected_next_value = send_back + 1; - butil::IOBuf out; - network = htonl(send_back); - out.append(&network, sizeof(network)); - // don't care the return value - brpc::StreamWrite(id, out); - } - return 0; - } - - void on_idle_timeout(brpc::StreamId /*id*/) { - ++_idle_times; - } - - void on_closed(brpc::StreamId /*id*/) { - ASSERT_FALSE(_stopped); - _stopped = true; - } - - bool failed() const { return _failed; } - bool stopped() const { return _stopped; } - int idle_times() const { return _idle_times; } -private: - int _expected_next_value; - bool _failed; - bool _stopped; - int _idle_times; -}; - -class PingPongHandler2 : public brpc::StreamInputHandler { -public: - explicit PingPongHandler2() - : _expected_next_value(0) - , _failed(false) - , _stopped(false) - , _idle_times(0) - {} - int on_received_messages(brpc::StreamId id, butil::IOBuf *const messages[], size_t size) override { @@ -657,18 +472,14 @@ class PingPongHandler2 : public brpc::StreamInputHandler { _stopped = true; } + void on_failed(brpc::StreamId id, int error_code, - const std::string& /*error_text*/) override { - ASSERT_FALSE(_stopped || _failed); - if (error_code == 0) { - _stopped = true; - } else { - _failed = true; - } + const std::string& /*error_text*/) override { + ASSERT_FALSE(_failed); + ASSERT_NE(0, error_code); + _failed = true; } - bool split_closed_and_failed() const override { return true; } - bool failed() const { return _failed; } bool stopped() const { return _stopped; } int idle_times() const { return _idle_times; } @@ -679,9 +490,10 @@ class PingPongHandler2 : public brpc::StreamInputHandler { int _idle_times; }; -template -void StreamingRpcTest::TestPingPong(brpc::StreamOptions& opt, - Handler& resh) { +TEST_F(StreamingRpcTest, ping_pong) { + PingPongHandler resh; + brpc::StreamOptions opt; + opt.handler = &resh; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; @@ -717,18 +529,6 @@ void StreamingRpcTest::TestPingPong(brpc::StreamOptions& opt, ASSERT_EQ(0, reqh.idle_times()); } -TEST_F(StreamingRpcTest, ping_pong) { - PingPongHandler resh; - brpc::StreamOptions opt; - opt.handler = &resh; - TestPingPong(opt, resh); - - PingPongHandler2 resh2; - brpc::StreamOptions opt2; - opt2.handler = &resh2; - TestPingPong(opt2, resh2); -} - class SendNAfterAcceptStream : public AfterAcceptStream { public: explicit SendNAfterAcceptStream(int n) @@ -745,9 +545,7 @@ class SendNAfterAcceptStream : public AfterAcceptStream { int _n; }; -template -void StreamingRpcTest::TestServerSendDataBeforeRunDone( - brpc::StreamOptions& request_stream_options, Handler& handler) { +TEST_F(StreamingRpcTest, server_send_data_before_run_done) { const int N = 10000; SendNAfterAcceptStream after_accept(N); brpc::StreamOptions opt; @@ -758,6 +556,9 @@ void StreamingRpcTest::TestServerSendDataBeforeRunDone( ASSERT_EQ(0, server.Start(9007, NULL)); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); + OrderedInputHandler handler; + brpc::StreamOptions request_stream_options; + request_stream_options.handler = &handler; brpc::StreamId request_stream; brpc::Controller cntl; ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); @@ -776,15 +577,3 @@ void StreamingRpcTest::TestServerSendDataBeforeRunDone( ASSERT_FALSE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); } - -TEST_F(StreamingRpcTest, server_send_data_before_run_done) { - OrderedInputHandler handler; - brpc::StreamOptions request_stream_options; - request_stream_options.handler = &handler; - TestServerSendDataBeforeRunDone(request_stream_options, handler); - - OrderedInputHandler2 handler2; - brpc::StreamOptions request_stream_options2; - request_stream_options2.handler = &handler2; - TestServerSendDataBeforeRunDone(request_stream_options2, handler2); -}