Skip to content

Commit

Permalink
[improve](move-memtable) reuse connection in load_stream_stub (apache…
Browse files Browse the repository at this point in the history
…#39231)

Reuse connection in `LoadStreamStub::open()` and hopefully fix the
"Connection reset by peer" issue under heavy load.
  • Loading branch information
kaijchen committed Aug 22, 2024
1 parent ca9e50e commit 8053dc1
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 17 deletions.
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ class ExecEnv {
BrpcClientCache<PBackendService_Stub>* brpc_internal_client_cache() const {
return _internal_client_cache;
}
BrpcClientCache<PBackendService_Stub>* brpc_streaming_client_cache() const {
return _streaming_client_cache;
}
BrpcClientCache<PFunctionService_Stub>* brpc_function_client_cache() const {
return _function_client_cache;
}
Expand Down Expand Up @@ -392,6 +395,7 @@ class ExecEnv {
// TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle.
std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
BrpcClientCache<PBackendService_Stub>* _streaming_client_cache = nullptr;
BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;

std::shared_ptr<StreamLoadExecutor> _stream_load_executor;
Expand Down
7 changes: 5 additions & 2 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
auto num_flush_threads = _store_paths.size() * config::flush_thread_num_per_store;
_load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads);
_new_load_stream_mgr = NewLoadStreamMgr::create_shared();
_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
_streaming_client_cache =
new BrpcClientCache<PBackendService_Stub>("baidu_std", "single", "streaming");
_function_client_cache =
new BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol);
_stream_load_executor = StreamLoadExecutor::create_shared(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
RETURN_IF_ERROR(_routine_load_task_executor->init());
Expand Down Expand Up @@ -628,6 +630,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_routine_load_task_executor);
// _stream_load_executor
SAFE_DELETE(_function_client_cache);
SAFE_DELETE(_streaming_client_cache);
SAFE_DELETE(_internal_client_cache);

SAFE_DELETE(_bfd_parser);
Expand Down
22 changes: 19 additions & 3 deletions be/src/util/brpc_client_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,23 @@

namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_endpoint_stub_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_stream_endpoint_stub_count, MetricUnit::NOUNIT);

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_function_endpoint_stub_count, MetricUnit::NOUNIT);

template <>
BrpcClientCache<PBackendService_Stub>::BrpcClientCache() {
REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return _stub_map.size(); });
BrpcClientCache<PBackendService_Stub>::BrpcClientCache(std::string protocol,
std::string connection_type,
std::string connection_group)
: _protocol(protocol),
_connection_type(connection_type),
_connection_group(connection_group) {
if (connection_group == "streaming") {
REGISTER_HOOK_METRIC(brpc_stream_endpoint_stub_count,
[this]() { return _stub_map.size(); });
} else {
REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return _stub_map.size(); });
}
}

template <>
Expand All @@ -39,7 +50,12 @@ BrpcClientCache<PBackendService_Stub>::~BrpcClientCache() {
}

template <>
BrpcClientCache<PFunctionService_Stub>::BrpcClientCache() {
BrpcClientCache<PFunctionService_Stub>::BrpcClientCache(std::string protocol,
std::string connection_type,
std::string connection_group)
: _protocol(protocol),
_connection_type(connection_type),
_connection_group(connection_group) {
REGISTER_HOOK_METRIC(brpc_function_endpoint_stub_count, [this]() { return _stub_map.size(); });
}

Expand Down
24 changes: 16 additions & 8 deletions be/src/util/brpc_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ namespace doris {
template <class T>
class BrpcClientCache {
public:
BrpcClientCache();
BrpcClientCache(std::string protocol = "baidu_std", std::string connection_type = "",
std::string connection_group = "");
virtual ~BrpcClientCache();

std::shared_ptr<T> get_client(const butil::EndPoint& endpoint) {
Expand Down Expand Up @@ -110,20 +111,24 @@ class BrpcClientCache {
}

std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
const std::string& protocol = "baidu_std",
const std::string& connect_type = "",
const std::string& protocol = "",
const std::string& connection_type = "",
const std::string& connection_group = "") {
brpc::ChannelOptions options;
if constexpr (std::is_same_v<T, PFunctionService_Stub>) {
options.protocol = config::function_service_protocol;
} else {
if (protocol != "") {
options.protocol = protocol;
} else if (_protocol != "") {
options.protocol = _protocol;
}
if (connect_type != "") {
options.connection_type = connect_type;
if (connection_type != "") {
options.connection_type = connection_type;
} else if (_connection_type != "") {
options.connection_type = _connection_type;
}
if (connection_group != "") {
options.connection_group = connection_group;
} else if (_connection_group != "") {
options.connection_group = _connection_group;
}
options.connect_timeout_ms = 2000;
options.timeout_ms = 2000;
Expand Down Expand Up @@ -204,6 +209,9 @@ class BrpcClientCache {

private:
StubMap<T> _stub_map;
const std::string _protocol;
const std::string _connection_type;
const std::string _connection_group;
};

using InternalServiceClientCache = BrpcClientCache<PBackendService_Stub>;
Expand Down
1 change: 1 addition & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class DorisMetrics {
UIntGauge* stream_load_pipe_count = nullptr;
UIntGauge* new_stream_load_pipe_count = nullptr;
UIntGauge* brpc_endpoint_stub_count = nullptr;
UIntGauge* brpc_stream_endpoint_stub_count = nullptr;
UIntGauge* brpc_function_endpoint_stub_count = nullptr;
UIntGauge* tablet_writer_count = nullptr;

Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
}
POpenLoadStreamResponse response;
// set connection_group "streaming" to distinguish with non-streaming connections
const auto& stub =
client_cache->get_new_client_no_cache(host_port, "baidu_std", "single", "streaming");
const auto& stub = client_cache->get_client(host_port);
stub->open_load_stream(&cntl, &request, &response, nullptr);
for (const auto& resp : response.tablet_schemas()) {
auto tablet_schema = std::make_unique<TabletSchema>();
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,13 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream
// get tablet schema from each backend only in the 1st stream
for (auto& stream : streams | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info,
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info,
_txn_id, *_schema, tablets_for_schema, _total_streams,
idle_timeout_ms, _state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
for (auto& stream : streams | std::ranges::views::drop(1)) {
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info,
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info,
_txn_id, *_schema, {}, _total_streams, idle_timeout_ms,
_state->enable_profile()));
}
Expand Down

0 comments on commit 8053dc1

Please sign in to comment.