Skip to content

Commit

Permalink
Support rpc protobuf message factory interface (#2718)
Browse files Browse the repository at this point in the history
* Support rpc protobuf message factory interface

* Update cn/server.md
  • Loading branch information
chenBright authored Aug 26, 2024
1 parent 9a1a9b2 commit 48996cf
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 47 deletions.
28 changes: 28 additions & 0 deletions docs/cn/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,34 @@ public:
...
```
## RPC Protobuf message factory
Server默认使用`DefaultRpcPBMessageFactory`。它是一个简单的工厂类,通过`new`来创建请求/响应message和`delete`来销毁请求/响应message。
如果用户希望自定义创建销毁机制,可以实现`RpcPBMessages`(请求/响应message的封装)和`RpcPBMessageFactory`(工厂类),并通过`ServerOptions.rpc_pb_message_factory`。
接口如下:
```c++
// Inherit this class to customize rpc protobuf messages,
// include request and response.
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
virtual google::protobuf::Message* Request() = 0;
virtual google::protobuf::Message* Response() = 0;
};
// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) = 0;
virtual void Return(RpcPBMessages* protobuf_message) = 0;
};
```

# FAQ

### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got EOF是什么意思
Expand Down
99 changes: 67 additions & 32 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
#include "butil/time.h"
#include "butil/iobuf.h" // butil::IOBuf
#include "butil/raw_pack.h" // RawPacker RawUnpacker
#include "butil/memory/scope_guard.h"
#include "brpc/controller.h" // Controller
#include "brpc/socket.h" // Socket
#include "brpc/server.h" // Server
#include "brpc/span.h"
#include "brpc/compress.h" // ParseFromCompressedData
#include "brpc/stream_impl.h"
#include "brpc/rpc_dump.h" // SampledRequest
#include "brpc/rpc_pb_message_factory.h"
#include "brpc/policy/baidu_rpc_meta.pb.h" // RpcRequestMeta
#include "brpc/policy/baidu_rpc_protocol.h"
#include "brpc/policy/most_common_message.h"
Expand Down Expand Up @@ -157,11 +159,34 @@ static bool SerializeResponse(const google::protobuf::Message& res,
return true;
}

namespace {
struct BaiduProxyPBMessages : public RpcPBMessages {
static BaiduProxyPBMessages* Get() {
return butil::get_object<BaiduProxyPBMessages>();
}

static void Return(BaiduProxyPBMessages* messages) {
messages->Clear();
butil::return_object(messages);
}

void Clear() {
request.Clear();
response.Clear();
}

::google::protobuf::Message* Request() override { return &request; }
::google::protobuf::Message* Response() override { return &response; }

SerializedRequest request;
SerializedResponse response;
};
}

// Used by UT, can't be static.
void SendRpcResponse(int64_t correlation_id,
Controller* cntl,
const google::protobuf::Message* req,
const google::protobuf::Message* res,
Controller* cntl,
RpcPBMessages* messages,
const Server* server,
MethodStatus* method_status,
int64_t received_us) {
Expand All @@ -172,13 +197,24 @@ void SendRpcResponse(int64_t correlation_id,
}
Socket* sock = accessor.get_sending_socket();

std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);

std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);

ClosureGuard guard(brpc::NewCallback(cntl, &Controller::CallAfterRpcResp, req, res));
auto messages_guard = butil::MakeScopeGuard([server, messages] {
if (NULL == messages) {
return;
}
if (NULL != server->options().baidu_master_service) {
BaiduProxyPBMessages::Return(static_cast<BaiduProxyPBMessages*>(messages));
} else {
server->options().rpc_pb_message_factory->Return(messages);
}
});

const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request();
const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response();
ClosureGuard guard(brpc::NewCallback(
cntl, &Controller::CallAfterRpcResp, req, res));

StreamId response_stream_id = accessor.response_stream();

Expand Down Expand Up @@ -375,8 +411,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
return;
}

std::unique_ptr<google::protobuf::Message> req;
std::unique_ptr<google::protobuf::Message> res;
RpcPBMessages* messages = NULL;

ServerPrivateAccessor server_accessor(server);
ControllerPrivateAccessor accessor(cntl.get());
Expand Down Expand Up @@ -496,13 +531,10 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
span->ResetServerSpanName(sampled_request->meta.method_name());
}

auto serialized_request = (SerializedRequest*)
svc->GetRequestPrototype(NULL).New();
req.reset(serialized_request);
res.reset(svc->GetResponsePrototype(NULL).New());

msg->payload.cutn(&serialized_request->serialized_data(),
req_size - meta.attachment_size());
messages = BaiduProxyPBMessages::Get();
msg->payload.cutn(
&((SerializedRequest*)messages->Request())->serialized_data(),
req_size - meta.attachment_size());
if (!msg->payload.empty()) {
cntl->request_attachment().swap(msg->payload);
}
Expand Down Expand Up @@ -568,26 +600,25 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
}

auto req_cmp_type = static_cast<CompressType>(meta.compress_type());
req.reset(svc->GetRequestPrototype(method).New());
if (!ParseFromCompressedData(req_buf, req.get(), req_cmp_type)) {
messages = server->options().rpc_pb_message_factory->Get(*svc, *method);
if (!ParseFromCompressedData(req_buf, messages->Request(), req_cmp_type)) {
cntl->SetFailed(EREQUEST, "Fail to parse request message, "
"CompressType=%s, request_size=%d",
CompressTypeToCStr(req_cmp_type), req_size);
server->options().rpc_pb_message_factory->Return(messages);
break;
}

res.reset(svc->GetResponsePrototype(method).New());
req_buf.clear();
}

// `socket' will be held until response has been sent
google::protobuf::Closure* done = ::brpc::NewCallback<
int64_t, Controller*, const google::protobuf::Message*,
const google::protobuf::Message*, const Server*,
MethodStatus*, int64_t>(
&SendRpcResponse, meta.correlation_id(), cntl.get(),
req.get(), res.get(), server,
method_status, msg->received_us());
int64_t, Controller*, RpcPBMessages*,
const Server*, MethodStatus*, int64_t>(&SendRpcResponse,
meta.correlation_id(),
cntl.get(), messages,
server, method_status,
msg->received_us());

