diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 3b649c688355fe..aef3ab3233163a 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -126,6 +126,8 @@ class LoadStream : public brpc::StreamInputHandler { } } + void increase_use_count() { _total_streams++; } + void close(int64_t src_id, const std::vector& tablets_to_commit, std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp index 67739a0c0b0c30..0c231284f019e5 100644 --- a/be/src/runtime/load_stream_mgr.cpp +++ b/be/src/runtime/load_stream_mgr.cpp @@ -48,6 +48,7 @@ LoadStreamMgr::~LoadStreamMgr() { Status LoadStreamMgr::open_load_stream(const POpenLoadStreamRequest* request, LoadStream*& load_stream) { UniqueId load_id(request->load_id()); + UniqueId conn_id(request->connection_id()); { std::lock_guard l(_lock); @@ -62,6 +63,12 @@ Status LoadStreamMgr::open_load_stream(const POpenLoadStreamRequest* request, _load_streams_map[load_id] = std::move(p); } load_stream->add_source(request->src_id()); + if (_known_connections.contains(conn_id)) { + // detected retry open, increase total streams + load_stream->increase_use_count(); + } else { + _known_connections.insert(conn_id); + } } return Status::OK(); } diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h index 45abd9c8470b5c..7a1bc5558546ee 100644 --- a/be/src/runtime/load_stream_mgr.h +++ b/be/src/runtime/load_stream_mgr.h @@ -68,6 +68,7 @@ class LoadStreamMgr { private: std::mutex _lock; std::unordered_map _load_streams_map; + std::unordered_set _known_connections; std::unique_ptr _file_writer_thread_pool; uint32_t _num_threads = 0; diff --git a/be/src/util/uid_util.h b/be/src/util/uid_util.h index 5c0b5fb72fb3cd..3a50345ff2d625 100644 --- a/be/src/util/uid_util.h +++ b/be/src/util/uid_util.h @@ -166,6 +166,14 @@ inline TUniqueId generate_uuid() { return uid; } +inline PUniqueId generate_uuid_p() { + auto uuid = boost::uuids::basic_random_generator()(); + PUniqueId uid; + uid.set_hi(reinterpret_cast(uuid.data)[0]); + uid.set_lo(reinterpret_cast(uuid.data)[1]); + return uid; +} + std::ostream& operator<<(std::ostream& os, const UniqueId& uid); std::string print_id(const TUniqueId& id); diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index e3899ce7743f3b..0176bfee2d9f3b 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -166,6 +166,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, cntl.set_timeout_ms(config::open_load_stream_timeout_ms); POpenLoadStreamRequest request; *request.mutable_load_id() = _load_id; + *request.mutable_connection_id() = _connection_id; request.set_src_id(_src_id); request.set_txn_id(txn_id); request.set_enable_profile(enable_profile); diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index e83cbf24c690db..470b68281c2dc5 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -62,6 +62,7 @@ #include "util/debug_points.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" +#include "util/uid_util.h" #include "vec/columns/column.h" #include "vec/common/allocator.h" #include "vec/core/block.h" @@ -235,6 +236,7 @@ class LoadStreamStub : public std::enable_shared_from_this { std::atomic _is_cancelled; std::atomic _is_eos; + PUniqueId _connection_id = generate_uuid_p(); PUniqueId _load_id; brpc::StreamId _stream_id; int64_t _src_id = -1; // source backend_id diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 9abf9d7ea65036..76fbf9137acbad 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -874,6 +874,7 @@ message POpenLoadStreamRequest { optional bool enable_profile = 6 [default = false]; optional int64 total_streams = 7; optional int64 idle_timeout_ms = 8; + optional PUniqueId connection_id = 9; } message PTabletSchemaWithIndex {