diff --git a/be/src/common/config.h b/be/src/common/config.h index ddb4ede67b89ab..fba6c862af1f0a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -184,6 +184,8 @@ namespace config { CONF_Int32(port, "20001"); // default thrift client connect timeout(in seconds) CONF_Int32(thrift_connect_timeout_seconds, "3"); + // default thrift client retry interval (in milliseconds) + CONF_mInt64(thrift_client_retry_interval_ms, "1000"); // max row count number for single scan range CONF_mInt32(doris_scan_range_row_count, "524288"); // size of scanner queue between scanner thread and compute thread diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index fe8db2fb4b3101..5f1e2f4fe4a11c 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -171,7 +171,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { request.sync = true; request.commitInfos = std::move(ctx->commit_infos); request.__isset.commitInfos = true; - request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); + request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms); // set attachment if has TTxnCommitAttachment attachment; diff --git a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp index d47d0a4337cd2d..6bd750bb1f41c3 100644 --- a/be/src/util/thrift_rpc_helper.cpp +++ b/be/src/util/thrift_rpc_helper.cpp @@ -34,6 +34,7 @@ #include "util/thrift_util.h" #include "util/runtime_profile.h" #include "runtime/client_cache.h" +#include "monotime.h" namespace doris { @@ -68,8 +69,9 @@ Status ThriftRpcHelper::rpc( try { callback(client); } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "retrying call frontend service, address=" - << address << ", reason=" << e.what(); + LOG(WARNING) << "retrying call frontend service after " << config::thrift_client_retry_interval_ms + << " ms, address=" << address << ", reason=" << e.what(); + SleepFor(MonoDelta::FromMilliseconds(config::thrift_client_retry_interval_ms)); status = client.reopen(timeout_ms); if (!status.ok()) { LOG(WARNING) << "client repoen failed. address=" << address @@ -79,10 +81,11 @@ Status ThriftRpcHelper::rpc( callback(client); } } catch (apache::thrift::TException& e) { + LOG(WARNING) << "call frontend service failed, address=" << address + << ", reason=" << e.what(); + SleepFor(MonoDelta::FromMilliseconds(config::thrift_client_retry_interval_ms * 2)); // just reopen to disable this connection client.reopen(timeout_ms); - LOG(WARNING) << "call frontend service failed, address=" << address - << ", reason=" << e.what(); return Status::ThriftRpcError("failed to call frontend service"); } return Status::OK(); diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index ae4cf45a7176f2..0486efe64a4e5b 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -468,6 +468,12 @@ If the system is found to be in a high-stress scenario and a large number of thr ### `tc_use_memory_min` +### `thrift_client_retry_interval_ms` + +* Type: int64 +* Description: Used to set retry interval for thrift client in be to avoid avalanche disaster in fe thrift server, the unit is ms. +* Default: 1000 + ### `thrift_connect_timeout_seconds` ### `thrift_rpc_timeout_ms` diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index de57375734ea6c..fd75d883160ad0 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -415,6 +415,12 @@ under the License. ### `tc_use_memory_min` +### `thrift_client_retry_interval_ms` + +* 类型:int64 +* 描述:用来为be的thrift客户端设置重试间隔, 避免fe的thrift server发生雪崩问题,单位为ms。 +* 默认值:1000 + ### `thrift_connect_timeout_seconds` ### `thrift_rpc_timeout_ms` diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 744700872eaff0..090c56d54f3bcd 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -864,7 +864,7 @@ public class Config extends ConfigBase { /** * The number of query retries. * A query may retry if we encounter RPC exception and no result has been sent to user. - * You may reduce this number to void Avalanche disaster. + * You may reduce this number to avoid Avalanche disaster. */ @ConfField(mutable = true) public static int max_query_retry_time = 2; diff --git a/fe/src/main/java/org/apache/doris/common/ThriftServer.java b/fe/src/main/java/org/apache/doris/common/ThriftServer.java index 3c293dd1fa2704..9caab7bb72c36f 100644 --- a/fe/src/main/java/org/apache/doris/common/ThriftServer.java +++ b/fe/src/main/java/org/apache/doris/common/ThriftServer.java @@ -98,7 +98,7 @@ private void createSimpleServer() throws TTransportException { private void createThreadedServer() throws TTransportException { TThreadedSelectorServer.Args args = - new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port)).protocolFactory( + new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port, Config.thrift_client_timeout_ms)).protocolFactory( new TBinaryProtocol.Factory()).processor(processor); ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool"); args.executorService(threadPoolExecutor);