Skip to content

Commit

Permalink
Support rpc protobuf message factory interface
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Jul 30, 2024
1 parent 64ce760 commit e20d804
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 47 deletions.
99 changes: 67 additions & 32 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
#include "butil/time.h"
#include "butil/iobuf.h" // butil::IOBuf
#include "butil/raw_pack.h" // RawPacker RawUnpacker
#include "butil/memory/scope_guard.h"
#include "brpc/controller.h" // Controller
#include "brpc/socket.h" // Socket
#include "brpc/server.h" // Server
#include "brpc/span.h"
#include "brpc/compress.h" // ParseFromCompressedData
#include "brpc/stream_impl.h"
#include "brpc/rpc_dump.h" // SampledRequest
#include "brpc/rpc_pb_message_factory.h"
#include "brpc/policy/baidu_rpc_meta.pb.h" // RpcRequestMeta
#include "brpc/policy/baidu_rpc_protocol.h"
#include "brpc/policy/most_common_message.h"
Expand Down Expand Up @@ -157,11 +159,34 @@ static bool SerializeResponse(const google::protobuf::Message& res,
return true;
}

namespace {
struct BaiduProxyPBMessages : public RpcPBMessages {
static BaiduProxyPBMessages* Get() {
return butil::get_object<BaiduProxyPBMessages>();
}

static void Return(BaiduProxyPBMessages* messages) {
messages->Clear();
butil::return_object(messages);
}

void Clear() {
request.Clear();
response.Clear();
}

::google::protobuf::Message* Request() override { return &request; }
::google::protobuf::Message* Response() override { return &response; }

SerializedRequest request;
SerializedResponse response;
};
}

// Used by UT, can't be static.
void SendRpcResponse(int64_t correlation_id,
Controller* cntl,
const google::protobuf::Message* req,
const google::protobuf::Message* res,
Controller* cntl,
RpcPBMessages* messages,
const Server* server,
MethodStatus* method_status,
int64_t received_us) {
Expand All @@ -172,13 +197,24 @@ void SendRpcResponse(int64_t correlation_id,
}
Socket* sock = accessor.get_sending_socket();

std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);

std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);

ClosureGuard guard(brpc::NewCallback(cntl, &Controller::CallAfterRpcResp, req, res));
auto messages_guard = butil::MakeScopeGuard([server, messages] {
if (NULL == messages) {
return;
}
if (NULL != server->options().baidu_master_service) {
BaiduProxyPBMessages::Return(static_cast<BaiduProxyPBMessages*>(messages));
} else {
server->options().rpc_pb_message_factory->Return(messages);
}
});

const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request();
const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response();
ClosureGuard guard(brpc::NewCallback(
cntl, &Controller::CallAfterRpcResp, req, res));

StreamId response_stream_id = accessor.response_stream();

Expand Down Expand Up @@ -375,8 +411,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
return;
}

std::unique_ptr<google::protobuf::Message> req;
std::unique_ptr<google::protobuf::Message> res;
RpcPBMessages* messages = NULL;

ServerPrivateAccessor server_accessor(server);
ControllerPrivateAccessor accessor(cntl.get());
Expand Down Expand Up @@ -496,13 +531,10 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
span->ResetServerSpanName(sampled_request->meta.method_name());
}

auto serialized_request = (SerializedRequest*)
svc->GetRequestPrototype(NULL).New();
req.reset(serialized_request);
res.reset(svc->GetResponsePrototype(NULL).New());

msg->payload.cutn(&serialized_request->serialized_data(),
req_size - meta.attachment_size());
messages = BaiduProxyPBMessages::Get();
msg->payload.cutn(
&((SerializedRequest*)messages->Request())->serialized_data(),
req_size - meta.attachment_size());
if (!msg->payload.empty()) {
cntl->request_attachment().swap(msg->payload);
}
Expand Down Expand Up @@ -568,26 +600,25 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
}

