Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support arena rpc pb message factory #2751

Merged
merged 3 commits into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions docs/cn/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ public:

Server默认使用`DefaultRpcPBMessageFactory`。它是一个简单的工厂类,通过`new`来创建请求/响应message和`delete`来销毁请求/响应message。

如果用户希望自定义创建销毁机制,可以实现`RpcPBMessages`(请求/响应message的封装)和`RpcPBMessageFactory`(工厂类),并通过`ServerOptions.rpc_pb_message_factory`。
如果用户希望自定义创建销毁机制,可以实现`RpcPBMessages`(请求/响应message的封装)和`RpcPBMessageFactory`(工厂类),并设置`ServerOptions.rpc_pb_message_factory`为自定义的`RpcPBMessageFactory`。注意:server启动后,server拥有了`RpcPBMessageFactory`的所有权

接口如下:

Expand All @@ -1027,20 +1027,36 @@ Server默认使用`DefaultRpcPBMessageFactory`。它是一个简单的工厂类
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
// Get protobuf request message.
virtual google::protobuf::Message* Request() = 0;
// Get protobuf response message.
virtual google::protobuf::Message* Response() = 0;
};

// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;

// Get `RpcPBMessages' according to `service' and `method'.
// Common practice to create protobuf message:
// service.GetRequestPrototype(&method).New() -> request;
// service.GetRequestPrototype(&method).New() -> response.
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
chenBright marked this conversation as resolved.
Show resolved Hide resolved
const ::google::protobuf::MethodDescriptor& method) = 0;
virtual void Return(RpcPBMessages* protobuf_message) = 0;
// Return `RpcPBMessages' to factory.
virtual void Return(RpcPBMessages* messages) = 0;
};
```

### Protobuf arena

chenBright marked this conversation as resolved.
Show resolved Hide resolved
Protobuf arena是一种Protobuf message内存管理机制,有着提高内存分配效率、减少内存碎片、对缓存友好等优点。详细信息见[C++ Arena Allocation Guide](https://protobuf.dev/reference/cpp/arenas/)。

如果用户希望使用protobuf arena来管理Protobuf message内存,可以设置`ServerOptions.rpc_pb_message_factory = brpc::GetArenaRpcPBMessageFactory();`,使用默认的`start_block_size`(256 bytes)和`max_block_size`(8192 bytes)来创建arena。用户可以调用`brpc::GetArenaRpcPBMessageFactory<StartBlockSize, MaxBlockSize>();`自定义arena大小。

注意:从Protobuf v3.14.0开始,[默认开启arena](https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0)。但是Protobuf v3.14.0之前的版本,用户需要再proto文件中加上选项:`option cc_enable_arenas = true;`,所以为了兼容性,可以统一都加上该选项。

# FAQ

### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got EOF是什么意思
Expand Down
43 changes: 43 additions & 0 deletions docs/en/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,49 @@ public:
...
```

## RPC Protobuf message factory

`DefaultRpcPBMessageFactory' is used at server-side by default. It is a simple factory class that uses `new' to create request/response messages and `delete' to destroy request/response messages. Currently, the baidu_std protocol and HTTP protocol support this feature.

Users can implement `RpcPBMessages' (encapsulation of request/response message) and `RpcPBMessageFactory' (factory class) to customize the creation and destruction mechanism of protobuf message, and then set to `ServerOptions.rpc_pb_message_factory`. Note: After the server is started, the server owns the `RpcPBMessageFactory`.

The interface is as follows:

