Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

different method can use different TimeoutConcurrencyConf #2112

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/cn/timeout_concurrency_limiter.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
在服务正常运营过程中,流量的增减、请求体的大小变化,磁盘的顺序、随机读写,这些都会影响请求的延迟,用户一般情况下不希望请求延迟的波动造成错误,即使会有一些请求的排队造成请求延迟增加,因此,一般用户设置的请求超时时间都会是服务平均延迟的3至4倍。基于请求超时时间的限流是根据统计服务平均延迟和请求设置的超时时间相比较,来估算请求是否能够在设置的超时时间内完成处理,如果能够能完成则接受请求,如果不能完成则拒绝请求。由于统计服务平均延迟和当前请求的实际延迟会有一定的时间差,因此需要设置一个比较宽泛的最大并发度,保证服务不会因为突然的慢请求造成短时间内服务堆积过多的请求。

## 开启方法
目前只有method级别支持基于超时的限流。如果要为某个method开启基于超时的限流,只需要将它的最大并发设置为"timeout"即可,如果客户端没有开启FLAGS_baidu_std_protocol_deliver_timeout_ms,可以设置FLAGS_timeout_cl_default_timeout_ms来调整一个默认的请求超时时间,可以设置FLAGS_timeout_cl_max_concurrency来调整最大并发度。
目前只有method级别支持基于超时的限流。如果要为某个method开启基于超时的限流,只需要将它的最大并发设置为"timeout"即可,如果客户端没有开启FLAGS_baidu_std_protocol_deliver_timeout_ms,可以设置FLAGS_timeout_cl_default_timeout_ms来调整一个默认的请求超时时间,可以设置FLAGS_timeout_cl_max_concurrency来调整最大并发度。也可以通过设置brpc::TimeoutConcurrencyConf为每个method指定不同的配置。

```c++
// Set timeout concurrency limiter for all methods
brpc::ServerOptions options;
options.method_max_concurrency = "timeout";
options.method_max_concurrency = brpc::TimeoutConcurrencyConf{1, 100};

// Set timeout concurrency limiter for specific method
server.MaxConcurrencyOf("example.EchoService.Echo") = "timeout";
server.MaxConcurrencyOf("example.EchoService.Echo") = brpc::TimeoutConcurrencyConf{1, 100};
```
10 changes: 10 additions & 0 deletions src/brpc/adaptive_max_concurrency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ AdaptiveMaxConcurrency::AdaptiveMaxConcurrency(int max_concurrency)
}
}

AdaptiveMaxConcurrency::AdaptiveMaxConcurrency(
const TimeoutConcurrencyConf& value)
: _value("timeout"), _max_concurrency(-1), _timeout_conf(value) {}

inline bool CompareStringPieceWithoutCase(
const butil::StringPiece& s1, const char* s2) {
DCHECK(s2 != NULL);
Expand Down Expand Up @@ -80,6 +84,12 @@ void AdaptiveMaxConcurrency::operator=(int max_concurrency) {
}
}

void AdaptiveMaxConcurrency::operator=(const TimeoutConcurrencyConf& value) {
_value = "timeout";
_max_concurrency = -1;
_timeout_conf = value;
}

