Skip to content

Commit

Permalink
add timeout concurrency limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
yanglimingcn committed Nov 29, 2022
1 parent d0052a6 commit c314177
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/brpc/concurrency_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ConcurrencyLimiter {
// false when the concurrency reaches the upper limit, otherwise it
// returns true. Normally, when OnRequested returns false, you should
// return an ELIMIT error directly.
virtual bool OnRequested(int current_concurrency) = 0;
virtual bool OnRequested(int current_concurrency, int32_t timeout_ms) = 0;

// Each request should call this method before responding.
// `error_code' : Error code obtained from the controller, 0 means success.
Expand Down
6 changes: 3 additions & 3 deletions src/brpc/details/method_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class MethodStatus : public Describable {
// Call this function when the method is about to be called.
// Returns false when the method is overloaded. If rejected_cc is not
// NULL, it's set with the rejected concurrency.
bool OnRequested(int* rejected_cc = NULL);
bool OnRequested(int* rejected_cc = NULL, int32_t timeout_ms = 0);

// Call this when the method just finished.
// `error_code' : The error code obtained from the controller. Equal to
Expand Down Expand Up @@ -89,9 +89,9 @@ class ConcurrencyRemover {
uint64_t _received_us;
};

inline bool MethodStatus::OnRequested(int* rejected_cc) {
inline bool MethodStatus::OnRequested(int* rejected_cc, int32_t timeout_ms) {
const int cc = _nconcurrency.fetch_add(1, butil::memory_order_relaxed) + 1;
if (NULL == _cl || _cl->OnRequested(cc)) {
if (NULL == _cl || _cl->OnRequested(cc, timeout_ms)) {
return true;
}
if (rejected_cc) {
Expand Down
5 changes: 4 additions & 1 deletion src/brpc/global.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
#include "brpc/concurrency_limiter.h"
#include "brpc/policy/auto_concurrency_limiter.h"
#include "brpc/policy/constant_concurrency_limiter.h"
#include "brpc/policy/timeout_concurrency_limiter.h"

#include "brpc/input_messenger.h" // get_or_new_client_side_messenger
#include "brpc/socket_map.h" // SocketMapList
Expand Down Expand Up @@ -150,6 +151,7 @@ struct GlobalExtensions {

AutoConcurrencyLimiter auto_cl;
ConstantConcurrencyLimiter constant_cl;
TimeoutConcurrencyLimiter timeout_cl;
};

static pthread_once_t register_extensions_once = PTHREAD_ONCE_INIT;
Expand Down Expand Up @@ -601,7 +603,8 @@ static void GlobalInitializeOrDieImpl() {
// Concurrency Limiters
ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);

ConcurrencyLimiterExtension()->RegisterOrDie("timeout", &g_ext->timeout_cl);

if (FLAGS_usercode_in_pthread) {
// Optional. If channel/server are initialized before main(), this
// flag may be false at here even if it will be set to true after
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/policy/auto_concurrency_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ AutoConcurrencyLimiter* AutoConcurrencyLimiter::New(const AdaptiveMaxConcurrency
return new (std::nothrow) AutoConcurrencyLimiter;
}

bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
bool AutoConcurrencyLimiter::OnRequested(int current_concurrency, int32_t) {
return current_concurrency <= _max_concurrency;
}

Expand Down
2 changes: 1 addition & 1 deletion src/brpc/policy/auto_concurrency_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class AutoConcurrencyLimiter : public ConcurrencyLimiter {
public:
AutoConcurrencyLimiter();

bool OnRequested(int current_concurrency) override;
bool OnRequested(int current_concurrency, int32_t) override;

void OnResponded(int error_code, int64_t latency_us) override;

Expand Down
2 changes: 1 addition & 1 deletion src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
method_status = mp->status;
if (method_status) {
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
if (!method_status->OnRequested(&rejected_cc, cntl->timeout_ms())) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
break;
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/policy/constant_concurrency_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ConstantConcurrencyLimiter::ConstantConcurrencyLimiter(int max_concurrency)
: _max_concurrency(max_concurrency) {
}

bool ConstantConcurrencyLimiter::OnRequested(int current_concurrency) {
bool ConstantConcurrencyLimiter::OnRequested(int current_concurrency, int32_t) {
return current_concurrency <= _max_concurrency;
}

Expand Down
2 changes: 1 addition & 1 deletion src/brpc/policy/constant_concurrency_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class ConstantConcurrencyLimiter : public ConcurrencyLimiter {
public:
explicit ConstantConcurrencyLimiter(int max_concurrency);

bool OnRequested(int current_concurrency) override;
bool OnRequested(int current_concurrency, int32_t) override;

void OnResponded(int error_code, int64_t latency_us) override;

Expand Down
160 changes: 160 additions & 0 deletions src/brpc/policy/timeout_concurrency_limiter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "brpc/policy/timeout_concurrency_limiter.h"
#include "brpc/controller.h"
#include "brpc/errno.pb.h"
#include <cmath>
#include <gflags/gflags.h>

namespace brpc {
namespace policy {

DEFINE_int32(timeout_cl_sample_window_size_ms, 1000,
"Duration of the sampling window.");
DEFINE_int32(timeout_cl_min_sample_count, 100,
"During the duration of the sampling window, if the number of "
"requests collected is less than this value, the sampling window "
"will be discarded.");
DEFINE_int32(timeout_cl_max_sample_count, 200,
"During the duration of the sampling window, once the number of "
"requests collected is greater than this value, even if the "
"duration of the window has not ended, the max_concurrency will "
"be updated and a new sampling window will be started.");
DEFINE_double(timeout_cl_sampling_interval_ms, 0.1,
"Interval for sampling request in auto concurrency limiter");
DEFINE_int32(timeout_cl_initial_avg_latency_us, 500,
"Initial max concurrency for gradient concurrency limiter");
DEFINE_bool(
timeout_cl_enable_error_punish, true,
"Whether to consider failed requests when calculating maximum concurrency");
DEFINE_double(
timeout_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger "
"the configuration item, the more aggressive the penalty strategy.");
DEFINE_int32(timeout_cl_default_timeout_ms, 500,
"Default timeout for rpc request");

TimeoutConcurrencyLimiter::TimeoutConcurrencyLimiter()
: _avg_latency_us(FLAGS_timeout_cl_initial_avg_latency_us),
_last_sampling_time_us(0) {}

TimeoutConcurrencyLimiter *TimeoutConcurrencyLimiter::New(
const AdaptiveMaxConcurrency &) const {
return new (std::nothrow) TimeoutConcurrencyLimiter;
}

bool TimeoutConcurrencyLimiter::OnRequested(int current_concurrency,
int32_t timeout_ms) {
if (timeout_ms == UNSET_MAGIC_NUM) {
timeout_ms = FLAGS_timeout_cl_default_timeout_ms;
}
return current_concurrency * _avg_latency_us < timeout_ms;
}

void TimeoutConcurrencyLimiter::OnResponded(int error_code,
int64_t latency_us) {
if (ELIMIT == error_code) {
return;
}

const int64_t now_time_us = butil::gettimeofday_us();
int64_t last_sampling_time_us =
_last_sampling_time_us.load(butil::memory_order_relaxed);

if (last_sampling_time_us == 0 ||
now_time_us - last_sampling_time_us >=
FLAGS_timeout_cl_sampling_interval_ms * 1000) {
bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
if (sample_this_call) {
bool sample_window_submitted =
AddSample(error_code, latency_us, now_time_us);
if (sample_window_submitted) {
// The following log prints has data-race in extreme cases,
// unless you are in debug, you should not open it.
VLOG(1) << "Sample window submitted, current avg_latency_us:"
<< _avg_latency_us;
}
}
}
}

int TimeoutConcurrencyLimiter::MaxConcurrency() { return 0; }

bool TimeoutConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
int64_t sampling_time_us) {
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
if (_sw.start_time_us == 0) {
_sw.start_time_us = sampling_time_us;
}

if (error_code != 0 && FLAGS_timeout_cl_enable_error_punish) {
++_sw.failed_count;
_sw.total_failed_us += latency_us;
} else if (error_code == 0) {
++_sw.succ_count;
_sw.total_succ_us += latency_us;
}

if (_sw.succ_count + _sw.failed_count < FLAGS_timeout_cl_min_sample_count) {
if (sampling_time_us - _sw.start_time_us >=
FLAGS_timeout_cl_sample_window_size_ms * 1000) {
// If the sample size is insufficient at the end of the sampling
// window, discard the entire sampling window
ResetSampleWindow(sampling_time_us);
}
return false;
}
if (sampling_time_us - _sw.start_time_us <
FLAGS_timeout_cl_sample_window_size_ms * 1000 &&
_sw.succ_count + _sw.failed_count < FLAGS_timeout_cl_max_sample_count) {
return false;
}

if (_sw.succ_count > 0) {
UpdateAvgLatency();
} else {
// All request failed
AdjustAvgLatency(_avg_latency_us / 2);
}
ResetSampleWindow(sampling_time_us);
return true;
}

void TimeoutConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
_sw.start_time_us = sampling_time_us;
_sw.succ_count = 0;
_sw.failed_count = 0;
_sw.total_failed_us = 0;
_sw.total_succ_us = 0;
}

void TimeoutConcurrencyLimiter::AdjustAvgLatency(int64_t avg_latency_us) {
_avg_latency_us = avg_latency_us;
}

void TimeoutConcurrencyLimiter::UpdateAvgLatency() {
double failed_punish =
_sw.total_failed_us * FLAGS_timeout_cl_fail_punish_ratio;
auto avg_latency_us =
std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
AdjustAvgLatency(avg_latency_us);
}

} // namespace policy
} // namespace brpc
74 changes: 74 additions & 0 deletions src/brpc/policy/timeout_concurrency_limiter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H
#define BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H

#include "brpc/concurrency_limiter.h"

namespace brpc {
namespace policy {

class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
public:
TimeoutConcurrencyLimiter();

bool OnRequested(int current_concurrency, int32_t timeout_ms) override;

void OnResponded(int error_code, int64_t latency_us) override;

int MaxConcurrency() override;

TimeoutConcurrencyLimiter* New(
const AdaptiveMaxConcurrency&) const override;

private:
struct SampleWindow {
SampleWindow()
: start_time_us(0),
succ_count(0),
failed_count(0),
total_failed_us(0),
total_succ_us(0) {}
int64_t start_time_us;
int32_t succ_count;
int32_t failed_count;
int64_t total_failed_us;
int64_t total_succ_us;
};

bool AddSample(int error_code, int64_t latency_us,
int64_t sampling_time_us);

// The following methods are not thread safe and can only be called
// in AppSample()
void ResetSampleWindow(int64_t sampling_time_us);
void UpdateAvgLatency();
void AdjustAvgLatency(int64_t avg_latency_us);

// modified per sample-window or more
int64_t _avg_latency_us;
// modified per sample.
BAIDU_CACHELINE_ALIGNMENT butil::atomic<int64_t> _last_sampling_time_us;
butil::Mutex _sw_mutex;
SampleWindow _sw;
};

} // namespace policy
} // namespace brpc

#endif // BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H

0 comments on commit c314177

Please sign in to comment.