```c++
// Inherit this class to customize rpc protobuf messages,
// include request and response.
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
// Get protobuf request message.
virtual google::protobuf::Message* Request() = 0;
// Get protobuf response message.
virtual google::protobuf::Message* Response() = 0;
};

// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;
// Get `RpcPBMessages' according to `service' and `method'.
// Common practice to create protobuf message:
// service.GetRequestPrototype(&method).New() -> request;
// service.GetRequestPrototype(&method).New() -> response.
chenBright marked this conversation as resolved.
Show resolved Hide resolved
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) = 0;
// Return `RpcPBMessages' to factory.
virtual void Return(RpcPBMessages* protobuf_message) = 0;
};
```

### Protobuf arena

Protobuf arena is a Protobuf message memory management mechanism with the advantages of improving memory allocation efficiency, reducing memory fragmentation, and being cache-friendly. For more information, see [C++ Arena Allocation Guide](https://protobuf.dev/reference/cpp/arenas/).

Users can set `ServerOptions.rpc_pb_message_factory = brpc::GetArenaRpcPBMessageFactory();` to manage Protobuf message memory, with the default `start_block_size` (256 bytes) and `max_block_size` (8192 bytes). Alternatively, users can use `brpc::GetArenaRpcPBMessageFactory<StartBlockSize, MaxBlockSize>();` to customize the arena size.

Note: Since Protocol Buffers v3.14.0, Arenas are now unconditionally enabled. However, for versions prior to Protobuf v3.14.0, users need to add the option `option cc_enable_arenas = true;` to the proto file. so for compatibility, this option can be added uniformly.

# FAQ

### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got EOF
Expand Down
36 changes: 23 additions & 13 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,30 +710,33 @@ friend class HttpResponseSenderAsDone;
HttpResponseSender()
: HttpResponseSender(NULL) {}
explicit HttpResponseSender(Controller* cntl/*own*/)
: _cntl(cntl), _method_status(NULL), _received_us(0), _h2_stream_id(-1) {}
: _cntl(cntl)
, _messages(NULL)
, _method_status(NULL)
, _received_us(0)
, _h2_stream_id(-1) {}

HttpResponseSender(HttpResponseSender&& s) noexcept
: _cntl(std::move(s._cntl))
, _req(std::move(s._req))
, _res(std::move(s._res))
, _messages(s._messages)
, _method_status(s._method_status)
, _received_us(s._received_us)
, _h2_stream_id(s._h2_stream_id) {
s._messages = NULL;
s._method_status = NULL;
s._received_us = 0;
s._h2_stream_id = -1;
}
~HttpResponseSender();

void own_request(google::protobuf::Message* req) { _req.reset(req); }
void own_response(google::protobuf::Message* res) { _res.reset(res); }
void set_messages(RpcPBMessages* messages) { _messages = messages; }
void set_method_status(MethodStatus* ms) { _method_status = ms; }
void set_received_us(int64_t t) { _received_us = t; }
void set_h2_stream_id(int id) { _h2_stream_id = id; }

private:
std::unique_ptr<Controller, LogErrorTextAndDelete> _cntl;
std::unique_ptr<google::protobuf::Message> _req;
std::unique_ptr<google::protobuf::Message> _res;
RpcPBMessages* _messages;
MethodStatus* _method_status;
int64_t _received_us;
int _h2_stream_id;
Expand All @@ -743,7 +746,8 @@ class HttpResponseSenderAsDone : public google::protobuf::Closure {
public:
explicit HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
void Run() override {
_sender._cntl->CallAfterRpcResp(_sender._req.get(), _sender._res.get());
_sender._cntl->CallAfterRpcResp(
_sender._messages->Request(), _sender._messages->Response());
delete this;
}

Expand All @@ -752,6 +756,12 @@ class HttpResponseSenderAsDone : public google::protobuf::Closure {
};

HttpResponseSender::~HttpResponseSender() {
// Return messages to factory at the end.
BRPC_SCOPE_EXIT {
if (NULL != _messages) {
_cntl->server()->options().rpc_pb_message_factory->Return(_messages);
}
};
Controller* cntl = _cntl.get();
if (cntl == NULL) {
return;
Expand All @@ -763,7 +773,7 @@ HttpResponseSender::~HttpResponseSender() {
}
ConcurrencyRemover concurrency_remover(_method_status, cntl, _received_us);
Socket* socket = accessor.get_sending_socket();
const google::protobuf::Message* res = _res.get();
const google::protobuf::Message* res = NULL != _messages ? _messages->Response() : NULL;

if (cntl->IsCloseConnection()) {
socket->SetFailed();
Expand Down Expand Up @@ -1488,10 +1498,10 @@ void ProcessHttpRequest(InputMessageBase *msg) {
google::protobuf::Service* svc = sp->service;
const google::protobuf::MethodDescriptor* method = sp->method;
accessor.set_method(method);
google::protobuf::Message* req = svc->GetRequestPrototype(method).New();
resp_sender.own_request(req);
google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
resp_sender.own_response(res);
RpcPBMessages* messages = server->options().rpc_pb_message_factory->Get(*svc, *method);;
resp_sender.set_messages(messages);
google::protobuf::Message* req = messages->Request();
google::protobuf::Message* res = messages->Response();

if (__builtin_expect(!req || !res, 0)) {
PLOG(FATAL) << "Fail to new req or res";
Expand Down
1 change: 0 additions & 1 deletion src/brpc/rpc_pb_message_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

#include "brpc/rpc_pb_message_factory.h"
#include "butil/object_pool.h"

namespace brpc {

Expand Down
87 changes: 85 additions & 2 deletions src/brpc/rpc_pb_message_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <google/protobuf/service.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>
#include <google/protobuf/arena.h>
#include "butil/object_pool.h"

namespace brpc {

Expand All @@ -29,17 +31,24 @@ namespace brpc {
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
// Get protobuf request message.
virtual google::protobuf::Message* Request() = 0;
// Get protobuf response message.
virtual google::protobuf::Message* Response() = 0;
};

// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;
// Get `RpcPBMessages' according to `service' and `method'.
// Common practice to create protobuf message:
// service.GetRequestPrototype(&method).New() -> request;
// service.GetRequestPrototype(&method).New() -> response.
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) = 0;
virtual void Return(RpcPBMessages* protobuf_message) = 0;
// Return `RpcPBMessages' to factory.
virtual void Return(RpcPBMessages* messages) = 0;
};

