Skip to content

Commit

Permalink
[fix](move-memtable) detect repeat stream open due to retry
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Sep 30, 2024
1 parent d4b9c5e commit 5e9ebab
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 0 deletions.
2 changes: 2 additions & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class LoadStream : public brpc::StreamInputHandler {
}
}

void increase_use_count() { _total_streams++; }

void close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

Expand Down
7 changes: 7 additions & 0 deletions be/src/runtime/load_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ LoadStreamMgr::~LoadStreamMgr() {
Status LoadStreamMgr::open_load_stream(const POpenLoadStreamRequest* request,
LoadStream*& load_stream) {
UniqueId load_id(request->load_id());
UniqueId conn_id(request->connection_id());

{
std::lock_guard l(_lock);
Expand All @@ -62,6 +63,12 @@ Status LoadStreamMgr::open_load_stream(const POpenLoadStreamRequest* request,
_load_streams_map[load_id] = std::move(p);
}
load_stream->add_source(request->src_id());
if (_known_connections.contains(conn_id)) {
// detected retry open, increase total streams
load_stream->increase_use_count();
} else {
_known_connections.insert(conn_id);
}
}
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/load_stream_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class LoadStreamMgr {
private:
std::mutex _lock;
std::unordered_map<UniqueId, LoadStreamPtr> _load_streams_map;
std::unordered_set<UniqueId> _known_connections;
std::unique_ptr<ThreadPool> _file_writer_thread_pool;

uint32_t _num_threads = 0;
Expand Down
8 changes: 8 additions & 0 deletions be/src/util/uid_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ inline TUniqueId generate_uuid() {
return uid;
}

inline PUniqueId generate_uuid_p() {
auto uuid = boost::uuids::basic_random_generator<boost::mt19937>()();
PUniqueId uid;
uid.set_hi(reinterpret_cast<int64_t*>(uuid.data)[0]);
uid.set_lo(reinterpret_cast<int64_t*>(uuid.data)[1]);
return uid;
}

std::ostream& operator<<(std::ostream& os, const UniqueId& uid);

std::string print_id(const TUniqueId& id);
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
POpenLoadStreamRequest request;
*request.mutable_load_id() = _load_id;
*request.mutable_connection_id() = _connection_id;
request.set_src_id(_src_id);
request.set_txn_id(txn_id);
request.set_enable_profile(enable_profile);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "util/uid_util.h"
#include "vec/columns/column.h"
#include "vec/common/allocator.h"
#include "vec/core/block.h"
Expand Down Expand Up @@ -235,6 +236,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
std::atomic<bool> _is_cancelled;
std::atomic<bool> _is_eos;

PUniqueId _connection_id = generate_uuid_p();
PUniqueId _load_id;
brpc::StreamId _stream_id;
int64_t _src_id = -1; // source backend_id
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ message POpenLoadStreamRequest {
optional bool enable_profile = 6 [default = false];
optional int64 total_streams = 7;
optional int64 idle_timeout_ms = 8;
optional PUniqueId connection_id = 9;
}

message PTabletSchemaWithIndex {
Expand Down

0 comments on commit 5e9ebab

Please sign in to comment.