From 655d14d924df0ca04857111a83cbc1a1d7fa45ea Mon Sep 17 00:00:00 2001 From: caiconghui Date: Sun, 5 Jul 2020 18:01:03 +0800 Subject: [PATCH 1/3] Add thrift_client_retry_interval_ms for thrift client to avoid avalanche disaster in fe thrift server --- be/src/common/config.h | 2 ++ be/src/runtime/stream_load/stream_load_executor.cpp | 2 +- be/src/util/thrift_rpc_helper.cpp | 9 ++++++--- docs/en/administrator-guide/config/be_config.md | 6 ++++++ docs/zh-CN/administrator-guide/config/be_config.md | 6 ++++++ fe/src/main/java/org/apache/doris/common/Config.java | 2 +- .../main/java/org/apache/doris/common/ThriftServer.java | 3 ++- 7 files changed, 24 insertions(+), 6 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index ddb4ede67b89ab..fba6c862af1f0a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -184,6 +184,8 @@ namespace config { CONF_Int32(port, "20001"); // default thrift client connect timeout(in seconds) CONF_Int32(thrift_connect_timeout_seconds, "3"); + // default thrift client retry interval (in milliseconds) + CONF_mInt64(thrift_client_retry_interval_ms, "1000"); // max row count number for single scan range CONF_mInt32(doris_scan_range_row_count, "524288"); // size of scanner queue between scanner thread and compute thread diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index fe8db2fb4b3101..5f1e2f4fe4a11c 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -171,7 +171,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { request.sync = true; request.commitInfos = std::move(ctx->commit_infos); request.__isset.commitInfos = true; - request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); + request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms); // set attachment if has TTxnCommitAttachment attachment; diff --git 
a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp index d47d0a4337cd2d..1a6679e39950fa 100644 --- a/be/src/util/thrift_rpc_helper.cpp +++ b/be/src/util/thrift_rpc_helper.cpp @@ -34,6 +34,7 @@ #include "util/thrift_util.h" #include "util/runtime_profile.h" #include "runtime/client_cache.h" +#include "monotime.h" namespace doris { @@ -68,8 +69,9 @@ Status ThriftRpcHelper::rpc( try { callback(client); } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "retrying call frontend service, address=" + LOG(WARNING) << "retrying call frontend service after " << 1 << " ms, address=" << address << ", reason=" << e.what(); + SleepFor(MonoDelta::FromMilliseconds(config::thrift_client_retry_interval_ms)); status = client.reopen(timeout_ms); if (!status.ok()) { LOG(WARNING) << "client repoen failed. address=" << address @@ -79,10 +81,11 @@ Status ThriftRpcHelper::rpc( callback(client); } } catch (apache::thrift::TException& e) { + LOG(WARNING) << "call frontend service failed, address=" << address + << ", reason=" << e.what(); + SleepFor(MonoDelta::FromMilliseconds(config::thrift_client_retry_interval_ms * 2)); // just reopen to disable this connection client.reopen(timeout_ms); - LOG(WARNING) << "call frontend service failed, address=" << address - << ", reason=" << e.what(); return Status::ThriftRpcError("failed to call frontend service"); } return Status::OK(); diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index ae4cf45a7176f2..0486efe64a4e5b 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -468,6 +468,12 @@ If the system is found to be in a high-stress scenario and a large number of thr ### `tc_use_memory_min` +### `thrift_client_retry_interval_ms` + +* Type: int64 +* Description: The retry interval of the thrift client in BE, in milliseconds. Waiting between retries avoids an avalanche disaster in the FE thrift server. 
+* Default: 1000 + ### `thrift_connect_timeout_seconds` ### `thrift_rpc_timeout_ms` diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index de57375734ea6c..fd75d883160ad0 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -415,6 +415,12 @@ under the License. ### `tc_use_memory_min` +### `thrift_client_retry_interval_ms` + +* 类型:int64 +* 描述:用来为be的thrift客户端设置重试间隔, 避免fe的thrift server发生雪崩问题,单位为ms。 +* 默认值:1000 + ### `thrift_connect_timeout_seconds` ### `thrift_rpc_timeout_ms` diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 744700872eaff0..090c56d54f3bcd 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -864,7 +864,7 @@ public class Config extends ConfigBase { /** * The number of query retries. * A query may retry if we encounter RPC exception and no result has been sent to user. - * You may reduce this number to void Avalanche disaster. + * You may reduce this number to avoid Avalanche disaster. 
*/ @ConfField(mutable = true) public static int max_query_retry_time = 2; diff --git a/fe/src/main/java/org/apache/doris/common/ThriftServer.java b/fe/src/main/java/org/apache/doris/common/ThriftServer.java index 3c293dd1fa2704..a64e23f11818d3 100644 --- a/fe/src/main/java/org/apache/doris/common/ThriftServer.java +++ b/fe/src/main/java/org/apache/doris/common/ThriftServer.java @@ -21,6 +21,7 @@ import com.google.common.collect.Sets; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TProcessor; @@ -98,7 +99,7 @@ private void createSimpleServer() throws TTransportException { private void createThreadedServer() throws TTransportException { TThreadedSelectorServer.Args args = - new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port)).protocolFactory( + new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port, Config.thrift_client_timeout_ms)).protocolFactory( new TBinaryProtocol.Factory()).processor(processor); ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool"); args.executorService(threadPoolExecutor); From 49cd3a59f4eec1409e5ab241adc227a5b7d3cbad Mon Sep 17 00:00:00 2001 From: caiconghui Date: Sun, 5 Jul 2020 18:26:30 +0800 Subject: [PATCH 2/3] remove unused import in ThriftServer.java --- fe/src/main/java/org/apache/doris/common/ThriftServer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/common/ThriftServer.java b/fe/src/main/java/org/apache/doris/common/ThriftServer.java index a64e23f11818d3..9caab7bb72c36f 100644 --- a/fe/src/main/java/org/apache/doris/common/ThriftServer.java +++ b/fe/src/main/java/org/apache/doris/common/ThriftServer.java @@ -21,7 +21,6 @@ import com.google.common.collect.Sets; -import org.apache.hadoop.hdfs.DFSClient; import org.apache.logging.log4j.LogManager; import 
org.apache.logging.log4j.Logger; import org.apache.thrift.TProcessor; From d89fedc63ed01ba3b99875155d0869161c7c0089 Mon Sep 17 00:00:00 2001 From: caiconghui Date: Mon, 6 Jul 2020 00:43:58 +0800 Subject: [PATCH 3/3] fix by review --- be/src/util/thrift_rpc_helper.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp index 1a6679e39950fa..6bd750bb1f41c3 100644 --- a/be/src/util/thrift_rpc_helper.cpp +++ b/be/src/util/thrift_rpc_helper.cpp @@ -69,8 +69,8 @@ Status ThriftRpcHelper::rpc( try { callback(client); } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "retrying call frontend service after " << 1 << " ms, address=" - << address << ", reason=" << e.what(); + LOG(WARNING) << "retrying call frontend service after " << config::thrift_client_retry_interval_ms + << " ms, address=" << address << ", reason=" << e.what(); SleepFor(MonoDelta::FromMilliseconds(config::thrift_client_retry_interval_ms)); status = client.reopen(timeout_ms); if (!status.ok()) {