diff --git a/.gitignore b/.gitignore index bdd210096b..9a60889afa 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,6 @@ CTestTestfile.cmake /test/curl.out /test/out.txt /test/recordio_ref.io + +# Ignore protoc-gen-mcpack files +/protoc-gen-mcpack*/ \ No newline at end of file diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index bfe278ffb9..2336601523 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1387,7 +1387,8 @@ void Controller::HandleStreamConnection(Socket *host_socket) { } } if (FailedInline()) { - Stream::SetFailed(_request_stream); + Stream::SetFailed(_request_stream, _error_code, + "%s", _error_text.c_str()); if (_remote_stream_settings != NULL) { policy::SendStreamRst(host_socket, _remote_stream_settings->stream_id()); diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index b19fbb379b..6fa17d6ca2 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -254,11 +254,13 @@ void SendRpcResponse(int64_t correlation_id, accessor.remote_stream_settings()->stream_id(), accessor.response_stream()) != 0) { const int errcode = errno; - PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; - cntl->SetFailed(errcode, "Fail to write into %s", - sock->description().c_str()); + std::string error_text = butil::string_printf(64, "Fail to write into %s", + sock->description().c_str()); + PLOG_IF(WARNING, errcode != EPIPE) << error_text; + cntl->SetFailed(errcode, "%s", error_text.c_str()); if(stream_ptr) { - ((Stream*)stream_ptr->conn())->Close(); + ((Stream*)stream_ptr->conn())->Close(errcode, "%s", + error_text.c_str()); } return; } diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 1f9b2a2e9b..edc3dee0b0 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -986,7 +986,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { &_id_wait_list, error_code, error_text, &_id_wait_list_mutex)); - ResetAllStreams(); + ResetAllStreams(error_code, error_text); // _app_connect shouldn't be set to NULL in SetFailed otherwise // HC is always not supported. // FIXME: Design a better interface for AppConnect @@ -2541,7 +2541,7 @@ int Socket::RemoveStream(StreamId stream_id) { return 0; } -void Socket::ResetAllStreams() { +void Socket::ResetAllStreams(int error_code, const std::string& error_text) { DCHECK(Failed()); std::set saved_stream_set; _stream_mutex.lock(); @@ -2552,9 +2552,9 @@ void Socket::ResetAllStreams() { saved_stream_set.swap(*_stream_set); } _stream_mutex.unlock(); - for (std::set::const_iterator - it = saved_stream_set.begin(); it != saved_stream_set.end(); ++it) { - Stream::SetFailed(*it); + for (auto it = saved_stream_set.begin(); + it != saved_stream_set.end(); ++it) { + Stream::SetFailed(*it, error_code, "%s", error_text.c_str()); } } diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 9d85aafaff..faf6baac64 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -706,7 +706,7 @@ friend void DereferenceSocket(Socket*); // broken socket. int AddStream(StreamId stream_id); int RemoveStream(StreamId stream_id); - void ResetAllStreams(); + void ResetAllStreams(int error_code, const std::string& error_text); bool ValidFileDescriptor(int fd); diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index b311c4b780..27f87c6f49 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -44,6 +44,7 @@ Stream::Stream() , _fake_socket_weak_ref(NULL) , _connected(false) , _closed(false) + , _error_code(0) , _produced(0) , _remote_consumed(0) , _cur_buf_size(0) @@ -74,6 +75,7 @@ int Stream::Create(const StreamOptions &options, s->_connected = false; s->_options = options; s->_closed = false; + s->_error_code = 0; s->_cur_buf_size = options.max_buf_size > 0 ? options.max_buf_size : 0; if (options.max_buf_size > 0 && options.min_buf_size > options.max_buf_size) { // set 0 if min_buf_size is invalid. @@ -131,7 +133,7 @@ void Stream::BeforeRecycle(Socket *) { if (_host_socket) { _host_socket->RemoveStream(id()); } - + // The instance is to be deleted in the consumer thread bthread::execution_queue_stop(_consumer_queue); } @@ -466,21 +468,22 @@ int Stream::OnReceived(const StreamFrameMeta& fm, butil::IOBuf *buf, Socket* soc if (!fm.has_continuation()) { butil::IOBuf *tmp = _pending_buf; _pending_buf = NULL; - if (bthread::execution_queue_execute(_consumer_queue, tmp) != 0) { + int rc = bthread::execution_queue_execute(_consumer_queue, tmp); + if (rc != 0) { CHECK(false) << "Fail to push into channel"; delete tmp; - Close(); + Close(rc, "Fail to push into channel"); } } break; case FRAME_TYPE_RST: RPC_VLOG << "stream=" << id() << " received rst frame"; - Close(); + Close(ECONNRESET, "Received RST frame"); break; case FRAME_TYPE_CLOSE: RPC_VLOG << "stream=" << id() << " received close frame"; // TODO:: See the comments in Consume - Close(); + Close(0, "Received CLOSE frame"); break; case FRAME_TYPE_UNKNOWN: RPC_VLOG << "Received unknown frame"; @@ -517,7 +520,7 @@ class MessageBatcher { _total_length += buf->length(); } - size_t total_length() { return _total_length; } + size_t total_length() const { return _total_length; } private: butil::IOBuf** _storage; size_t _cap; @@ -530,15 +533,26 @@ int Stream::Consume(void *meta, bthread::TaskIterator& iter) { Stream* s = (Stream*)meta; s->StopIdleTimer(); if (iter.is_queue_stopped()) { - // indicating the queue was closed + scoped_ptr recycled_stream(s); + // Indicating the queue was closed. if (s->_host_socket) { DereferenceSocket(s->_host_socket); s->_host_socket = NULL; } if (s->_options.handler != NULL) { + int error_code; + std::string error_text; + { + BAIDU_SCOPED_LOCK(s->_connect_mutex); + error_code = s->_error_code; + error_text = s->_error_text; + } + if (error_code != 0) { + // The stream is closed abnormally. + s->_options.handler->on_failed(s->id(), error_code, error_text); + } s->_options.handler->on_closed(s->id()); } - delete s; return 0; } DEFINE_SMALL_ARRAY(butil::IOBuf*, buf_list, s->_options.messages_in_batch, 256); @@ -630,7 +644,7 @@ void Stream::StopIdleTimer() { } } -void Stream::Close() { +void Stream::Close(int error_code, const char* reason_fmt, ...) { _fake_socket_weak_ref->SetFailed(); bthread_mutex_lock(&_connect_mutex); if (_closed) { @@ -638,6 +652,13 @@ void Stream::Close() { return; } _closed = true; + _error_code = error_code; + + va_list ap; + va_start(ap, reason_fmt); + butil::string_vappendf(&_error_text, reason_fmt, ap); + va_end(ap); + if (_connected) { bthread_mutex_unlock(&_connect_mutex); return; @@ -647,14 +668,17 @@ void Stream::Close() { return TriggerOnConnectIfNeed(); } -int Stream::SetFailed(StreamId id) { +int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) { SocketUniquePtr ptr; if (Socket::AddressFailedAsWell(id, &ptr) == -1) { // Don't care recycled stream return 0; } Stream* s = (Stream*)ptr->conn(); - s->Close(); + va_list ap; + va_start(ap, reason_fmt); + s->Close(error_code, reason_fmt, ap); + va_end(ap); return 0; } @@ -665,13 +689,13 @@ void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) { ParseResult pr = policy::ParseRpcMessage(response_buffer, NULL, true, NULL); if (!pr.is_ok()) { CHECK(false); - Close(); + Close(EPROTO, "Fail to parse rpc response message"); return; } InputMessageBase* msg = pr.message(); if (msg == NULL) { CHECK(false); - Close(); + Close(ENOMEM, "Message is NULL"); return; } _host_socket->PostponeEOF(); @@ -730,7 +754,7 @@ int StreamWait(StreamId stream_id, const timespec* due_time) { } int StreamClose(StreamId stream_id) { - return Stream::SetFailed(stream_id); + return Stream::SetFailed(stream_id, 0, "Local close"); } int StreamCreate(StreamId *request_stream, Controller &cntl, diff --git a/src/brpc/stream.h b/src/brpc/stream.h index 90965f371f..f222ba0940 100644 --- a/src/brpc/stream.h +++ b/src/brpc/stream.h @@ -44,7 +44,11 @@ class StreamInputHandler { butil::IOBuf *const messages[], size_t size) = 0; virtual void on_idle_timeout(StreamId id) = 0; - virtual void on_closed(StreamId id) = 0; + virtual void on_closed(StreamId id) = 0; + // `on_failed` will be called before `on_closed` + // when the stream is closed abnormally. + virtual void on_failed(StreamId id, int error_code, + const std::string& error_text) {} }; struct StreamOptions { @@ -82,8 +86,7 @@ struct StreamOptions { StreamInputHandler* handler; }; -struct StreamWriteOptions -{ +struct StreamWriteOptions { StreamWriteOptions() : write_in_background(false) {} // Write message to socket in background thread. diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h index f24b75a352..db92dd63d6 100644 --- a/src/brpc/stream_impl.h +++ b/src/brpc/stream_impl.h @@ -61,13 +61,16 @@ class BAIDU_CACHELINE_ALIGNMENT Stream : public SocketConnection { const timespec *due_time); int Wait(const timespec* due_time); void FillSettings(StreamSettings *settings); - static int SetFailed(StreamId id); - void Close(); + static int SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) + __attribute__ ((__format__ (__printf__, 3, 4))); + void Close(int error_code, const char* reason_fmt, ...) + __attribute__ ((__format__ (__printf__, 3, 4))); private: friend void StreamWait(StreamId stream_id, const timespec *due_time, - void (*on_writable)(StreamId, void*, int), void *arg); + void (*on_writable)(StreamId, void*, int), void *arg); friend class MessageBatcher; +friend struct butil::DefaultDeleter; Stream(); ~Stream(); int Init(const StreamOptions options); @@ -111,6 +114,8 @@ friend class MessageBatcher; ConnectMeta _connect_meta; bool _connected; bool _closed; + int _error_code; + std::string _error_text; bthread_mutex_t _congestion_control_mutex; size_t _produced; diff --git a/test/brpc_streaming_rpc_unittest.cpp b/test/brpc_streaming_rpc_unittest.cpp index f7e62c8114..df6a37d888 100644 --- a/test/brpc_streaming_rpc_unittest.cpp +++ b/test/brpc_streaming_rpc_unittest.cpp @@ -20,11 +20,12 @@ // Date: 2015/10/22 16:28:44 #include - #include "brpc/server.h" + #include "brpc/controller.h" #include "brpc/channel.h" #include "brpc/stream_impl.h" +#include "brpc/policy/streaming_rpc_protocol.h" #include "echo.pb.h" class AfterAcceptStream { @@ -69,10 +70,11 @@ class MyServiceWithStream : public test::EchoService { class StreamingRpcTest : public testing::Test { protected: - test::EchoRequest request; - test::EchoResponse response; void SetUp() { request.set_message("hello world"); } void TearDown() {} + + test::EchoRequest request; + test::EchoResponse response; }; TEST_F(StreamingRpcTest, sanity) { @@ -96,7 +98,7 @@ TEST_F(StreamingRpcTest, sanity) { } struct HandlerControl { - HandlerControl() + HandlerControl() : block(false) {} bool block; @@ -110,11 +112,11 @@ class OrderedInputHandler : public brpc::StreamInputHandler { , _stopped(false) , _idle_times(0) , _cntl(cntl) - { - } + {} + int on_received_messages(brpc::StreamId /*id*/, butil::IOBuf *const messages[], - size_t size) { + size_t size) override { if (_cntl && _cntl->block) { while (_cntl->block) { usleep(100); @@ -129,15 +131,22 @@ class OrderedInputHandler : public brpc::StreamInputHandler { return 0; } - void on_idle_timeout(brpc::StreamId /*id*/) { + void on_idle_timeout(brpc::StreamId /*id*/) override { ++_idle_times; } - void on_closed(brpc::StreamId /*id*/) { + void on_closed(brpc::StreamId /*id*/) override { ASSERT_FALSE(_stopped); _stopped = true; } + void on_failed(brpc::StreamId id, int error_code, + const std::string& /*error_text*/) override { + ASSERT_FALSE(_failed); + ASSERT_NE(0, error_code); + _failed = true; + } + bool failed() const { return _failed; } bool stopped() const { return _stopped; } int idle_times() const { return _idle_times; } @@ -196,8 +205,8 @@ void on_writable(brpc::StreamId, void* arg, int error_code) { TEST_F(StreamingRpcTest, block) { HandlerControl hc; - OrderedInputHandler handler(&hc); hc.block = true; + OrderedInputHandler handler(&hc); brpc::StreamOptions opt; opt.handler = &handler; const int N = 10000; @@ -216,7 +225,7 @@ TEST_F(StreamingRpcTest, block) { ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); test::EchoService_Stub stub(&channel); stub.Echo(&cntl, &request, &response, NULL); - ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; for (int i = 0; i < N; ++i) { int network = htonl(i); @@ -295,8 +304,8 @@ TEST_F(StreamingRpcTest, block) { TEST_F(StreamingRpcTest, auto_close_if_host_socket_closed) { HandlerControl hc; - OrderedInputHandler handler(&hc); hc.block = true; + OrderedInputHandler handler(&hc); brpc::StreamOptions opt; opt.handler = &handler; const int N = 10000; @@ -332,15 +341,63 @@ TEST_F(StreamingRpcTest, auto_close_if_host_socket_closed) { while (!handler.stopped()) { usleep(100); } - ASSERT_FALSE(handler.failed()); + ASSERT_TRUE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); ASSERT_EQ(0, handler._expected_next_value); } +TEST_F(StreamingRpcTest, failed_when_rst) { + OrderedInputHandler handler; + brpc::StreamOptions opt; + opt.handler = &handler; + opt.messages_in_batch = 100; + brpc::Server server; + MyServiceWithStream service(opt); + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + ASSERT_EQ(0, server.Start(9007, NULL)); + brpc::Channel channel; + ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); + brpc::Controller cntl; + brpc::StreamId request_stream; + brpc::StreamOptions request_stream_options; + request_stream_options.max_buf_size = 0; + ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); + brpc::ScopedStream stream_guard(request_stream); + test::EchoService_Stub stub(&channel); + stub.Echo(&cntl, &request, &response, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; + const int N = 10000; + for (int i = 0; i < N; ++i) { + int network = htonl(i); + butil::IOBuf out; + out.append(&network, sizeof(network)); + ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i; + } + + usleep(1000 * 10); + { + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(request_stream, &ptr)); + brpc::Stream* s = (brpc::Stream*)ptr->conn(); + ASSERT_TRUE(s->_host_socket != NULL); + brpc::policy::SendStreamRst(s->_host_socket, + s->_remote_settings.stream_id()); + } + // ASSERT_EQ(0, brpc::StreamClose(request_stream)); + server.Stop(0); + server.Join(); + while (!handler.stopped() && !handler.failed()) { + usleep(100); + } + ASSERT_TRUE(handler.failed()); + ASSERT_EQ(0, handler.idle_times()); + ASSERT_EQ(N, handler._expected_next_value); +} + TEST_F(StreamingRpcTest, idle_timeout) { HandlerControl hc; - OrderedInputHandler handler(&hc); hc.block = true; + OrderedInputHandler handler(&hc); brpc::StreamOptions opt; opt.handler = &handler; opt.idle_timeout_ms = 2; @@ -383,7 +440,7 @@ class PingPongHandler : public brpc::StreamInputHandler { } int on_received_messages(brpc::StreamId id, butil::IOBuf *const messages[], - size_t size) { + size_t size) override { if (size != 1) { _failed = true; return 0; @@ -406,15 +463,23 @@ class PingPongHandler : public brpc::StreamInputHandler { return 0; } - void on_idle_timeout(brpc::StreamId /*id*/) { + void on_idle_timeout(brpc::StreamId /*id*/) override { ++_idle_times; } - void on_closed(brpc::StreamId /*id*/) { + void on_closed(brpc::StreamId /*id*/) override { ASSERT_FALSE(_stopped); _stopped = true; } + + void on_failed(brpc::StreamId id, int error_code, + const std::string& /*error_text*/) override { + ASSERT_FALSE(_failed); + ASSERT_NE(0, error_code); + _failed = true; + } + bool failed() const { return _failed; } bool stopped() const { return _stopped; } int idle_times() const { return _idle_times; } @@ -493,9 +558,9 @@ TEST_F(StreamingRpcTest, server_send_data_before_run_done) { ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); OrderedInputHandler handler; brpc::StreamOptions request_stream_options; + request_stream_options.handler = &handler; brpc::StreamId request_stream; brpc::Controller cntl; - request_stream_options.handler = &handler; ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); brpc::ScopedStream stream_guard(request_stream); test::EchoService_Stub stub(&channel);