diff --git a/docs/cn/server.md b/docs/cn/server.md index 5e3fc42a6f..9606ab1d40 100644 --- a/docs/cn/server.md +++ b/docs/cn/server.md @@ -1017,7 +1017,7 @@ public: Server默认使用`DefaultRpcPBMessageFactory`。它是一个简单的工厂类,通过`new`来创建请求/响应message和`delete`来销毁请求/响应message。 -如果用户希望自定义创建销毁机制,可以实现`RpcPBMessages`(请求/响应message的封装)和`RpcPBMessageFactory`(工厂类),并通过`ServerOptions.rpc_pb_message_factory`。 +如果用户希望自定义创建销毁机制,可以实现`RpcPBMessages`(请求/响应message的封装)和`RpcPBMessageFactory`(工厂类),并设置`ServerOptions.rpc_pb_message_factory`为自定义的`RpcPBMessageFactory`。注意:server启动后,server拥有了`RpcPBMessageFactory`的所有权。 接口如下: @@ -1027,7 +1027,9 @@ Server默认使用`DefaultRpcPBMessageFactory`。它是一个简单的工厂类 class RpcPBMessages { public: virtual ~RpcPBMessages() = default; + // Get protobuf request message. virtual google::protobuf::Message* Request() = 0; + // Get protobuf response message. virtual google::protobuf::Message* Response() = 0; }; @@ -1035,12 +1037,26 @@ public: class RpcPBMessageFactory { public: virtual ~RpcPBMessageFactory() = default; + + // Get `RpcPBMessages' according to `service' and `method'. + // Common practice to create protobuf message: + // service.GetRequestPrototype(&method).New() -> request; + // service.GetResponsePrototype(&method).New() -> response. virtual RpcPBMessages* Get(const ::google::protobuf::Service& service, const ::google::protobuf::MethodDescriptor& method) = 0; - virtual void Return(RpcPBMessages* protobuf_message) = 0; + // Return `RpcPBMessages' to factory. + virtual void Return(RpcPBMessages* messages) = 0; }; ``` +### Protobuf arena + +Protobuf arena是一种Protobuf message内存管理机制,有着提高内存分配效率、减少内存碎片、对缓存友好等优点。详细信息见[C++ Arena Allocation Guide](https://protobuf.dev/reference/cpp/arenas/)。 + +如果用户希望使用protobuf arena来管理Protobuf message内存,可以设置`ServerOptions.rpc_pb_message_factory = brpc::GetArenaRpcPBMessageFactory();`,使用默认的`start_block_size`(256 bytes)和`max_block_size`(8192 bytes)来创建arena。用户可以调用`brpc::GetArenaRpcPBMessageFactory();`自定义arena大小。 + +注意:从Protobuf v3.14.0开始,[默认开启arena](https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0)。但是Protobuf v3.14.0之前的版本,用户需要再proto文件中加上选项:`option cc_enable_arenas = true;`,所以为了兼容性,可以统一都加上该选项。 + # FAQ ### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got EOF是什么意思 diff --git a/docs/en/server.md b/docs/en/server.md index 62480d2801..1c2923a6b3 100644 --- a/docs/en/server.md +++ b/docs/en/server.md @@ -1008,6 +1008,49 @@ public: ... ``` +## RPC Protobuf message factory + +`DefaultRpcPBMessageFactory' is used at server-side by default. It is a simple factory class that uses `new' to create request/response messages and `delete' to destroy request/response messages. Currently, the baidu_std protocol and HTTP protocol support this feature. + +Users can implement `RpcPBMessages' (encapsulation of request/response message) and `RpcPBMessageFactory' (factory class) to customize the creation and destruction mechanism of protobuf message, and then set to `ServerOptions.rpc_pb_message_factory`. Note: After the server is started, the server owns the `RpcPBMessageFactory`. + +The interface is as follows: + +```c++ +// Inherit this class to customize rpc protobuf messages, +// include request and response. +class RpcPBMessages { +public: + virtual ~RpcPBMessages() = default; + // Get protobuf request message. + virtual google::protobuf::Message* Request() = 0; + // Get protobuf response message. + virtual google::protobuf::Message* Response() = 0; +}; + +// Factory to manage `RpcPBMessages'. +class RpcPBMessageFactory { +public: + virtual ~RpcPBMessageFactory() = default; + // Get `RpcPBMessages' according to `service' and `method'. + // Common practice to create protobuf message: + // service.GetRequestPrototype(&method).New() -> request; + // service.GetResponsePrototype(&method).New() -> response. + virtual RpcPBMessages* Get(const ::google::protobuf::Service& service, + const ::google::protobuf::MethodDescriptor& method) = 0; + // Return `RpcPBMessages' to factory. + virtual void Return(RpcPBMessages* protobuf_message) = 0; +}; +``` + +### Protobuf arena + +Protobuf arena is a Protobuf message memory management mechanism with the advantages of improving memory allocation efficiency, reducing memory fragmentation, and being cache-friendly. For more information, see [C++ Arena Allocation Guide](https://protobuf.dev/reference/cpp/arenas/). + +Users can set `ServerOptions.rpc_pb_message_factory = brpc::GetArenaRpcPBMessageFactory();` to manage Protobuf message memory, with the default `start_block_size` (256 bytes) and `max_block_size` (8192 bytes). Alternatively, users can use `brpc::GetArenaRpcPBMessageFactory();` to customize the arena size. + +Note: Since Protocol Buffers v3.14.0, Arenas are now unconditionally enabled. However, for versions prior to Protobuf v3.14.0, users need to add the option `option cc_enable_arenas = true;` to the proto file. so for compatibility, this option can be added uniformly. + # FAQ ### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got EOF diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index 52fff2c9cf..f8820d44ef 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -710,30 +710,33 @@ friend class HttpResponseSenderAsDone; HttpResponseSender() : HttpResponseSender(NULL) {} explicit HttpResponseSender(Controller* cntl/*own*/) - : _cntl(cntl), _method_status(NULL), _received_us(0), _h2_stream_id(-1) {} + : _cntl(cntl) + , _messages(NULL) + , _method_status(NULL) + , _received_us(0) + , _h2_stream_id(-1) {} + HttpResponseSender(HttpResponseSender&& s) noexcept : _cntl(std::move(s._cntl)) - , _req(std::move(s._req)) - , _res(std::move(s._res)) + , _messages(s._messages) , _method_status(s._method_status) , _received_us(s._received_us) , _h2_stream_id(s._h2_stream_id) { + s._messages = NULL; s._method_status = NULL; s._received_us = 0; s._h2_stream_id = -1; } ~HttpResponseSender(); - void own_request(google::protobuf::Message* req) { _req.reset(req); } - void own_response(google::protobuf::Message* res) { _res.reset(res); } + void set_messages(RpcPBMessages* messages) { _messages = messages; } void set_method_status(MethodStatus* ms) { _method_status = ms; } void set_received_us(int64_t t) { _received_us = t; } void set_h2_stream_id(int id) { _h2_stream_id = id; } private: std::unique_ptr _cntl; - std::unique_ptr _req; - std::unique_ptr _res; + RpcPBMessages* _messages; MethodStatus* _method_status; int64_t _received_us; int _h2_stream_id; @@ -743,7 +746,8 @@ class HttpResponseSenderAsDone : public google::protobuf::Closure { public: explicit HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {} void Run() override { - _sender._cntl->CallAfterRpcResp(_sender._req.get(), _sender._res.get()); + _sender._cntl->CallAfterRpcResp( + _sender._messages->Request(), _sender._messages->Response()); delete this; } @@ -752,6 +756,12 @@ class HttpResponseSenderAsDone : public google::protobuf::Closure { }; HttpResponseSender::~HttpResponseSender() { + // Return messages to factory at the end. + BRPC_SCOPE_EXIT { + if (NULL != _messages) { + _cntl->server()->options().rpc_pb_message_factory->Return(_messages); + } + }; Controller* cntl = _cntl.get(); if (cntl == NULL) { return; @@ -763,7 +773,7 @@ HttpResponseSender::~HttpResponseSender() { } ConcurrencyRemover concurrency_remover(_method_status, cntl, _received_us); Socket* socket = accessor.get_sending_socket(); - const google::protobuf::Message* res = _res.get(); + const google::protobuf::Message* res = NULL != _messages ? _messages->Response() : NULL; if (cntl->IsCloseConnection()) { socket->SetFailed(); @@ -1488,10 +1498,10 @@ void ProcessHttpRequest(InputMessageBase *msg) { google::protobuf::Service* svc = sp->service; const google::protobuf::MethodDescriptor* method = sp->method; accessor.set_method(method); - google::protobuf::Message* req = svc->GetRequestPrototype(method).New(); - resp_sender.own_request(req); - google::protobuf::Message* res = svc->GetResponsePrototype(method).New(); - resp_sender.own_response(res); + RpcPBMessages* messages = server->options().rpc_pb_message_factory->Get(*svc, *method);; + resp_sender.set_messages(messages); + google::protobuf::Message* req = messages->Request(); + google::protobuf::Message* res = messages->Response(); if (__builtin_expect(!req || !res, 0)) { PLOG(FATAL) << "Fail to new req or res"; diff --git a/src/brpc/rpc_pb_message_factory.cpp b/src/brpc/rpc_pb_message_factory.cpp index 828a289d8d..27e2457e05 100644 --- a/src/brpc/rpc_pb_message_factory.cpp +++ b/src/brpc/rpc_pb_message_factory.cpp @@ -16,7 +16,6 @@ // under the License. #include "brpc/rpc_pb_message_factory.h" -#include "butil/object_pool.h" namespace brpc { diff --git a/src/brpc/rpc_pb_message_factory.h b/src/brpc/rpc_pb_message_factory.h index 0da0ff2a67..52787be1e4 100644 --- a/src/brpc/rpc_pb_message_factory.h +++ b/src/brpc/rpc_pb_message_factory.h @@ -21,6 +21,8 @@ #include #include #include +#include +#include "butil/object_pool.h" namespace brpc { @@ -29,7 +31,9 @@ namespace brpc { class RpcPBMessages { public: virtual ~RpcPBMessages() = default; + // Get protobuf request message. virtual google::protobuf::Message* Request() = 0; + // Get protobuf response message. virtual google::protobuf::Message* Response() = 0; }; @@ -37,9 +41,14 @@ class RpcPBMessages { class RpcPBMessageFactory { public: virtual ~RpcPBMessageFactory() = default; + // Get `RpcPBMessages' according to `service' and `method'. + // Common practice to create protobuf message: + // service.GetRequestPrototype(&method).New() -> request; + // service.GetResponsePrototype(&method).New() -> response. virtual RpcPBMessages* Get(const ::google::protobuf::Service& service, const ::google::protobuf::MethodDescriptor& method) = 0; - virtual void Return(RpcPBMessages* protobuf_message) = 0; + // Return `RpcPBMessages' to factory. + virtual void Return(RpcPBMessages* messages) = 0; }; class DefaultRpcPBMessageFactory : public RpcPBMessageFactory { @@ -49,6 +58,80 @@ class DefaultRpcPBMessageFactory : public RpcPBMessageFactory { void Return(RpcPBMessages* messages) override; }; +namespace internal { + +// Allocate protobuf message from arena. +// The arena is created with `StartBlockSize' and `MaxBlockSize' options. +// For more details, see `google::protobuf::ArenaOptions'. +template +struct ArenaRpcPBMessages : public RpcPBMessages { + struct ArenaOptionsWrapper { + public: + ArenaOptionsWrapper() { + options.start_block_size = StartBlockSize; + options.max_block_size = MaxBlockSize; + } + + private: + friend struct ArenaRpcPBMessages; + ::google::protobuf::ArenaOptions options; + }; + + explicit ArenaRpcPBMessages(ArenaOptionsWrapper options_wrapper) + : arena(options_wrapper.options) + , request(NULL) + , response(NULL) {} + + ::google::protobuf::Message* Request() override { return request; } + ::google::protobuf::Message* Response() override { return response; } + + ::google::protobuf::Arena arena; + ::google::protobuf::Message* request; + ::google::protobuf::Message* response; +}; + +template +class ArenaRpcPBMessageFactory : public RpcPBMessageFactory { + typedef ::brpc::internal::ArenaRpcPBMessages + ArenaRpcPBMessages; +public: + ArenaRpcPBMessageFactory() { + _arena_options.start_block_size = StartBlockSize; + _arena_options.max_block_size = MaxBlockSize; + } + + RpcPBMessages* Get(const ::google::protobuf::Service& service, + const ::google::protobuf::MethodDescriptor& method) override { + typename ArenaRpcPBMessages::ArenaOptionsWrapper options_wrapper; + auto messages = butil::get_object(options_wrapper); + messages->request = service.GetRequestPrototype(&method).New(&messages->arena); + messages->response = service.GetResponsePrototype(&method).New(&messages->arena); + return messages; + } + + void Return(RpcPBMessages* messages) override { + auto arena_messages = static_cast(messages); + arena_messages->request = NULL; + arena_messages->response = NULL; + butil::return_object(arena_messages); + } + +private: + ::google::protobuf::ArenaOptions _arena_options; +}; + +} + +template +RpcPBMessageFactory* GetArenaRpcPBMessageFactory() { + return new ::brpc::internal::ArenaRpcPBMessageFactory(); +} + +BUTIL_FORCE_INLINE RpcPBMessageFactory* GetArenaRpcPBMessageFactory() { + // Default arena options, same as `google::protobuf::ArenaOptions'. + return GetArenaRpcPBMessageFactory<256, 8192>(); +} + } // namespace brpc -#endif //BRPC_RPC_PB_MESSAGE_FACTORY_H +#endif // BRPC_RPC_PB_MESSAGE_FACTORY_H diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp index 5f06887a52..c740a5d52b 100644 --- a/test/brpc_server_unittest.cpp +++ b/test/brpc_server_unittest.cpp @@ -56,6 +56,7 @@ #include "echo.pb.h" #include "v1.pb.h" #include "v2.pb.h" +#include "v3.pb.h" int main(int argc, char* argv[]) { testing::InitGoogleTest(&argc, argv); @@ -350,6 +351,18 @@ class EchoServiceV2 : public v2::EchoService { butil::atomic ncalled; }; +class EchoServiceV3 : public v3::EchoService { +public: + void Echo(::google::protobuf::RpcController*, + const v3::EchoRequest* request, + v3::EchoResponse* response, + ::google::protobuf::Closure* done) override { + brpc::ClosureGuard done_guard(done); + ASSERT_EQ(request->message(), EXP_REQUEST); + response->set_message(EXP_RESPONSE); + } +}; + TEST_F(ServerTest, empty_enabled_protocols) { butil::EndPoint ep; ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); @@ -1782,8 +1795,8 @@ class TestRpcPBMessageFactory : public brpc::RpcPBMessageFactory { brpc::RpcPBMessages* Get(const google::protobuf::Service& service, const google::protobuf::MethodDescriptor& method) override { auto messages = butil::get_object(); - auto request = butil::get_object(); - auto response = butil::get_object(); + auto request = butil::get_object(); + auto response = butil::get_object(); request->clear_message(); response->clear_message(); messages->request = request; @@ -1793,8 +1806,8 @@ class TestRpcPBMessageFactory : public brpc::RpcPBMessageFactory { void Return(brpc::RpcPBMessages* messages) override { auto test_messages = static_cast(messages); - butil::return_object(static_cast(test_messages->request)); - butil::return_object(static_cast(test_messages->response)); + butil::return_object(static_cast(test_messages->request)); + butil::return_object(static_cast(test_messages->response)); test_messages->request = NULL; test_messages->response = NULL; butil::return_object(test_messages); @@ -1805,24 +1818,85 @@ TEST_F(ServerTest, rpc_pb_message_factory) { butil::EndPoint ep; ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); brpc::Server server; - EchoServiceImpl service; + EchoServiceV1 service; ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); brpc::ServerOptions opt; opt.rpc_pb_message_factory = new TestRpcPBMessageFactory; ASSERT_EQ(0, server.Start(ep, &opt)); - brpc::Channel chan; - brpc::ChannelOptions copt; - copt.protocol = "baidu_std"; - ASSERT_EQ(0, chan.Init(ep, &copt)); - brpc::Controller cntl; - test::EchoRequest req; - test::EchoResponse res; - req.set_message(EXP_REQUEST); - test::EchoService_Stub stub(&chan); - stub.Echo(&cntl, &req, &res, NULL); - ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); - ASSERT_EQ(EXP_RESPONSE, res.message()); + brpc::Channel baidu_chan; + brpc::ChannelOptions baidu_copt; + baidu_copt.protocol = "baidu_std"; + ASSERT_EQ(0, baidu_chan.Init(ep, &baidu_copt)); + for (int i = 0; i < 1000; ++i) { + brpc::Controller cntl; + v1::EchoRequest req; + v1::EchoResponse res; + req.set_message(EXP_REQUEST); + v1::EchoService_Stub stub(&baidu_chan); + stub.Echo(&cntl, &req, &res, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + ASSERT_EQ(EXP_REQUEST + "_v1", res.message()); + } + + brpc::Channel http_chan; + brpc::ChannelOptions http_copt; + http_copt.protocol = "http"; + ASSERT_EQ(0, http_chan.Init(ep, &http_copt)); + for (int i = 0; i < 1000; ++i) { + brpc::Controller cntl; + cntl.request_attachment().append( + butil::string_printf(R"({"message":"%s"})", EXP_REQUEST.c_str())); + v1::EchoService_Stub stub(&http_chan); + stub.Echo(&cntl, NULL, NULL, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + ASSERT_EQ(butil::string_printf(R"({"message":"%s_v1"})", EXP_REQUEST.c_str()), + cntl.response_attachment().to_string()); + } + + ASSERT_EQ(0, server.Stop(0)); + ASSERT_EQ(0, server.Join()); +} + +TEST_F(ServerTest, arena_rpc_pb_message_factory) { + butil::EndPoint ep; + ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); + brpc::Server server; + EchoServiceV3 service; + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + brpc::ServerOptions opt; + opt.rpc_pb_message_factory = brpc::GetArenaRpcPBMessageFactory(); + ASSERT_EQ(0, server.Start(ep, &opt)); + + brpc::Channel baidu_chan; + brpc::ChannelOptions baidu_copt; + baidu_copt.protocol = "baidu_std"; + ASSERT_EQ(0, baidu_chan.Init(ep, &baidu_copt)); + for (int i = 0; i < 1000; ++i) { + brpc::Controller cntl; + v3::EchoRequest req; + v3::EchoResponse res; + req.set_message(EXP_REQUEST); + v3::EchoService_Stub stub(&baidu_chan); + stub.Echo(&cntl, &req, &res, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + ASSERT_EQ(EXP_RESPONSE, res.message()); + } + + brpc::Channel http_chan; + brpc::ChannelOptions http_copt; + http_copt.protocol = "http"; + ASSERT_EQ(0, http_chan.Init(ep, &http_copt)); + for (int i = 0; i < 1000; ++i) { + brpc::Controller cntl; + cntl.request_attachment().append( + butil::string_printf(R"({"message":"%s"})", EXP_REQUEST.c_str())); + v3::EchoService_Stub stub(&http_chan); + stub.Echo(&cntl, NULL, NULL, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + ASSERT_EQ(butil::string_printf(R"({"message":"%s"})", EXP_RESPONSE.c_str()), + cntl.response_attachment().to_string()); + } ASSERT_EQ(0, server.Stop(0)); ASSERT_EQ(0, server.Join()); diff --git a/test/v3.proto b/test/v3.proto new file mode 100644 index 0000000000..b920832a0d --- /dev/null +++ b/test/v3.proto @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax="proto3"; + +package v3; + +option cc_generic_services = true; +// https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0 +// Since Protocol Buffers v3.14.0, Arenas are now unconditionally enabled. +// cc_enable_arenas no longer has any effect. +option cc_enable_arenas = true; + +message EchoRequest { + string message = 1; +}; + +message EchoResponse { + string message = 1; +}; + +service EchoService { + rpc Echo(EchoRequest) returns (EchoResponse); +};