From 485ca103931bccda2c1e69dddfea310e1c23aee2 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Mon, 11 Mar 2024 23:16:33 +0800 Subject: [PATCH] Call on_failed before on_closed --- src/brpc/stream.cpp | 33 +-- src/brpc/stream.h | 13 +- test/brpc_streaming_rpc_unittest.cpp | 299 ++++----------------------- 3 files changed, 58 insertions(+), 287 deletions(-) diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index 36ae9546d5..27f87c6f49 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -39,14 +39,6 @@ DECLARE_int64(socket_max_streams_unconsumed_bytes); const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L; -void StreamInputHandler::on_failed(StreamId id, int error_code, - const std::string& error_text) { - LOG(ERROR) << "`on_failed' should be override by user when " - "`split_closed_and_failed' returns true. " - "id=" << id << ", [" << error_code - << "] " << error_text; -} - Stream::Stream() : _host_socket(NULL) , _fake_socket_weak_ref(NULL) @@ -528,7 +520,7 @@ class MessageBatcher { _total_length += buf->length(); } - size_t total_length() { return _total_length; } + size_t total_length() const { return _total_length; } private: butil::IOBuf** _storage; size_t _cap; @@ -548,19 +540,16 @@ int Stream::Consume(void *meta, bthread::TaskIterator& iter) { s->_host_socket = NULL; } if (s->_options.handler != NULL) { - if (s->_options.handler->split_closed_and_failed()) { - // Split closed and failed according to the error code. - int error_code; - std::string error_text; - { - BAIDU_SCOPED_LOCK(s->_connect_mutex); - error_code = s->_error_code; - error_text = s->_error_text; - } - if (error_code != 0) { - s->_options.handler->on_failed(s->id(), error_code, error_text); - return 0; - } + int error_code; + std::string error_text; + { + BAIDU_SCOPED_LOCK(s->_connect_mutex); + error_code = s->_error_code; + error_text = s->_error_text; + } + if (error_code != 0) { + // The stream is closed abnormally. + s->_options.handler->on_failed(s->id(), error_code, error_text); } s->_options.handler->on_closed(s->id()); } diff --git a/src/brpc/stream.h b/src/brpc/stream.h index 3df3d77ce2..f222ba0940 100644 --- a/src/brpc/stream.h +++ b/src/brpc/stream.h @@ -44,18 +44,11 @@ class StreamInputHandler { butil::IOBuf *const messages[], size_t size) = 0; virtual void on_idle_timeout(StreamId id) = 0; - // 1. If `split_closed_and_failed` returns false(default), - // only `on_closed` will be called. - // 2. If `split_closed_and_failed` returns true, - // 2.1 `on_closed` will be called Whether `StreamClose` - // is called by local side or remote side. - // 2.2 `on_failed` will be called when the stream is - // closed abnormally. virtual void on_closed(StreamId id) = 0; + // `on_failed` will be called before `on_closed` + // when the stream is closed abnormally. virtual void on_failed(StreamId id, int error_code, - const std::string& error_text); - - virtual bool split_closed_and_failed() const { return false; } + const std::string& error_text) {} }; struct StreamOptions { diff --git a/test/brpc_streaming_rpc_unittest.cpp b/test/brpc_streaming_rpc_unittest.cpp index 5ce0479035..df6a37d888 100644 --- a/test/brpc_streaming_rpc_unittest.cpp +++ b/test/brpc_streaming_rpc_unittest.cpp @@ -68,43 +68,11 @@ class MyServiceWithStream : public test::EchoService { AfterAcceptStream* _after_accept_stream; }; -struct HandlerControl { - HandlerControl() - : block(false) - {} - bool block; -}; - class StreamingRpcTest : public testing::Test { protected: void SetUp() { request.set_message("hello world"); } void TearDown() {} - template - void TestReceivedInOrder(brpc::StreamOptions& opt, - Handler& handler); - - template - void TestBlock(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc); - - template - void TestAutoClose(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc); - - template - void TestIdleTimeout(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc); - - template - void TestPingPong(brpc::StreamOptions& opt, - Handler& handler); - - template - void TestServerSendDataBeforeRunDone(brpc::StreamOptions& opt, - Handler& handler); - - test::EchoRequest request; test::EchoResponse response; }; @@ -129,56 +97,16 @@ TEST_F(StreamingRpcTest, sanity) { server.Join(); } -class OrderedInputHandler : public brpc::StreamInputHandler { -public: - explicit OrderedInputHandler(HandlerControl *cntl = NULL) - : _expected_next_value(0) - , _failed(false) - , _stopped(false) - , _idle_times(0) - , _cntl(cntl) +struct HandlerControl { + HandlerControl() + : block(false) {} - - int on_received_messages(brpc::StreamId /*id*/, - butil::IOBuf *const messages[], - size_t size) override { - if (_cntl && _cntl->block) { - while (_cntl->block) { - usleep(100); - } - } - for (size_t i = 0; i < size; ++i) { - CHECK(messages[i]->length() == sizeof(int)); - int network = 0; - messages[i]->cutn(&network, sizeof(int)); - EXPECT_EQ((int)ntohl(network), _expected_next_value++); - } - return 0; - } - - void on_idle_timeout(brpc::StreamId /*id*/) override { - ++_idle_times; - } - - void on_closed(brpc::StreamId /*id*/) override { - ASSERT_FALSE(_stopped); - _stopped = true; - } - - bool failed() const { return _failed; } - bool stopped() const { return _stopped; } - int idle_times() const { return _idle_times; } -private: - int _expected_next_value; - bool _failed; - bool _stopped; - int _idle_times; - HandlerControl* _cntl; + bool block; }; -class OrderedInputHandler2 : public brpc::StreamInputHandler { +class OrderedInputHandler : public brpc::StreamInputHandler { public: - explicit OrderedInputHandler2(HandlerControl *cntl = NULL) + explicit OrderedInputHandler(HandlerControl *cntl = NULL) : _expected_next_value(0) , _failed(false) , _stopped(false) @@ -214,16 +142,11 @@ class OrderedInputHandler2 : public brpc::StreamInputHandler { void on_failed(brpc::StreamId id, int error_code, const std::string& /*error_text*/) override { - ASSERT_FALSE(_stopped || _failed); - if (error_code == 0) { - _stopped = true; - } else { - _failed = true; - } + ASSERT_FALSE(_failed); + ASSERT_NE(0, error_code); + _failed = true; } - bool split_closed_and_failed() const override { return true; } - bool failed() const { return _failed; } bool stopped() const { return _stopped; } int idle_times() const { return _idle_times; } @@ -235,9 +158,10 @@ class OrderedInputHandler2 : public brpc::StreamInputHandler { HandlerControl* _cntl; }; -template -void StreamingRpcTest::TestReceivedInOrder(brpc::StreamOptions& opt, - Handler& handler) { +TEST_F(StreamingRpcTest, received_in_order) { + OrderedInputHandler handler; + brpc::StreamOptions opt; + opt.handler = &handler; opt.messages_in_batch = 100; brpc::Server server; MyServiceWithStream service(opt); @@ -272,29 +196,19 @@ void StreamingRpcTest::TestReceivedInOrder(brpc::StreamOptions& opt, ASSERT_EQ(N, handler._expected_next_value); } -TEST_F(StreamingRpcTest, received_in_order) { - OrderedInputHandler handler; - brpc::StreamOptions opt; - opt.handler = &handler; - TestReceivedInOrder(opt, handler); - - OrderedInputHandler2 handler2; - brpc::StreamOptions opt2; - opt2.handler = &handler2; - TestReceivedInOrder(opt2, handler2); -} - void on_writable(brpc::StreamId, void* arg, int error_code) { std::pair* p = (std::pair*)arg; p->first = true; p->second = error_code; - // LOG(INFO) << "error_code=" << error_code; + LOG(INFO) << "error_code=" << error_code; } -template -void StreamingRpcTest::TestBlock(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc) { +TEST_F(StreamingRpcTest, block) { + HandlerControl hc; hc.block = true; + OrderedInputHandler handler(&hc); + brpc::StreamOptions opt; + opt.handler = &handler; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; @@ -388,34 +302,12 @@ void StreamingRpcTest::TestBlock(brpc::StreamOptions& opt, ASSERT_EQ(N + N + N, handler._expected_next_value); } -TEST_F(StreamingRpcTest, block) { +TEST_F(StreamingRpcTest, auto_close_if_host_socket_closed) { HandlerControl hc; + hc.block = true; OrderedInputHandler handler(&hc); brpc::StreamOptions opt; opt.handler = &handler; - TestBlock(opt, handler, hc); - - OrderedInputHandler2 handler2(&hc); - brpc::StreamOptions opt2; - opt2.handler = &handler2; - TestBlock(opt2, handler2, hc); -} - -void HandleFailed(OrderedInputHandler& handler) { - ASSERT_FALSE(handler.failed()); - ASSERT_TRUE(handler.stopped()); -} - -// brpc::StreamInputHandler2 is failed, not closed. -void HandleFailed(OrderedInputHandler2& handler) { - ASSERT_TRUE(handler.failed()); - ASSERT_FALSE(handler.stopped()); -} - -template -void StreamingRpcTest::TestAutoClose(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc) { - hc.block = true; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; @@ -439,36 +331,23 @@ void StreamingRpcTest::TestAutoClose(brpc::StreamOptions& opt, ASSERT_EQ(0, brpc::Socket::Address(request_stream, &ptr)); brpc::Stream* s = (brpc::Stream*)ptr->conn(); ASSERT_TRUE(s->_host_socket != NULL); - s->_host_socket->SetFailed(10000, ""); + s->_host_socket->SetFailed(); } usleep(100); butil::IOBuf out; out.append("test"); ASSERT_EQ(EINVAL, brpc::StreamWrite(request_stream, out)); - while (!handler.stopped() && !handler.failed()) { + while (!handler.stopped()) { usleep(100); } - HandleFailed(handler); + ASSERT_TRUE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); ASSERT_EQ(0, handler._expected_next_value); } -TEST_F(StreamingRpcTest, auto_close_if_host_socket_closed) { - HandlerControl hc; - OrderedInputHandler handler(&hc); - brpc::StreamOptions opt; - opt.handler = &handler; - TestAutoClose(opt, handler, hc); - - OrderedInputHandler2 handler2(&hc); - brpc::StreamOptions opt2; - opt2.handler = &handler2; - TestAutoClose(opt2, handler2, hc); -} - TEST_F(StreamingRpcTest, failed_when_rst) { - OrderedInputHandler2 handler; + OrderedInputHandler handler; brpc::StreamOptions opt; opt.handler = &handler; opt.messages_in_batch = 100; @@ -510,15 +389,17 @@ TEST_F(StreamingRpcTest, failed_when_rst) { while (!handler.stopped() && !handler.failed()) { usleep(100); } - HandleFailed(handler); + ASSERT_TRUE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); ASSERT_EQ(N, handler._expected_next_value); } -template -void StreamingRpcTest::TestIdleTimeout(brpc::StreamOptions& opt, - Handler& handler, HandlerControl& hc) { +TEST_F(StreamingRpcTest, idle_timeout) { + HandlerControl hc; hc.block = true; + OrderedInputHandler handler(&hc); + brpc::StreamOptions opt; + opt.handler = &handler; opt.idle_timeout_ms = 2; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; @@ -548,19 +429,6 @@ void StreamingRpcTest::TestIdleTimeout(brpc::StreamOptions& opt, ASSERT_EQ(0, handler._expected_next_value); } -TEST_F(StreamingRpcTest, idle_timeout) { - HandlerControl hc; - OrderedInputHandler handler(&hc); - brpc::StreamOptions opt; - opt.handler = &handler; - TestIdleTimeout(opt, handler, hc); - - OrderedInputHandler2 handler2(&hc); - brpc::StreamOptions opt2; - opt2.handler = &handler2; - TestIdleTimeout(opt2, handler2, hc); -} - class PingPongHandler : public brpc::StreamInputHandler { public: explicit PingPongHandler() @@ -570,59 +438,6 @@ class PingPongHandler : public brpc::StreamInputHandler { , _idle_times(0) { } - int on_received_messages(brpc::StreamId id, - butil::IOBuf *const messages[], - size_t size) { - if (size != 1) { - _failed = true; - return 0; - } - for (size_t i = 0; i < size; ++i) { - CHECK(messages[i]->length() == sizeof(int)); - int network = 0; - messages[i]->cutn(&network, sizeof(int)); - if ((int)ntohl(network) != _expected_next_value) { - _failed = true; - } - int send_back = ntohl(network) + 1; - _expected_next_value = send_back + 1; - butil::IOBuf out; - network = htonl(send_back); - out.append(&network, sizeof(network)); - // don't care the return value - brpc::StreamWrite(id, out); - } - return 0; - } - - void on_idle_timeout(brpc::StreamId /*id*/) { - ++_idle_times; - } - - void on_closed(brpc::StreamId /*id*/) { - ASSERT_FALSE(_stopped); - _stopped = true; - } - - bool failed() const { return _failed; } - bool stopped() const { return _stopped; } - int idle_times() const { return _idle_times; } -private: - int _expected_next_value; - bool _failed; - bool _stopped; - int _idle_times; -}; - -class PingPongHandler2 : public brpc::StreamInputHandler { -public: - explicit PingPongHandler2() - : _expected_next_value(0) - , _failed(false) - , _stopped(false) - , _idle_times(0) - {} - int on_received_messages(brpc::StreamId id, butil::IOBuf *const messages[], size_t size) override { @@ -657,18 +472,14 @@ class PingPongHandler2 : public brpc::StreamInputHandler { _stopped = true; } + void on_failed(brpc::StreamId id, int error_code, - const std::string& /*error_text*/) override { - ASSERT_FALSE(_stopped || _failed); - if (error_code == 0) { - _stopped = true; - } else { - _failed = true; - } + const std::string& /*error_text*/) override { + ASSERT_FALSE(_failed); + ASSERT_NE(0, error_code); + _failed = true; } - bool split_closed_and_failed() const override { return true; } - bool failed() const { return _failed; } bool stopped() const { return _stopped; } int idle_times() const { return _idle_times; } @@ -679,9 +490,10 @@ class PingPongHandler2 : public brpc::StreamInputHandler { int _idle_times; }; -template -void StreamingRpcTest::TestPingPong(brpc::StreamOptions& opt, - Handler& resh) { +TEST_F(StreamingRpcTest, ping_pong) { + PingPongHandler resh; + brpc::StreamOptions opt; + opt.handler = &resh; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; @@ -717,18 +529,6 @@ void StreamingRpcTest::TestPingPong(brpc::StreamOptions& opt, ASSERT_EQ(0, reqh.idle_times()); } -TEST_F(StreamingRpcTest, ping_pong) { - PingPongHandler resh; - brpc::StreamOptions opt; - opt.handler = &resh; - TestPingPong(opt, resh); - - PingPongHandler2 resh2; - brpc::StreamOptions opt2; - opt2.handler = &resh2; - TestPingPong(opt2, resh2); -} - class SendNAfterAcceptStream : public AfterAcceptStream { public: explicit SendNAfterAcceptStream(int n) @@ -745,9 +545,7 @@ class SendNAfterAcceptStream : public AfterAcceptStream { int _n; }; -template -void StreamingRpcTest::TestServerSendDataBeforeRunDone( - brpc::StreamOptions& request_stream_options, Handler& handler) { +TEST_F(StreamingRpcTest, server_send_data_before_run_done) { const int N = 10000; SendNAfterAcceptStream after_accept(N); brpc::StreamOptions opt; @@ -758,6 +556,9 @@ void StreamingRpcTest::TestServerSendDataBeforeRunDone( ASSERT_EQ(0, server.Start(9007, NULL)); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); + OrderedInputHandler handler; + brpc::StreamOptions request_stream_options; + request_stream_options.handler = &handler; brpc::StreamId request_stream; brpc::Controller cntl; ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); @@ -776,15 +577,3 @@ void StreamingRpcTest::TestServerSendDataBeforeRunDone( ASSERT_FALSE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); } - -TEST_F(StreamingRpcTest, server_send_data_before_run_done) { - OrderedInputHandler handler; - brpc::StreamOptions request_stream_options; - request_stream_options.handler = &handler; - TestServerSendDataBeforeRunDone(request_stream_options, handler); - - OrderedInputHandler2 handler2; - brpc::StreamOptions request_stream_options2; - request_stream_options2.handler = &handler2; - TestServerSendDataBeforeRunDone(request_stream_options2, handler2); -}