// optional, just release resource ASAP
msg.reset();
Expand All @@ -598,24 +629,28 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
}
if (!FLAGS_usercode_in_pthread) {
return svc->CallMethod(method, cntl.release(),
req.release(), res.release(), done);
messages->Request(),
messages->Response(), done);
}
if (BeginRunningUserCode()) {
svc->CallMethod(method, cntl.release(),
req.release(), res.release(), done);
messages->Request(),
messages->Response(), done);
return EndRunningUserCodeInPlace();
} else {
return EndRunningCallMethodInPool(
svc, method, cntl.release(),
req.release(), res.release(), done);
messages->Request(),
messages->Response(), done);
}
} while (false);

// `cntl', `req' and `res' will be deleted inside `SendRpcResponse'
// `socket' will be held until response has been sent
SendRpcResponse(meta.correlation_id(), cntl.release(),
req.release(), res.release(), server,
method_status, msg->received_us());
SendRpcResponse(meta.correlation_id(),
cntl.release(), messages,
server, method_status,
msg->received_us());
}

bool VerifyRpcRequest(const InputMessageBase* msg_base) {
Expand Down
51 changes: 51 additions & 0 deletions src/brpc/rpc_pb_message_factory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "brpc/rpc_pb_message_factory.h"
#include "butil/object_pool.h"

namespace brpc {

struct DefaultRpcPBMessages : public RpcPBMessages {
DefaultRpcPBMessages() : request(NULL), response(NULL) {}
::google::protobuf::Message* Request() override { return request; }
::google::protobuf::Message* Response() override { return response; }

::google::protobuf::Message* request;
::google::protobuf::Message* response;
};


RpcPBMessages* DefaultRpcPBMessageFactory::Get(
const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) {
auto messages = butil::get_object<DefaultRpcPBMessages>();
messages->request = service.GetRequestPrototype(&method).New();
messages->response = service.GetResponsePrototype(&method).New();
return messages;
}

void DefaultRpcPBMessageFactory::Return(RpcPBMessages* messages) {
auto default_messages = static_cast<DefaultRpcPBMessages*>(messages);
delete default_messages->request;
delete default_messages->response;
default_messages->request = NULL;
default_messages->response = NULL;
butil::return_object(default_messages);
}

} // namespace brpc
54 changes: 54 additions & 0 deletions src/brpc/rpc_pb_message_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef BRPC_RPC_PB_MESSAGE_FACTORY_H
#define BRPC_RPC_PB_MESSAGE_FACTORY_H

#include <google/protobuf/service.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>

namespace brpc {

// Inherit this class to customize rpc protobuf messages,
// include request and response.
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
virtual google::protobuf::Message* Request() = 0;
virtual google::protobuf::Message* Response() = 0;
};

// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) = 0;
virtual void Return(RpcPBMessages* protobuf_message) = 0;
};

class DefaultRpcPBMessageFactory : public RpcPBMessageFactory {
public:
RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) override;
void Return(RpcPBMessages* messages) override;
};

} // namespace brpc

#endif //BRPC_RPC_PB_MESSAGE_FACTORY_H
6 changes: 5 additions & 1 deletion src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ ServerOptions::ServerOptions()
, health_reporter(NULL)
, rtmp_service(NULL)
, redis_service(NULL)
, bthread_tag(BTHREAD_TAG_INVALID) {
, bthread_tag(BTHREAD_TAG_INVALID)
, rpc_pb_message_factory(new DefaultRpcPBMessageFactory()) {
if (s_ncore > 0) {
num_threads = s_ncore + 1;
}
Expand Down Expand Up @@ -449,6 +450,9 @@ Server::~Server() {
delete _options.http_master_service;
_options.http_master_service = NULL;

delete _options.rpc_pb_message_factory;
_options.rpc_pb_message_factory = NULL;

delete _am;
_am = NULL;
delete _internal_am;
Expand Down
10 changes: 10 additions & 0 deletions src/brpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "brpc/interceptor.h"
#include "brpc/concurrency_limiter.h"
#include "brpc/baidu_master_service.h"
#include "brpc/rpc_pb_message_factory.h"

namespace brpc {

Expand Down Expand Up @@ -277,6 +278,15 @@ struct ServerOptions {
// Default: BTHREAD_TAG_DEFAULT
bthread_tag_t bthread_tag;

// [CAUTION] This option is for implementing specialized rpc protobuf
// message factory, most users don't need it. Don't change this option
// unless you fully understand the description below.
// If this option is set, all baidu-std rpc request message and response
// message will be created by this factory.
//
// Owned by Server and deleted in server's destructor.
RpcPBMessageFactory* rpc_pb_message_factory;

private:
// SSLOptions is large and not often used, allocate it on heap to
// prevent ServerOptions from being bloated in most cases.
Expand Down
Loading

0 comments on commit 48996cf

Please sign in to comment.