Skip to content

Commit

Permalink
[feat][WIP] add a method option to disable check eovercrowded
Browse files Browse the repository at this point in the history
  • Loading branch information
lianxuechao authored and lianxuechao committed Sep 30, 2024
1 parent f17048e commit 216dd33
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 6 deletions.
16 changes: 10 additions & 6 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
cntl->SetFailed(ELOGOFF, "Server is stopping");
break;
}

if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}

if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(
Expand Down Expand Up @@ -505,6 +499,11 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
google::protobuf::Service* svc = NULL;
google::protobuf::MethodDescriptor* method = NULL;
if (NULL != server->options().baidu_master_service) {
if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
svc = server->options().baidu_master_service;
auto sampled_request = new (std::nothrow) SampledRequest;
if (NULL == sampled_request) {
Expand Down Expand Up @@ -567,6 +566,11 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);
break;
}
if (socket->is_overcrowded() && !server->options().ignore_eovercrowded && !mp->ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
// Switch to service-specific error.
non_service_error.release();
method_status = mp->status;
Expand Down
69 changes: 69 additions & 0 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ static bool OptionsAvailableOverRdma(const ServerOptions* opt) {
#endif

static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0);
static bool g_default_ignore_eovercrowded(false);

int Server::StartInternal(const butil::EndPoint& endpoint,
const PortRange& port_range,
Expand Down Expand Up @@ -2305,6 +2306,74 @@ int Server::MaxConcurrencyOf(google::protobuf::Service* service,
return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name);
}

bool& Server::IgnoreEovercrowdedOf(MethodProperty* mp) {
if (IsRunning()) {
LOG(WARNING) << "IgnoreEovercrowdedOf is only allowd before Server started";
return g_default_ignore_eovercrowded;
}
if (mp->status == NULL) {
LOG(ERROR) << "method=" << mp->method->full_name()
<< " does not support max_concurrency";
_failed_to_set_max_concurrency_of_method = true;
return g_default_ignore_eovercrowded;
}
return mp->ignore_eovercrowded;
}

bool Server::IgnoreEovercrowdedOf(const MethodProperty* mp) const {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started";
return g_default_ignore_eovercrowded;
}
if (mp == NULL || mp->status == NULL) {
return false;
}
return mp->ignore_eovercrowded;
}

bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) {
MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_ignore_eovercrowded;
}
return IgnoreEovercrowdedOf(mp);
}

bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const {
return IgnoreEovercrowdedOf(_method_map.seek(full_method_name));
}

bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) {
MethodProperty* mp = const_cast<MethodProperty*>(
FindMethodPropertyByFullName(full_service_name, method_name));
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_service_name
<< '/' << method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_ignore_eovercrowded;
}
return IgnoreEovercrowdedOf(mp);
}

bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) const {
return IgnoreEovercrowdedOf(FindMethodPropertyByFullName(
full_service_name, method_name));
}

bool& Server::IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) {
return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name);
}

bool Server::IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const {
return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name);
}

bool Server::AcceptRequest(Controller* cntl) const {
const Interceptor* interceptor = _options.interceptor;
if (!interceptor) {
Expand Down
18 changes: 18 additions & 0 deletions src/brpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ struct ServerOptions {
// Owned by Server and deleted in server's destructor.
RpcPBMessageFactory* rpc_pb_message_factory;

bool ignore_eovercrowded;

private:
// SSLOptions is large and not often used, allocate it on heap to
// prevent ServerOptions from being bloated in most cases.
Expand Down Expand Up @@ -412,6 +414,7 @@ class Server {
const google::protobuf::MethodDescriptor* method;
MethodStatus* status;
AdaptiveMaxConcurrency max_concurrency;
bool ignore_eovercrowded = false;

MethodProperty();
};
Expand Down Expand Up @@ -590,6 +593,19 @@ class Server {
int MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const;

bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name);
bool IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const;

bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name);
bool IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) const;

bool& IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name);
bool IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const;

int Concurrency() const {
return butil::subtle::NoBarrier_Load(&_concurrency);
};
Expand Down Expand Up @@ -694,6 +710,8 @@ friend class Controller;

AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*);
int MaxConcurrencyOf(const MethodProperty*) const;
bool& IgnoreEovercrowdedOf(MethodProperty*);
bool IgnoreEovercrowdedOf(const MethodProperty*) const;

static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out);
Expand Down

0 comments on commit 216dd33

Please sign in to comment.