Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ namespace config {
CONF_Int32(port, "20001");
// default thrift client connect timeout(in seconds)
CONF_Int32(thrift_connect_timeout_seconds, "3");
// default thrift client retry interval (in milliseconds)
CONF_mInt64(thrift_client_retry_interval_ms, "1000");
// max row count number for single scan range
CONF_mInt32(doris_scan_range_row_count, "524288");
// size of scanner queue between scanner thread and compute thread
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
request.sync = true;
request.commitInfos = std::move(ctx->commit_infos);
request.__isset.commitInfos = true;
request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms);

// set attachment if has
TTxnCommitAttachment attachment;
Expand Down
11 changes: 7 additions & 4 deletions be/src/util/thrift_rpc_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "util/thrift_util.h"
#include "util/runtime_profile.h"
#include "runtime/client_cache.h"
#include "monotime.h"

namespace doris {

Expand Down Expand Up @@ -68,8 +69,9 @@ Status ThriftRpcHelper::rpc(
try {
callback(client);
} catch (apache::thrift::transport::TTransportException& e) {
LOG(WARNING) << "retrying call frontend service, address="
<< address << ", reason=" << e.what();
LOG(WARNING) << "retrying call frontend service after " << config::thrift_client_retry_interval_ms
<< " ms, address=" << address << ", reason=" << e.what();
SleepFor(MonoDelta::FromMilliseconds(config::thrift_client_retry_interval_ms));
status = client.reopen(timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "client repoen failed. address=" << address
Expand All @@ -79,10 +81,11 @@ Status ThriftRpcHelper::rpc(
callback(client);
}
} catch (apache::thrift::TException& e) {
LOG(WARNING) << "call frontend service failed, address=" << address
<< ", reason=" << e.what();
SleepFor(MonoDelta::FromMilliseconds(config::thrift_client_retry_interval_ms * 2));
// just reopen to disable this connection
client.reopen(timeout_ms);
LOG(WARNING) << "call frontend service failed, address=" << address
<< ", reason=" << e.what();
return Status::ThriftRpcError("failed to call frontend service");
}
return Status::OK();
Expand Down
6 changes: 6 additions & 0 deletions docs/en/administrator-guide/config/be_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,12 @@ If the system is found to be in a high-stress scenario and a large number of thr

### `tc_use_memory_min`

### `thrift_client_retry_interval_ms`

* Type: int64
* Description: Sets the retry interval of the thrift client in BE, to avoid an avalanche disaster in the FE thrift server. The unit is ms.
* Default: 1000

### `thrift_connect_timeout_seconds`

### `thrift_rpc_timeout_ms`
Expand Down
6 changes: 6 additions & 0 deletions docs/zh-CN/administrator-guide/config/be_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,12 @@ under the License.

### `tc_use_memory_min`

### `thrift_client_retry_interval_ms`

* 类型:int64
* 描述:用来为be的thrift客户端设置重试间隔,避免fe的thrift server发生雪崩问题,单位为ms。
* 默认值:1000

### `thrift_connect_timeout_seconds`

### `thrift_rpc_timeout_ms`
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ public class Config extends ConfigBase {
/**
* The number of query retries.
* A query may retry if we encounter RPC exception and no result has been sent to user.
* You may reduce this number to void Avalanche disaster.
* You may reduce this number to avoid Avalanche disaster.
*/
@ConfField(mutable = true)
public static int max_query_retry_time = 2;
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/common/ThriftServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private void createSimpleServer() throws TTransportException {

private void createThreadedServer() throws TTransportException {
TThreadedSelectorServer.Args args =
new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port)).protocolFactory(
new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port, Config.thrift_client_timeout_ms)).protocolFactory(
new TBinaryProtocol.Factory()).processor(processor);
ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool");
args.executorService(threadPoolExecutor);
Expand Down