auto req_cmp_type = static_cast<CompressType>(meta.compress_type());
req.reset(svc->GetRequestPrototype(method).New());
if (!ParseFromCompressedData(req_buf, req.get(), req_cmp_type)) {
messages = server->options().rpc_pb_message_factory->Get(*svc, *method);
if (!ParseFromCompressedData(req_buf, messages->Request(), req_cmp_type)) {
cntl->SetFailed(EREQUEST, "Fail to parse request message, "
"CompressType=%s, request_size=%d",
CompressTypeToCStr(req_cmp_type), req_size);
server->options().rpc_pb_message_factory->Return(messages);
break;
}

res.reset(svc->GetResponsePrototype(method).New());
req_buf.clear();
}

// `socket' will be held until response has been sent
google::protobuf::Closure* done = ::brpc::NewCallback<
int64_t, Controller*, const google::protobuf::Message*,
const google::protobuf::Message*, const Server*,
MethodStatus*, int64_t>(
&SendRpcResponse, meta.correlation_id(), cntl.get(),
req.get(), res.get(), server,
method_status, msg->received_us());
int64_t, Controller*, RpcPBMessages*,
const Server*, MethodStatus*, int64_t>(&SendRpcResponse,
meta.correlation_id(),
cntl.get(), messages,
server, method_status,
msg->received_us());

// optional, just release resource ASAP
msg.reset();
Expand All @@ -598,24 +629,28 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
}
if (!FLAGS_usercode_in_pthread) {
return svc->CallMethod(method, cntl.release(),
req.release(), res.release(), done);
messages->Request(),
messages->Response(), done);
}
if (BeginRunningUserCode()) {
svc->CallMethod(method, cntl.release(),
req.release(), res.release(), done);
messages->Request(),
messages->Response(), done);
return EndRunningUserCodeInPlace();
} else {
return EndRunningCallMethodInPool(
svc, method, cntl.release(),
req.release(), res.release(), done);
messages->Request(),
messages->Response(), done);
}
} while (false);

// `cntl', `req' and `res' will be deleted inside `SendRpcResponse'
// `socket' will be held until response has been sent
SendRpcResponse(meta.correlation_id(), cntl.release(),
req.release(), res.release(), server,
method_status, msg->received_us());
SendRpcResponse(meta.correlation_id(),
cntl.release(), messages,
server, method_status,
msg->received_us());
}

bool VerifyRpcRequest(const InputMessageBase* msg_base) {
Expand Down
58 changes: 58 additions & 0 deletions src/brpc/rpc_pb_message_factory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "brpc/rpc_pb_message_factory.h"
#include "butil/object_pool.h"

namespace brpc {

struct DefaultRpcPBMessages : public RpcPBMessages {
DefaultRpcPBMessages() : request(NULL), response(NULL) {}
::google::protobuf::Message* Request() override { return request; }
::google::protobuf::Message* Response() override { return response; }

::google::protobuf::Message* request;
::google::protobuf::Message* response;
};

class DefaultRpcPBMessageFactory : public RpcPBMessageFactory {
public:
RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) override {
auto messages = butil::get_object<DefaultRpcPBMessages>();
messages->request = service.GetRequestPrototype(&method).New();
messages->response = service.GetResponsePrototype(&method).New();
return messages;
}

void Return(RpcPBMessages* messages) override {
auto default_messages = static_cast<DefaultRpcPBMessages*>(messages);
delete default_messages->request;
delete default_messages->response;
default_messages->request = NULL;
default_messages->response = NULL;
butil::return_object(default_messages);
}
};

RpcPBMessageFactory* GetDefaultRpcPBMessageFactory() {
static DefaultRpcPBMessageFactory factory;
return &factory;
}


} // namespace brpc
50 changes: 50 additions & 0 deletions src/brpc/rpc_pb_message_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef BRPC_RPC_PB_MESSAGE_FACTORY_H
#define BRPC_RPC_PB_MESSAGE_FACTORY_H

#include <google/protobuf/service.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>

namespace brpc {

// Inherit this class to customize rpc protobuf messages,
// include request and response.
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
virtual google::protobuf::Message* Request() = 0;
virtual google::protobuf::Message* Response() = 0;
};

// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) = 0;
virtual void Return(RpcPBMessages* protobuf_message) = 0;
};

// Get the `RpcPBMessageFactory' used by `Server'.
RpcPBMessageFactory* GetDefaultRpcPBMessageFactory();

} // namespace brpc