const std::string& AdaptiveMaxConcurrency::type() const {
if (_max_concurrency > 0) {
return CONSTANT();
yanglimingcn marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
11 changes: 11 additions & 0 deletions src/brpc/adaptive_max_concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@

namespace brpc {

// timeout concurrency limiter config
struct TimeoutConcurrencyConf {
int64_t timeout_ms;
int max_concurrency;
};

class AdaptiveMaxConcurrency{
public:
explicit AdaptiveMaxConcurrency();
explicit AdaptiveMaxConcurrency(int max_concurrency);
explicit AdaptiveMaxConcurrency(const butil::StringPiece& value);
explicit AdaptiveMaxConcurrency(const TimeoutConcurrencyConf& value);

// Non-trivial destructor to prevent AdaptiveMaxConcurrency from being
// passed to variadic arguments without explicit type conversion.
Expand All @@ -41,11 +48,13 @@ class AdaptiveMaxConcurrency{

void operator=(int max_concurrency);
void operator=(const butil::StringPiece& value);
void operator=(const TimeoutConcurrencyConf& value);

// 0 for type="unlimited"
// >0 for type="constant"
// <0 for type="user-defined"
operator int() const { return _max_concurrency; }
operator TimeoutConcurrencyConf() const { return _timeout_conf; }

// "unlimited" for type="unlimited"
// "10" "20" "30" for type="constant"
Expand All @@ -62,6 +71,8 @@ class AdaptiveMaxConcurrency{
private:
std::string _value;
int _max_concurrency;
TimeoutConcurrencyConf
_timeout_conf; // TODO std::varient for different type
};

inline std::ostream& operator<<(std::ostream& os, const AdaptiveMaxConcurrency& amc) {
Expand Down
20 changes: 15 additions & 5 deletions src/brpc/policy/timeout_concurrency_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,34 @@ DEFINE_int32(timeout_cl_max_concurrency, 100,

TimeoutConcurrencyLimiter::TimeoutConcurrencyLimiter()
: _avg_latency_us(FLAGS_timeout_cl_initial_avg_latency_us),
_last_sampling_time_us(0) {}
_last_sampling_time_us(0),
_timeout_ms(FLAGS_timeout_cl_default_timeout_ms),
_max_concurrency(FLAGS_timeout_cl_max_concurrency) {}

TimeoutConcurrencyLimiter::TimeoutConcurrencyLimiter(
const TimeoutConcurrencyConf &conf)
: _avg_latency_us(FLAGS_timeout_cl_initial_avg_latency_us),
_last_sampling_time_us(0),
_timeout_ms(conf.timeout_ms),
_max_concurrency(conf.max_concurrency) {}

TimeoutConcurrencyLimiter *TimeoutConcurrencyLimiter::New(
const AdaptiveMaxConcurrency &) const {
return new (std::nothrow) TimeoutConcurrencyLimiter;
const AdaptiveMaxConcurrency &amc) const {
return new (std::nothrow)
TimeoutConcurrencyLimiter(static_cast<TimeoutConcurrencyConf>(amc));
}

bool TimeoutConcurrencyLimiter::OnRequested(int current_concurrency,
Controller *cntl) {
auto timeout_ms = FLAGS_timeout_cl_default_timeout_ms;
auto timeout_ms = _timeout_ms;
if (cntl != nullptr && cntl->timeout_ms() != UNSET_MAGIC_NUM) {
timeout_ms = cntl->timeout_ms();
}
// In extreme cases, the average latency may be greater than requested
// timeout, allow currency_concurrency is 1 ensures the average latency can
// be obtained renew.
return current_concurrency == 1 ||
(current_concurrency <= FLAGS_timeout_cl_max_concurrency &&
(current_concurrency <= _max_concurrency &&
_avg_latency_us < timeout_ms * 1000);
}

Expand Down
3 changes: 3 additions & 0 deletions src/brpc/policy/timeout_concurrency_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace policy {
class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
public:
TimeoutConcurrencyLimiter();
explicit TimeoutConcurrencyLimiter(const TimeoutConcurrencyConf& conf);

bool OnRequested(int current_concurrency, Controller* cntl) override;

Expand Down Expand Up @@ -66,6 +67,8 @@ class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
BAIDU_CACHELINE_ALIGNMENT butil::atomic<int64_t> _last_sampling_time_us;
butil::Mutex _sw_mutex;
SampleWindow _sw;
int64_t _timeout_ms;
int _max_concurrency;
};

} // namespace policy
Expand Down
24 changes: 24 additions & 0 deletions test/brpc_timeout_concurrency_limiter_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,27 @@ TEST(TimeoutConcurrencyLimiterTest, OnResponded) {
ASSERT_EQ(limiter._sw.succ_count, 2);
ASSERT_EQ(limiter._sw.failed_count, 0);
}

TEST(TimeoutConcurrencyLimiterTest, AdaptiveMaxConcurrencyTest) {
{
brpc::AdaptiveMaxConcurrency concurrency(
brpc::TimeoutConcurrencyConf{100, 100});
ASSERT_EQ(concurrency.type(), "timeout");
ASSERT_EQ(concurrency.value(), "timeout");
}
{
brpc::AdaptiveMaxConcurrency concurrency;
concurrency = "timeout";
ASSERT_EQ(concurrency.type(), "timeout");
ASSERT_EQ(concurrency.value(), "timeout");
}
{
brpc::AdaptiveMaxConcurrency concurrency;
concurrency = brpc::TimeoutConcurrencyConf{50, 100};
ASSERT_EQ(concurrency.type(), "timeout");
ASSERT_EQ(concurrency.value(), "timeout");
auto time_conf = static_cast<brpc::TimeoutConcurrencyConf>(concurrency);
ASSERT_EQ(time_conf.timeout_ms, 50);
ASSERT_EQ(time_conf.max_concurrency, 100);
}
}