diff --git a/src/brpc/concurrency_limiter.h b/src/brpc/concurrency_limiter.h index 99ca13a153..08cdb813c2 100644 --- a/src/brpc/concurrency_limiter.h +++ b/src/brpc/concurrency_limiter.h @@ -33,7 +33,7 @@ class ConcurrencyLimiter { // false when the concurrency reaches the upper limit, otherwise it // returns true. Normally, when OnRequested returns false, you should // return an ELIMIT error directly. - virtual bool OnRequested(int current_concurrency) = 0; + virtual bool OnRequested(int current_concurrency, int32_t timeout_ms) = 0; // Each request should call this method before responding. // `error_code' : Error code obtained from the controller, 0 means success. diff --git a/src/brpc/details/method_status.h b/src/brpc/details/method_status.h index 094e8953a2..a9105998f1 100644 --- a/src/brpc/details/method_status.h +++ b/src/brpc/details/method_status.h @@ -38,7 +38,7 @@ class MethodStatus : public Describable { // Call this function when the method is about to be called. // Returns false when the method is overloaded. If rejected_cc is not // NULL, it's set with the rejected concurrency. - bool OnRequested(int* rejected_cc = NULL); + bool OnRequested(int* rejected_cc = NULL, int32_t timeout_ms = 0); // Call this when the method just finished. // `error_code' : The error code obtained from the controller. 
Equal to @@ -89,9 +89,9 @@ class ConcurrencyRemover { uint64_t _received_us; }; -inline bool MethodStatus::OnRequested(int* rejected_cc) { +inline bool MethodStatus::OnRequested(int* rejected_cc, int32_t timeout_ms) { const int cc = _nconcurrency.fetch_add(1, butil::memory_order_relaxed) + 1; - if (NULL == _cl || _cl->OnRequested(cc)) { + if (NULL == _cl || _cl->OnRequested(cc, timeout_ms)) { return true; } if (rejected_cc) { diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index af8dac5c27..30c2f1a3b9 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -80,6 +80,7 @@ #include "brpc/concurrency_limiter.h" #include "brpc/policy/auto_concurrency_limiter.h" #include "brpc/policy/constant_concurrency_limiter.h" +#include "brpc/policy/timeout_concurrency_limiter.h" #include "brpc/input_messenger.h" // get_or_new_client_side_messenger #include "brpc/socket_map.h" // SocketMapList @@ -150,6 +151,7 @@ struct GlobalExtensions { AutoConcurrencyLimiter auto_cl; ConstantConcurrencyLimiter constant_cl; + TimeoutConcurrencyLimiter timeout_cl; }; static pthread_once_t register_extensions_once = PTHREAD_ONCE_INIT; @@ -601,7 +603,8 @@ static void GlobalInitializeOrDieImpl() { // Concurrency Limiters ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl); ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl); - + ConcurrencyLimiterExtension()->RegisterOrDie("timeout", &g_ext->timeout_cl); + if (FLAGS_usercode_in_pthread) { // Optional. 
If channel/server are initialized before main(), this // flag may be false at here even if it will be set to true after diff --git a/src/brpc/policy/auto_concurrency_limiter.cpp b/src/brpc/policy/auto_concurrency_limiter.cpp index 5eafbd7aec..64e80c955e 100644 --- a/src/brpc/policy/auto_concurrency_limiter.cpp +++ b/src/brpc/policy/auto_concurrency_limiter.cpp @@ -93,7 +93,7 @@ AutoConcurrencyLimiter* AutoConcurrencyLimiter::New(const AdaptiveMaxConcurrency return new (std::nothrow) AutoConcurrencyLimiter; } -bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) { +bool AutoConcurrencyLimiter::OnRequested(int current_concurrency, int32_t) { return current_concurrency <= _max_concurrency; } diff --git a/src/brpc/policy/auto_concurrency_limiter.h b/src/brpc/policy/auto_concurrency_limiter.h index 7d69424793..a09b3271fc 100644 --- a/src/brpc/policy/auto_concurrency_limiter.h +++ b/src/brpc/policy/auto_concurrency_limiter.h @@ -29,7 +29,7 @@ class AutoConcurrencyLimiter : public ConcurrencyLimiter { public: AutoConcurrencyLimiter(); - bool OnRequested(int current_concurrency) override; + bool OnRequested(int current_concurrency, int32_t) override; void OnResponded(int error_code, int64_t latency_us) override; diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 76f19619a4..d7c3b4f24a 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -453,7 +453,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { method_status = mp->status; if (method_status) { int rejected_cc = 0; - if (!method_status->OnRequested(&rejected_cc)) { + if (!method_status->OnRequested(&rejected_cc, cntl->timeout_ms())) { cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d", mp->method->full_name().c_str(), rejected_cc); break; diff --git a/src/brpc/policy/constant_concurrency_limiter.cpp b/src/brpc/policy/constant_concurrency_limiter.cpp index 91ab7a881f..6ab633a43d 
100644 --- a/src/brpc/policy/constant_concurrency_limiter.cpp +++ b/src/brpc/policy/constant_concurrency_limiter.cpp @@ -24,7 +24,7 @@ ConstantConcurrencyLimiter::ConstantConcurrencyLimiter(int max_concurrency) : _max_concurrency(max_concurrency) { } -bool ConstantConcurrencyLimiter::OnRequested(int current_concurrency) { +bool ConstantConcurrencyLimiter::OnRequested(int current_concurrency, int32_t) { return current_concurrency <= _max_concurrency; } diff --git a/src/brpc/policy/constant_concurrency_limiter.h b/src/brpc/policy/constant_concurrency_limiter.h index 755714b83c..bf12028650 100644 --- a/src/brpc/policy/constant_concurrency_limiter.h +++ b/src/brpc/policy/constant_concurrency_limiter.h @@ -27,7 +27,7 @@ class ConstantConcurrencyLimiter : public ConcurrencyLimiter { public: explicit ConstantConcurrencyLimiter(int max_concurrency); - bool OnRequested(int current_concurrency) override; + bool OnRequested(int current_concurrency, int32_t) override; void OnResponded(int error_code, int64_t latency_us) override; diff --git a/src/brpc/policy/timeout_concurrency_limiter.cpp b/src/brpc/policy/timeout_concurrency_limiter.cpp new file mode 100644 index 0000000000..a269f1217e --- /dev/null +++ b/src/brpc/policy/timeout_concurrency_limiter.cpp @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/policy/timeout_concurrency_limiter.h"
+#include "brpc/controller.h"
+#include "brpc/errno.pb.h"
+#include <cmath>
+#include <limits>
+#include <gflags/gflags.h>
+
+namespace brpc {
+namespace policy {
+
+DEFINE_int32(timeout_cl_sample_window_size_ms, 1000,
+             "Duration of the sampling window.");
+DEFINE_int32(timeout_cl_min_sample_count, 100,
+             "During the duration of the sampling window, if the number of "
+             "requests collected is less than this value, the sampling window "
+             "will be discarded.");
+DEFINE_int32(timeout_cl_max_sample_count, 200,
+             "During the duration of the sampling window, once the number of "
+             "requests collected is greater than this value, even if the "
+             "duration of the window has not ended, the average latency will "
+             "be updated and a new sampling window will be started.");
+DEFINE_double(timeout_cl_sampling_interval_ms, 0.1,
+              "Interval for sampling request in timeout concurrency limiter");
+DEFINE_int32(timeout_cl_initial_avg_latency_us, 500,
+             "Initial average latency (in microseconds) assumed by the "
+             "timeout concurrency limiter before any sampling window has "
+             "been submitted");
+DEFINE_bool(
+    timeout_cl_enable_error_punish, true,
+    "Whether to consider failed requests when calculating average latency");
+DEFINE_double(
+    timeout_cl_fail_punish_ratio, 1.0,
+    "Use the failed requests to punish normal requests. The larger "
+    "the configuration item, the more aggressive the penalty strategy.");
+DEFINE_int32(timeout_cl_default_timeout_ms, 500,
+             "Timeout (in milliseconds) assumed for requests that do not "
+             "carry a usable timeout");
+
+// Starts with the flag-configured latency estimate and no sample taken yet.
+TimeoutConcurrencyLimiter::TimeoutConcurrencyLimiter()
+    : _avg_latency_us(FLAGS_timeout_cl_initial_avg_latency_us),
+      _last_sampling_time_us(0) {}
+
+// Creates a fresh limiter. The AdaptiveMaxConcurrency argument is ignored.
+// Returns NULL when the allocation fails (nothrow new).
+TimeoutConcurrencyLimiter *TimeoutConcurrencyLimiter::New(
+    const AdaptiveMaxConcurrency &) const {
+    return new (std::nothrow) TimeoutConcurrencyLimiter;
+}
+
+// Admits the request when the estimated time to drain the current
+// concurrency (current_concurrency * average latency) fits in the timeout.
+// `current_concurrency' : concurrency including this request.
+// `timeout_ms'          : request timeout in milliseconds. UNSET_MAGIC_NUM
+//                         or any non-positive value falls back to
+//                         -timeout_cl_default_timeout_ms.
+bool TimeoutConcurrencyLimiter::OnRequested(int current_concurrency,
+                                            int32_t timeout_ms) {
+    // MethodStatus::OnRequested defaults timeout_ms to 0, which is not
+    // UNSET_MAGIC_NUM; treat every non-positive value as "not provided"
+    // so that such callers are not rejected unconditionally.
+    if (timeout_ms == UNSET_MAGIC_NUM || timeout_ms <= 0) {
+        timeout_ms = FLAGS_timeout_cl_default_timeout_ms;
+    }
+    // Rejected requests are never sampled (see OnResponded), so always let
+    // a lone request through; otherwise an estimate that grew beyond the
+    // timeout could never be refreshed.
+    if (current_concurrency <= 1) {
+        return true;
+    }
+    // The latency estimate is in microseconds, the timeout in milliseconds.
+    const int64_t timeout_us = static_cast<int64_t>(timeout_ms) * 1000;
+    if (timeout_us <= 0) {
+        return false;
+    }
+    // Same as `current_concurrency * _avg_latency_us < timeout_us', but
+    // rearranged so that a huge latency estimate cannot overflow.
+    return _avg_latency_us <= (timeout_us - 1) / current_concurrency;
+}
+
+// Feeds a finished request into the sampling window.
+// `error_code' : error code of the request, 0 means success.
+// `latency_us' : latency of the request in microseconds.
+void TimeoutConcurrencyLimiter::OnResponded(int error_code,
+                                            int64_t latency_us) {
+    // Requests rejected by a limiter carry no latency information.
+    if (ELIMIT == error_code) {
+        return;
+    }
+
+    const int64_t now_time_us = butil::gettimeofday_us();
+    int64_t last_sampling_time_us =
+        _last_sampling_time_us.load(butil::memory_order_relaxed);
+
+    if (last_sampling_time_us == 0 ||
+        now_time_us - last_sampling_time_us >=
+            FLAGS_timeout_cl_sampling_interval_ms * 1000) {
+        // Only the caller that wins the CAS samples in this interval.
+        bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
+            last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
+        if (sample_this_call) {
+            bool sample_window_submitted =
+                AddSample(error_code, latency_us, now_time_us);
+            if (sample_window_submitted) {
+                // The following log prints has data-race in extreme cases,
+                // unless you are in debug, you should not open it.
+                VLOG(1) << "Sample window submitted, current avg_latency_us:"
+                        << _avg_latency_us;
+            }
+        }
+    }
+}
+
+// This limiter does not maintain a max concurrency; always returns 0.
+int TimeoutConcurrencyLimiter::MaxConcurrency() { return 0; }
+
+// Adds one sample to the current window under _sw_mutex.
+// Returns true when the window was submitted, i.e. the average latency was
+// updated and a new window was started; false otherwise.
+bool TimeoutConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
+                                          int64_t sampling_time_us) {
+    std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
+    if (_sw.start_time_us == 0) {
+        _sw.start_time_us = sampling_time_us;
+    }
+
+    // Failed requests are counted only when error punishment is enabled.
+    if (error_code != 0 && FLAGS_timeout_cl_enable_error_punish) {
+        ++_sw.failed_count;
+        _sw.total_failed_us += latency_us;
+    } else if (error_code == 0) {
+        ++_sw.succ_count;
+        _sw.total_succ_us += latency_us;
+    }
+
+    if (_sw.succ_count + _sw.failed_count < FLAGS_timeout_cl_min_sample_count) {
+        if (sampling_time_us - _sw.start_time_us >=
+            FLAGS_timeout_cl_sample_window_size_ms * 1000) {
+            // If the sample size is insufficient at the end of the sampling
+            // window, discard the entire sampling window
+            ResetSampleWindow(sampling_time_us);
+        }
+        return false;
+    }
+    // Enough samples, but the window is neither over nor full yet.
+    if (sampling_time_us - _sw.start_time_us <
+            FLAGS_timeout_cl_sample_window_size_ms * 1000 &&
+        _sw.succ_count + _sw.failed_count < FLAGS_timeout_cl_max_sample_count) {
+        return false;
+    }
+
+    if (_sw.succ_count > 0) {
+        UpdateAvgLatency();
+    } else {
+        // All request failed: raise the estimate so that fewer requests
+        // are admitted. Halving it would admit more load during an outage
+        // and eventually reach 0, which disables the limiter. Saturate
+        // instead of overflowing if the outage lasts for many windows.
+        const int64_t max_latency_us = std::numeric_limits<int64_t>::max();
+        AdjustAvgLatency(_avg_latency_us > max_latency_us / 2
+                             ? max_latency_us
+                             : _avg_latency_us * 2);
+    }
+    ResetSampleWindow(sampling_time_us);
+    return true;
+}
+
+// Clears all counters and starts a new window at `sampling_time_us'.
+void TimeoutConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
+    _sw.start_time_us = sampling_time_us;
+    _sw.succ_count = 0;
+    _sw.failed_count = 0;
+    _sw.total_failed_us = 0;
+    _sw.total_succ_us = 0;
+}
+
+// Replaces the current latency estimate.
+void TimeoutConcurrencyLimiter::AdjustAvgLatency(int64_t avg_latency_us) {
+    _avg_latency_us = avg_latency_us;
+}
+
+// Recomputes the estimate from the current window: the latency of failed
+// requests, scaled by -timeout_cl_fail_punish_ratio, is charged to the
+// successful ones. Must only be called when _sw.succ_count > 0.
+void TimeoutConcurrencyLimiter::UpdateAvgLatency() {
+    double failed_punish =
+        _sw.total_failed_us * FLAGS_timeout_cl_fail_punish_ratio;
+    const double avg_latency_us =
+        std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
+    AdjustAvgLatency(static_cast<int64_t>(avg_latency_us));
+}
+
+}  // namespace policy
+}  // namespace 
brpc diff --git a/src/brpc/policy/timeout_concurrency_limiter.h b/src/brpc/policy/timeout_concurrency_limiter.h new file mode 100644 index 0000000000..4229ac11b9 --- /dev/null +++ b/src/brpc/policy/timeout_concurrency_limiter.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#ifndef BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H
+#define BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H
+
+#include "brpc/concurrency_limiter.h"
+
+namespace brpc {
+namespace policy {
+
+// A ConcurrencyLimiter whose admission decision is based on the current
+// concurrency, the timeout passed with the request and an average latency
+// learned from sampled responses.
+class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
+public:
+    TimeoutConcurrencyLimiter();
+
+    // Returns true when the request may be processed, false when it should
+    // be rejected.
+    // `current_concurrency' : concurrency including this request.
+    // `timeout_ms'          : timeout of the request in milliseconds.
+    bool OnRequested(int current_concurrency, int32_t timeout_ms) override;
+
+    // Records a finished request for latency sampling.
+    // `error_code' : error code of the request, 0 means success.
+    // `latency_us' : latency of the request in microseconds.
+    void OnResponded(int error_code, int64_t latency_us) override;
+
+    int MaxConcurrency() override;
+
+    // Creates a new limiter instance. The argument is not used.
+    TimeoutConcurrencyLimiter* New(
+        const AdaptiveMaxConcurrency&) const override;
+
+private:
+    // Counters accumulated during one sampling window.
+    struct SampleWindow {
+        SampleWindow()
+            : start_time_us(0),
+              succ_count(0),
+              failed_count(0),
+              total_failed_us(0),
+              total_succ_us(0) {}
+        int64_t start_time_us;    // 0 means the window has not started
+        int32_t succ_count;       // number of successful samples
+        int32_t failed_count;     // number of failed samples
+        int64_t total_failed_us;  // summed latency of failed samples
+        int64_t total_succ_us;    // summed latency of successful samples
+    };
+
+    // Adds a sample to the window while holding _sw_mutex. Returns true
+    // when the window was submitted and a new one was started.
+    bool AddSample(int error_code, int64_t latency_us,
+                   int64_t sampling_time_us);
+
+    // The following methods are not thread safe and can only be called
+    // in AddSample()
+    void ResetSampleWindow(int64_t sampling_time_us);
+    void UpdateAvgLatency();
+    void AdjustAvgLatency(int64_t avg_latency_us);
+
+    // modified per sample-window or more
+    // NOTE(review): written under _sw_mutex in AddSample() but read in
+    // OnRequested() without taking the mutex.
+    int64_t _avg_latency_us;
+    // modified per sample.
+    BAIDU_CACHELINE_ALIGNMENT butil::atomic<int64_t> _last_sampling_time_us;
+    butil::Mutex _sw_mutex;
+    SampleWindow _sw;
+};
+
+}  // namespace policy
+}  // namespace brpc
+
+#endif  // BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H