#endif //BRPC_RPC_PB_MESSAGE_FACTORY_H
8 changes: 7 additions & 1 deletion src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ ServerOptions::ServerOptions()
, health_reporter(NULL)
, rtmp_service(NULL)
, redis_service(NULL)
, bthread_tag(BTHREAD_TAG_INVALID) {
, bthread_tag(BTHREAD_TAG_INVALID)
, rpc_pb_message_factory(GetDefaultRpcPBMessageFactory()) {
if (s_ncore > 0) {
num_threads = s_ncore + 1;
}
Expand Down Expand Up @@ -449,6 +450,11 @@ Server::~Server() {
delete _options.http_master_service;
_options.http_master_service = NULL;

if (_options.rpc_pb_message_factory != GetDefaultRpcPBMessageFactory()) {
delete _options.rpc_pb_message_factory;
_options.rpc_pb_message_factory = NULL;
}

delete _am;
_am = NULL;
delete _internal_am;
Expand Down
10 changes: 10 additions & 0 deletions src/brpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "brpc/interceptor.h"
#include "brpc/concurrency_limiter.h"
#include "brpc/baidu_master_service.h"
#include "brpc/rpc_pb_message_factory.h"

namespace brpc {

Expand Down Expand Up @@ -277,6 +278,15 @@ struct ServerOptions {
// Default: BTHREAD_TAG_DEFAULT
bthread_tag_t bthread_tag;

// [CAUTION] This option is for implementing specialized rpc protobuf
// message factory, most users don't need it. Don't change this option
// unless you fully understand the description below.
// If this option is set, all baidu-std rpc request message and response
// message will be created by this factory.
//
// Owned by Server and deleted in server's destructor.
RpcPBMessageFactory* rpc_pb_message_factory;

private:
// SSLOptions is large and not often used, allocate it on heap to
// prevent ServerOptions from being bloated in most cases.
Expand Down
27 changes: 13 additions & 14 deletions test/brpc_channel_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ DECLARE_int32(max_connection_pool_size);
class Server;
class MethodStatus;
namespace policy {
void SendRpcResponse(int64_t correlation_id, Controller* cntl,
const google::protobuf::Message* req,
const google::protobuf::Message* res,
const Server* server_raw, MethodStatus *, int64_t);
void SendRpcResponse(int64_t correlation_id,
Controller* cntl,
RpcPBMessages* messages,
const Server* server_raw,
MethodStatus *, int64_t);
} // policy
} // brpc

Expand Down Expand Up @@ -255,8 +256,10 @@ class ChannelTest : public ::testing::Test{
ASSERT_EQ(ts->_svc.descriptor()->full_name(), req_meta.service_name());
const google::protobuf::MethodDescriptor* method =
ts->_svc.descriptor()->FindMethodByName(req_meta.method_name());
google::protobuf::Message* req =
ts->_svc.GetRequestPrototype(method).New();
brpc::RpcPBMessages* messages =
ts->_dummy.options().rpc_pb_message_factory->Get(ts->_svc, *method);
google::protobuf::Message* req = messages->Request();
google::protobuf::Message* res = messages->Response();
if (meta.attachment_size() != 0) {
butil::IOBuf req_buf;
msg->payload.cutn(&req_buf, msg->payload.size() - meta.attachment_size());
Expand All @@ -271,18 +274,14 @@ class ChannelTest : public ::testing::Test{
cntl->_current_call.sending_sock.reset(ptr.release());
cntl->_server = &ts->_dummy;

google::protobuf::Message* res =
ts->_svc.GetResponsePrototype(method).New();
google::protobuf::Closure* done =
brpc::NewCallback<
int64_t, brpc::Controller*,
const google::protobuf::Message*,
const google::protobuf::Message*,
brpc::RpcPBMessages*,
const brpc::Server*,
brpc::MethodStatus*, int64_t>(
&brpc::policy::SendRpcResponse,
meta.correlation_id(), cntl, req, res,
&ts->_dummy, NULL, -1);
brpc::MethodStatus*, int64_t>(&brpc::policy::SendRpcResponse,
meta.correlation_id(), cntl,
messages, &ts->_dummy, NULL, -1);
ts->_svc.CallMethod(method, cntl, req, res, done);
}

Expand Down
Loading

0 comments on commit e20d804

Please sign in to comment.