Skip to content

Commit

Permalink
[feat] only service level ignore_eovercrowded
Browse files Browse the repository at this point in the history
  • Loading branch information
lianxuechao authored and lianxuechao committed Oct 7, 2024
1 parent 216dd33 commit dbe25f3
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 90 deletions.
11 changes: 6 additions & 5 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,12 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
break;
}

if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}

if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
Expand Down Expand Up @@ -566,11 +572,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);
break;
}
if (socket->is_overcrowded() && !server->options().ignore_eovercrowded && !mp->ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
// Switch to service-specific error.
non_service_error.release();
method_status = mp->status;
Expand Down
71 changes: 2 additions & 69 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ ServerOptions::ServerOptions()
, rtmp_service(NULL)
, redis_service(NULL)
, bthread_tag(BTHREAD_TAG_INVALID)
, rpc_pb_message_factory(new DefaultRpcPBMessageFactory()) {
, rpc_pb_message_factory(new DefaultRpcPBMessageFactory())
, ignore_eovercrowded(false) {
if (s_ncore > 0) {
num_threads = s_ncore + 1;
}
Expand Down Expand Up @@ -2306,74 +2307,6 @@ int Server::MaxConcurrencyOf(google::protobuf::Service* service,
return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name);
}

bool& Server::IgnoreEovercrowdedOf(MethodProperty* mp) {
if (IsRunning()) {
LOG(WARNING) << "IgnoreEovercrowdedOf is only allowd before Server started";
return g_default_ignore_eovercrowded;
}
if (mp->status == NULL) {
LOG(ERROR) << "method=" << mp->method->full_name()
<< " does not support max_concurrency";
_failed_to_set_max_concurrency_of_method = true;
return g_default_ignore_eovercrowded;
}
return mp->ignore_eovercrowded;
}

bool Server::IgnoreEovercrowdedOf(const MethodProperty* mp) const {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started";
return g_default_ignore_eovercrowded;
}
if (mp == NULL || mp->status == NULL) {
return false;
}
return mp->ignore_eovercrowded;
}

bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) {
MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_ignore_eovercrowded;
}
return IgnoreEovercrowdedOf(mp);
}

bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const {
return IgnoreEovercrowdedOf(_method_map.seek(full_method_name));
}

bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) {
MethodProperty* mp = const_cast<MethodProperty*>(
FindMethodPropertyByFullName(full_service_name, method_name));
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_service_name
<< '/' << method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_ignore_eovercrowded;
}
return IgnoreEovercrowdedOf(mp);
}

bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) const {
return IgnoreEovercrowdedOf(FindMethodPropertyByFullName(
full_service_name, method_name));
}

bool& Server::IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) {
return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name);
}

bool Server::IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const {
return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name);
}

bool Server::AcceptRequest(Controller* cntl) const {
const Interceptor* interceptor = _options.interceptor;
if (!interceptor) {
Expand Down
19 changes: 3 additions & 16 deletions src/brpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ struct ServerOptions {
// Owned by Server and deleted in server's destructor.
RpcPBMessageFactory* rpc_pb_message_factory;

// Ignore eovercrowded error on server side, i.e. , if eovercrowded is reported when server is processing a rpc request,
// server will keep processing this request, it is expected to be used by some light-weight control-frame rpcs.
// [CUATION] You should not enabling this option if your rpc is heavy-loaded.
bool ignore_eovercrowded;

private:
Expand Down Expand Up @@ -414,7 +417,6 @@ class Server {
const google::protobuf::MethodDescriptor* method;
MethodStatus* status;
AdaptiveMaxConcurrency max_concurrency;
bool ignore_eovercrowded = false;

MethodProperty();
};
Expand Down Expand Up @@ -593,19 +595,6 @@ class Server {
int MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const;

bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name);
bool IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const;

bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name);
bool IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) const;

bool& IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name);
bool IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const;

int Concurrency() const {
return butil::subtle::NoBarrier_Load(&_concurrency);
};
Expand Down Expand Up @@ -710,8 +699,6 @@ friend class Controller;

AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*);
int MaxConcurrencyOf(const MethodProperty*) const;
bool& IgnoreEovercrowdedOf(MethodProperty*);
bool IgnoreEovercrowdedOf(const MethodProperty*) const;

static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out);
Expand Down

0 comments on commit dbe25f3

Please sign in to comment.