Skip to content

Commit

Permalink
Merge pull request #2419 from wwbmmm/fix-check-health
Browse files Browse the repository at this point in the history
Fix RetryPolicy blocks health check
  • Loading branch information
lorinlee authored Oct 21, 2023
2 parents ba5271a + d866be2 commit 9245098
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,18 +662,17 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,

// Retry backoff.
bthread::TaskGroup* g = bthread::tls_task_group;
if (retry_policy->CanRetryBackoffInPthread() ||
(g && !g->is_current_pthread_task())) {
int64_t backoff_time_us = retry_policy->GetBackoffTimeMs(this) * 1000L;
int64_t backoff_time_us = retry_policy->GetBackoffTimeMs(this) * 1000L;
if (backoff_time_us > 0 &&
backoff_time_us < _deadline_us - butil::gettimeofday_us()) {
// No need to do retry backoff when the backoff time is longer than the remaining rpc time.
if (backoff_time_us > 0 &&
backoff_time_us < _deadline_us - butil::gettimeofday_us()) {
if (retry_policy->CanRetryBackoffInPthread() ||
(g && !g->is_current_pthread_task())) {
bthread_usleep(backoff_time_us);
} else {
LOG(WARNING) << "`CanRetryBackoffInPthread()' returns false, "
"skip retry backoff in pthread.";
}

} else {
LOG(WARNING) << "`CanRetryBackoffInPthread()' returns false, "
"skip retry backoff in pthread.";
}
return IssueRPC(butil::gettimeofday_us());
}
Expand Down Expand Up @@ -1262,8 +1261,25 @@ int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code,
cntl->SetFailed(error_code, "%s @%s", berror(error_code),
butil::endpoint2str(cntl->remote_side()).c_str());
}
CompletionInfo info = { id, false };
cntl->OnVersionedRPCReturned(info, true, saved_error);

struct OnVersionedRPCReturnedArgs {
bthread_id_t id;
Controller* cntl;
int error;
};
auto func = [](void* p) -> void* {
std::unique_ptr<OnVersionedRPCReturnedArgs> args(static_cast<OnVersionedRPCReturnedArgs*>(p));
CompletionInfo info = { args->id, false };
args->cntl->OnVersionedRPCReturned(info, true, args->error);
return NULL;
};

auto* args = new OnVersionedRPCReturnedArgs{ id, cntl, saved_error };
bthread_t tid;
// RetryPolicy may block current bthread, so start a new bthread to run OnVersionedRPCReturned
if (!cntl->_retry_policy || bthread_start_background(&tid, NULL, func, args) != 0) {
func(args);
}
return 0;
}

Expand Down

0 comments on commit 9245098

Please sign in to comment.