From 8053dc1d20a0f76e14a8e3e4d22597a56aafc9c4 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 14 Aug 2024 16:25:19 +0800 Subject: [PATCH] [improve](move-memtable) reuse connection in load_stream_stub (#39231) Reuse connection in `LoadStreamStub::open()` and hopefully fix the "Connection reset by peer" issue under heavy load. --- be/src/runtime/exec_env.h | 4 ++++ be/src/runtime/exec_env_init.cpp | 7 ++++-- be/src/util/brpc_client_cache.cpp | 22 +++++++++++++++--- be/src/util/brpc_client_cache.h | 24 +++++++++++++------- be/src/util/doris_metrics.h | 1 + be/src/vec/sink/load_stream_stub.cpp | 3 +-- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 ++-- 7 files changed, 48 insertions(+), 17 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 6ddb87eaf6c64f..1f59c4b1a12be8 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -215,6 +215,9 @@ class ExecEnv { BrpcClientCache* brpc_internal_client_cache() const { return _internal_client_cache; } + BrpcClientCache* brpc_streaming_client_cache() const { + return _streaming_client_cache; + } BrpcClientCache* brpc_function_client_cache() const { return _function_client_cache; } @@ -392,6 +395,7 @@ class ExecEnv { // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle. std::shared_ptr _new_load_stream_mgr; BrpcClientCache* _internal_client_cache = nullptr; + BrpcClientCache* _streaming_client_cache = nullptr; BrpcClientCache* _function_client_cache = nullptr; std::shared_ptr _stream_load_executor; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index fa2cef6e31ded1..6c287e1a8daf14 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -228,8 +228,10 @@ Status ExecEnv::_init(const std::vector& store_paths, auto num_flush_threads = _store_paths.size() * config::flush_thread_num_per_store; _load_stream_mgr = std::make_unique(num_flush_threads); _new_load_stream_mgr = NewLoadStreamMgr::create_shared(); - _internal_client_cache = new BrpcClientCache(); - _function_client_cache = new BrpcClientCache(); + _streaming_client_cache = + new BrpcClientCache("baidu_std", "single", "streaming"); + _function_client_cache = + new BrpcClientCache(config::function_service_protocol); _stream_load_executor = StreamLoadExecutor::create_shared(this); _routine_load_task_executor = new RoutineLoadTaskExecutor(this); RETURN_IF_ERROR(_routine_load_task_executor->init()); @@ -628,6 +630,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_routine_load_task_executor); // _stream_load_executor SAFE_DELETE(_function_client_cache); + SAFE_DELETE(_streaming_client_cache); SAFE_DELETE(_internal_client_cache); SAFE_DELETE(_bfd_parser); diff --git a/be/src/util/brpc_client_cache.cpp b/be/src/util/brpc_client_cache.cpp index b9135e8014dc7d..c5a6488787879b 100644 --- a/be/src/util/brpc_client_cache.cpp +++ b/be/src/util/brpc_client_cache.cpp @@ -25,12 +25,23 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_endpoint_stub_count, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_stream_endpoint_stub_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_function_endpoint_stub_count, MetricUnit::NOUNIT); template <> -BrpcClientCache::BrpcClientCache() { - REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return _stub_map.size(); }); +BrpcClientCache::BrpcClientCache(std::string protocol, + std::string connection_type, + std::string connection_group) + : _protocol(protocol), + _connection_type(connection_type), + _connection_group(connection_group) { + if (connection_group == "streaming") { + REGISTER_HOOK_METRIC(brpc_stream_endpoint_stub_count, + [this]() { return _stub_map.size(); }); + } else { + REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return _stub_map.size(); }); + } } template <> @@ -39,7 +50,12 @@ BrpcClientCache::~BrpcClientCache() { } template <> -BrpcClientCache::BrpcClientCache() { +BrpcClientCache::BrpcClientCache(std::string protocol, + std::string connection_type, + std::string connection_group) + : _protocol(protocol), + _connection_type(connection_type), + _connection_group(connection_group) { REGISTER_HOOK_METRIC(brpc_function_endpoint_stub_count, [this]() { return _stub_map.size(); }); } diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index ebef80f4a6bdfb..09c92fb398e085 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -59,7 +59,8 @@ namespace doris { template class BrpcClientCache { public: - BrpcClientCache(); + BrpcClientCache(std::string protocol = "baidu_std", std::string connection_type = "", + std::string connection_group = ""); virtual ~BrpcClientCache(); std::shared_ptr get_client(const butil::EndPoint& endpoint) { @@ -110,20 +111,24 @@ class BrpcClientCache { } std::shared_ptr get_new_client_no_cache(const std::string& host_port, - const std::string& protocol = "baidu_std", - const std::string& connect_type = "", + const std::string& protocol = "", + const std::string& connection_type = "", const std::string& connection_group = "") { brpc::ChannelOptions options; - if constexpr (std::is_same_v) { - options.protocol = config::function_service_protocol; - } else { + if (protocol != "") { options.protocol = protocol; + } else if (_protocol != "") { + options.protocol = _protocol; } - if (connect_type != "") { - options.connection_type = connect_type; + if (connection_type != "") { + options.connection_type = connection_type; + } else if (_connection_type != "") { + options.connection_type = _connection_type; } if (connection_group != "") { options.connection_group = connection_group; + } else if (_connection_group != "") { + options.connection_group = _connection_group; } options.connect_timeout_ms = 2000; options.timeout_ms = 2000; @@ -204,6 +209,9 @@ class BrpcClientCache { private: StubMap _stub_map; + const std::string _protocol; + const std::string _connection_type; + const std::string _connection_group; }; using InternalServiceClientCache = BrpcClientCache; diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 513ef91723fc30..567efdc9ae5204 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -176,6 +176,7 @@ class DorisMetrics { UIntGauge* stream_load_pipe_count = nullptr; UIntGauge* new_stream_load_pipe_count = nullptr; UIntGauge* brpc_endpoint_stub_count = nullptr; + UIntGauge* brpc_stream_endpoint_stub_count = nullptr; UIntGauge* brpc_function_endpoint_stub_count = nullptr; UIntGauge* tablet_writer_count = nullptr; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index dc34b13e0ac155..c535f03214f685 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -183,8 +183,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, } POpenLoadStreamResponse response; // set connection_group "streaming" to distinguish with non-streaming connections - const auto& stub = - client_cache->get_new_client_no_cache(host_port, "baidu_std", "single", "streaming"); + const auto& stub = client_cache->get_client(host_port); stub->open_load_stream(&cntl, &request, &response, nullptr); for (const auto& resp : response.tablet_schemas()) { auto tablet_schema = std::make_unique(); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index fecbd324c57aaa..6013e31609f7b7 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -281,13 +281,13 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream // get tablet schema from each backend only in the 1st stream for (auto& stream : streams | std::ranges::views::take(1)) { const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, + RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id, *_schema, tablets_for_schema, _total_streams, idle_timeout_ms, _state->enable_profile())); } // for the rest streams, open without getting tablet schema for (auto& stream : streams | std::ranges::views::drop(1)) { - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, + RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id, *_schema, {}, _total_streams, idle_timeout_ms, _state->enable_profile())); }