class DefaultRpcPBMessageFactory : public RpcPBMessageFactory {
Expand All @@ -49,6 +58,80 @@ class DefaultRpcPBMessageFactory : public RpcPBMessageFactory {
void Return(RpcPBMessages* messages) override;
};

namespace internal {

// Allocate protobuf message from arena.
// The arena is created with `StartBlockSize' and `MaxBlockSize' options.
// For more details, see `google::protobuf::ArenaOptions'.
template<size_t StartBlockSize, size_t MaxBlockSize>
struct ArenaRpcPBMessages : public RpcPBMessages {
struct ArenaOptionsWrapper {
public:
ArenaOptionsWrapper() {
options.start_block_size = StartBlockSize;
options.max_block_size = MaxBlockSize;
}

private:
friend struct ArenaRpcPBMessages;
::google::protobuf::ArenaOptions options;
};

explicit ArenaRpcPBMessages(ArenaOptionsWrapper options_wrapper)
: arena(options_wrapper.options)
, request(NULL)
, response(NULL) {}

::google::protobuf::Message* Request() override { return request; }
::google::protobuf::Message* Response() override { return response; }

::google::protobuf::Arena arena;
::google::protobuf::Message* request;
::google::protobuf::Message* response;
};

template<size_t StartBlockSize, size_t MaxBlockSize>
class ArenaRpcPBMessageFactory : public RpcPBMessageFactory {
typedef ::brpc::internal::ArenaRpcPBMessages<StartBlockSize, MaxBlockSize>
ArenaRpcPBMessages;
public:
ArenaRpcPBMessageFactory() {
_arena_options.start_block_size = StartBlockSize;
_arena_options.max_block_size = MaxBlockSize;
}

RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) override {
typename ArenaRpcPBMessages::ArenaOptionsWrapper options_wrapper;
auto messages = butil::get_object<ArenaRpcPBMessages>(options_wrapper);
messages->request = service.GetRequestPrototype(&method).New(&messages->arena);
messages->response = service.GetResponsePrototype(&method).New(&messages->arena);
return messages;
}

void Return(RpcPBMessages* messages) override {
auto arena_messages = static_cast<ArenaRpcPBMessages*>(messages);
arena_messages->request = NULL;
arena_messages->response = NULL;
butil::return_object(arena_messages);
}

private:
::google::protobuf::ArenaOptions _arena_options;
};

}

template<size_t StartBlockSize, size_t MaxBlockSize>
RpcPBMessageFactory* GetArenaRpcPBMessageFactory() {
return new ::brpc::internal::ArenaRpcPBMessageFactory<StartBlockSize, MaxBlockSize>();
}

BUTIL_FORCE_INLINE RpcPBMessageFactory* GetArenaRpcPBMessageFactory() {
// Default arena options, same as `google::protobuf::ArenaOptions'.
return GetArenaRpcPBMessageFactory<256, 8192>();
}

} // namespace brpc

#endif //BRPC_RPC_PB_MESSAGE_FACTORY_H
#endif // BRPC_RPC_PB_MESSAGE_FACTORY_H
Loading
Loading