From 216dd33d035b16b5f1c1d2a4434dee7ebb4caa33 Mon Sep 17 00:00:00 2001 From: lianxuechao Date: Mon, 30 Sep 2024 14:34:31 +0800 Subject: [PATCH 1/4] [feat][WIP] add a method option to disable check eovercrowded --- src/brpc/policy/baidu_rpc_protocol.cpp | 16 +++--- src/brpc/server.cpp | 69 ++++++++++++++++++++++++++ src/brpc/server.h | 18 +++++++ 3 files changed, 97 insertions(+), 6 deletions(-) diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 504895b185..7728f4cf9e 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -472,12 +472,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { cntl->SetFailed(ELOGOFF, "Server is stopping"); break; } - - if (socket->is_overcrowded()) { - cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", - butil::endpoint2str(socket->remote_side()).c_str()); - break; - } if (!server_accessor.AddConcurrency(cntl.get())) { cntl->SetFailed( @@ -505,6 +499,11 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { google::protobuf::Service* svc = NULL; google::protobuf::MethodDescriptor* method = NULL; if (NULL != server->options().baidu_master_service) { + if (socket->is_overcrowded()) { + cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", + butil::endpoint2str(socket->remote_side()).c_str()); + break; + } svc = server->options().baidu_master_service; auto sampled_request = new (std::nothrow) SampledRequest; if (NULL == sampled_request) { @@ -567,6 +566,11 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL); break; } + if (socket->is_overcrowded() && !server->options().ignore_eovercrowded && !mp->ignore_eovercrowded) { + cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", + butil::endpoint2str(socket->remote_side()).c_str()); + break; + } // Switch to service-specific error. non_service_error.release(); method_status = mp->status; diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 740873f126..007e433423 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -794,6 +794,7 @@ static bool OptionsAvailableOverRdma(const ServerOptions* opt) { #endif static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0); +static bool g_default_ignore_eovercrowded(false); int Server::StartInternal(const butil::EndPoint& endpoint, const PortRange& port_range, @@ -2305,6 +2306,74 @@ int Server::MaxConcurrencyOf(google::protobuf::Service* service, return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name); } +bool& Server::IgnoreEovercrowdedOf(MethodProperty* mp) { + if (IsRunning()) { + LOG(WARNING) << "IgnoreEovercrowdedOf is only allowd before Server started"; + return g_default_ignore_eovercrowded; + } + if (mp->status == NULL) { + LOG(ERROR) << "method=" << mp->method->full_name() + << " does not support max_concurrency"; + _failed_to_set_max_concurrency_of_method = true; + return g_default_ignore_eovercrowded; + } + return mp->ignore_eovercrowded; +} + +bool Server::IgnoreEovercrowdedOf(const MethodProperty* mp) const { + if (IsRunning()) { + LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started"; + return g_default_ignore_eovercrowded; + } + if (mp == NULL || mp->status == NULL) { + return false; + } + return mp->ignore_eovercrowded; +} + +bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) { + MethodProperty* mp = _method_map.seek(full_method_name); + if (mp == NULL) { + LOG(ERROR) << "Fail to find method=" << full_method_name; + _failed_to_set_max_concurrency_of_method = true; + return g_default_ignore_eovercrowded; + } + return IgnoreEovercrowdedOf(mp); +} + +bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const { + return IgnoreEovercrowdedOf(_method_map.seek(full_method_name)); +} + +bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name, + const butil::StringPiece& method_name) { + MethodProperty* mp = const_cast( + FindMethodPropertyByFullName(full_service_name, method_name)); + if (mp == NULL) { + LOG(ERROR) << "Fail to find method=" << full_service_name + << '/' << method_name; + _failed_to_set_max_concurrency_of_method = true; + return g_default_ignore_eovercrowded; + } + return IgnoreEovercrowdedOf(mp); +} + +bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name, + const butil::StringPiece& method_name) const { + return IgnoreEovercrowdedOf(FindMethodPropertyByFullName( + full_service_name, method_name)); +} + +bool& Server::IgnoreEovercrowdedOf(google::protobuf::Service* service, + const butil::StringPiece& method_name) { + return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name); +} + +bool Server::IgnoreEovercrowdedOf(google::protobuf::Service* service, + const butil::StringPiece& method_name) const { + return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name); +} + bool Server::AcceptRequest(Controller* cntl) const { const Interceptor* interceptor = _options.interceptor; if (!interceptor) { diff --git a/src/brpc/server.h b/src/brpc/server.h index d65e13f0b2..d002e789f2 100644 --- a/src/brpc/server.h +++ b/src/brpc/server.h @@ -287,6 +287,8 @@ struct ServerOptions { // Owned by Server and deleted in server's destructor. RpcPBMessageFactory* rpc_pb_message_factory; + bool ignore_eovercrowded; + private: // SSLOptions is large and not often used, allocate it on heap to // prevent ServerOptions from being bloated in most cases. @@ -412,6 +414,7 @@ class Server { const google::protobuf::MethodDescriptor* method; MethodStatus* status; AdaptiveMaxConcurrency max_concurrency; + bool ignore_eovercrowded = false; MethodProperty(); }; @@ -590,6 +593,19 @@ class Server { int MaxConcurrencyOf(google::protobuf::Service* service, const butil::StringPiece& method_name) const; + bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name); + bool IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const; + + bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name, + const butil::StringPiece& method_name); + bool IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name, + const butil::StringPiece& method_name) const; + + bool& IgnoreEovercrowdedOf(google::protobuf::Service* service, + const butil::StringPiece& method_name); + bool IgnoreEovercrowdedOf(google::protobuf::Service* service, + const butil::StringPiece& method_name) const; + int Concurrency() const { return butil::subtle::NoBarrier_Load(&_concurrency); }; @@ -694,6 +710,8 @@ friend class Controller; AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*); int MaxConcurrencyOf(const MethodProperty*) const; + bool& IgnoreEovercrowdedOf(MethodProperty*); + bool IgnoreEovercrowdedOf(const MethodProperty*) const; static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc, ConcurrencyLimiter** out); From dbe25f3f9c23fa8d4b17f7510f1a928696d3efab Mon Sep 17 00:00:00 2001 From: lianxuechao Date: Mon, 7 Oct 2024 16:31:28 +0800 Subject: [PATCH 2/4] [feat] only service level ignore_eovercrowded --- src/brpc/policy/baidu_rpc_protocol.cpp | 11 ++-- src/brpc/server.cpp | 71 +------------------------- src/brpc/server.h | 19 ++----- 3 files changed, 11 insertions(+), 90 deletions(-) diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 7728f4cf9e..a9d0dd096b 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -473,6 +473,12 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { break; } + if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) { + cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", + butil::endpoint2str(socket->remote_side()).c_str()); + break; + } + if (!server_accessor.AddConcurrency(cntl.get())) { cntl->SetFailed( ELIMIT, "Reached server's max_concurrency=%d", @@ -566,11 +572,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL); break; } - if (socket->is_overcrowded() && !server->options().ignore_eovercrowded && !mp->ignore_eovercrowded) { - cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", - butil::endpoint2str(socket->remote_side()).c_str()); - break; - } // Switch to service-specific error. non_service_error.release(); method_status = mp->status; diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 007e433423..c641b8239e 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -152,7 +152,8 @@ ServerOptions::ServerOptions() , rtmp_service(NULL) , redis_service(NULL) , bthread_tag(BTHREAD_TAG_INVALID) - , rpc_pb_message_factory(new DefaultRpcPBMessageFactory()) { + , rpc_pb_message_factory(new DefaultRpcPBMessageFactory()) + , ignore_eovercrowded(false) { if (s_ncore > 0) { num_threads = s_ncore + 1; } @@ -2306,74 +2307,6 @@ int Server::MaxConcurrencyOf(google::protobuf::Service* service, return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name); } -bool& Server::IgnoreEovercrowdedOf(MethodProperty* mp) { - if (IsRunning()) { - LOG(WARNING) << "IgnoreEovercrowdedOf is only allowd before Server started"; - return g_default_ignore_eovercrowded; - } - if (mp->status == NULL) { - LOG(ERROR) << "method=" << mp->method->full_name() - << " does not support max_concurrency"; - _failed_to_set_max_concurrency_of_method = true; - return g_default_ignore_eovercrowded; - } - return mp->ignore_eovercrowded; -} - -bool Server::IgnoreEovercrowdedOf(const MethodProperty* mp) const { - if (IsRunning()) { - LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started"; - return g_default_ignore_eovercrowded; - } - if (mp == NULL || mp->status == NULL) { - return false; - } - return mp->ignore_eovercrowded; -} - -bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) { - MethodProperty* mp = _method_map.seek(full_method_name); - if (mp == NULL) { - LOG(ERROR) << "Fail to find method=" << full_method_name; - _failed_to_set_max_concurrency_of_method = true; - return g_default_ignore_eovercrowded; - } - return IgnoreEovercrowdedOf(mp); -} - -bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const { - return IgnoreEovercrowdedOf(_method_map.seek(full_method_name)); -} - -bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name, - const butil::StringPiece& method_name) { - MethodProperty* mp = const_cast( - FindMethodPropertyByFullName(full_service_name, method_name)); - if (mp == NULL) { - LOG(ERROR) << "Fail to find method=" << full_service_name - << '/' << method_name; - _failed_to_set_max_concurrency_of_method = true; - return g_default_ignore_eovercrowded; - } - return IgnoreEovercrowdedOf(mp); -} - -bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name, - const butil::StringPiece& method_name) const { - return IgnoreEovercrowdedOf(FindMethodPropertyByFullName( - full_service_name, method_name)); -} - -bool& Server::IgnoreEovercrowdedOf(google::protobuf::Service* service, - const butil::StringPiece& method_name) { - return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name); -} - -bool Server::IgnoreEovercrowdedOf(google::protobuf::Service* service, - const butil::StringPiece& method_name) const { - return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name); -} - bool Server::AcceptRequest(Controller* cntl) const { const Interceptor* interceptor = _options.interceptor; if (!interceptor) { diff --git a/src/brpc/server.h b/src/brpc/server.h index d002e789f2..ee5a500d1b 100644 --- a/src/brpc/server.h +++ b/src/brpc/server.h @@ -287,6 +287,9 @@ struct ServerOptions { // Owned by Server and deleted in server's destructor. RpcPBMessageFactory* rpc_pb_message_factory; + // Ignore eovercrowded error on server side, i.e. , if eovercrowded is reported when server is processing a rpc request, + // server will keep processing this request, it is expected to be used by some light-weight control-frame rpcs. + // [CUATION] You should not enabling this option if your rpc is heavy-loaded. bool ignore_eovercrowded; private: @@ -414,7 +417,6 @@ class Server { const google::protobuf::MethodDescriptor* method; MethodStatus* status; AdaptiveMaxConcurrency max_concurrency; - bool ignore_eovercrowded = false; MethodProperty(); }; @@ -593,19 +595,6 @@ class Server { int MaxConcurrencyOf(google::protobuf::Service* service, const butil::StringPiece& method_name) const; - bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name); - bool IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const; - - bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name, - const butil::StringPiece& method_name); - bool IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name, - const butil::StringPiece& method_name) const; - - bool& IgnoreEovercrowdedOf(google::protobuf::Service* service, - const butil::StringPiece& method_name); - bool IgnoreEovercrowdedOf(google::protobuf::Service* service, - const butil::StringPiece& method_name) const; - int Concurrency() const { return butil::subtle::NoBarrier_Load(&_concurrency); }; @@ -710,8 +699,6 @@ friend class Controller; AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*); int MaxConcurrencyOf(const MethodProperty*) const; - bool& IgnoreEovercrowdedOf(MethodProperty*); - bool IgnoreEovercrowdedOf(const MethodProperty*) const; static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc, ConcurrencyLimiter** out); From 93870e25baee7e85d02ebc9fa594bc0a68044a91 Mon Sep 17 00:00:00 2001 From: lianxuechao Date: Sat, 12 Oct 2024 12:46:06 +0800 Subject: [PATCH 3/4] [feat] impl ignore_eovercrowded in http protocol as well --- src/brpc/policy/baidu_rpc_protocol.cpp | 5 ----- src/brpc/policy/http_rpc_protocol.cpp | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index a9d0dd096b..87b4c41ca3 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -505,11 +505,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { google::protobuf::Service* svc = NULL; google::protobuf::MethodDescriptor* method = NULL; if (NULL != server->options().baidu_master_service) { - if (socket->is_overcrowded()) { - cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", - butil::endpoint2str(socket->remote_side()).c_str()); - break; - } svc = server->options().baidu_master_service; auto sampled_request = new (std::nothrow) SampledRequest; if (NULL == sampled_request) { diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index c4502c277f..76f43c0524 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -1495,7 +1495,7 @@ void ProcessHttpRequest(InputMessageBase *msg) { // NOTE: accesses to builtin services are not counted as part of // concurrency, therefore are not limited by ServerOptions.max_concurrency. if (!sp->is_builtin_service && !sp->params.is_tabbed) { - if (socket->is_overcrowded()) { + if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) { cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", butil::endpoint2str(socket->remote_side()).c_str()); return; From f2e38efb18d1ca23770dc493a87e7bb6e253b9a9 Mon Sep 17 00:00:00 2001 From: lianxuechao Date: Mon, 14 Oct 2024 11:04:06 +0800 Subject: [PATCH 4/4] [feat] enable for all protocols using is_overcrowded, rm useless code --- src/brpc/policy/hulu_pbrpc_protocol.cpp | 2 +- src/brpc/policy/nshead_protocol.cpp | 2 +- src/brpc/policy/sofa_pbrpc_protocol.cpp | 2 +- src/brpc/policy/thrift_protocol.cpp | 2 +- src/brpc/server.cpp | 1 - 5 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/brpc/policy/hulu_pbrpc_protocol.cpp b/src/brpc/policy/hulu_pbrpc_protocol.cpp index cb10aac35b..20e9c827d2 100644 --- a/src/brpc/policy/hulu_pbrpc_protocol.cpp +++ b/src/brpc/policy/hulu_pbrpc_protocol.cpp @@ -422,7 +422,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) { break; } - if (socket->is_overcrowded()) { + if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) { cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", butil::endpoint2str(socket->remote_side()).c_str()); break; diff --git a/src/brpc/policy/nshead_protocol.cpp b/src/brpc/policy/nshead_protocol.cpp index e51be36149..4288085d68 100644 --- a/src/brpc/policy/nshead_protocol.cpp +++ b/src/brpc/policy/nshead_protocol.cpp @@ -301,7 +301,7 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) { cntl->SetFailed(ELOGOFF, "Server is stopping"); break; } - if (socket->is_overcrowded()) { + if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) { cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", butil::endpoint2str(socket->remote_side()).c_str()); break; diff --git a/src/brpc/policy/sofa_pbrpc_protocol.cpp b/src/brpc/policy/sofa_pbrpc_protocol.cpp index 7584f79bd4..ad58022ffd 100644 --- a/src/brpc/policy/sofa_pbrpc_protocol.cpp +++ b/src/brpc/policy/sofa_pbrpc_protocol.cpp @@ -381,7 +381,7 @@ void ProcessSofaRequest(InputMessageBase* msg_base) { break; } - if (socket->is_overcrowded()) { + if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) { cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", butil::endpoint2str(socket->remote_side()).c_str()); break; diff --git a/src/brpc/policy/thrift_protocol.cpp b/src/brpc/policy/thrift_protocol.cpp index d53ec5e988..e3f4b2faec 100755 --- a/src/brpc/policy/thrift_protocol.cpp +++ b/src/brpc/policy/thrift_protocol.cpp @@ -530,7 +530,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { if (!server->IsRunning()) { return cntl->SetFailed(ELOGOFF, "Server is stopping"); } - if (socket->is_overcrowded()) { + if (socket->is_overcrowded() && !server->options().ignore_eovercrowded) { return cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", butil::endpoint2str(socket->remote_side()).c_str()); } diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index c641b8239e..fa3ab7d7ed 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -795,7 +795,6 @@ static bool OptionsAvailableOverRdma(const ServerOptions* opt) { #endif static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0); -static bool g_default_ignore_eovercrowded(false); int Server::StartInternal(const butil::EndPoint& endpoint, const PortRange& port_range,