Skip to content

Commit

Permalink
[fix](load) disable num segments check in compatibility mode
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Sep 20, 2024
1 parent 11dfd19 commit f0333f8
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
9 changes: 7 additions & 2 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ Status TabletStream::close() {
}

DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
if (_next_segid.load() != _num_segments) {
if (_check_num_segments && (_next_segid.load() != _num_segments)) {
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
Expand Down Expand Up @@ -380,9 +380,14 @@ void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
auto it = _tablet_streams_map.find(tablet.tablet_id());
if (it == _tablet_streams_map.end()) {
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
} else {
tablet_stream = it->second;
}
if (tablet.has_num_segments()) {
tablet_stream->add_num_segments(tablet.num_segments());
} else {
it->second->add_num_segments(tablet.num_segments());
// for compatibility reasons (sink from old version BE)
tablet_stream->disable_num_segments_check();
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class TabletStream {
Status append_data(const PStreamHeader& header, butil::IOBuf* data);
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
void disable_num_segments_check() { _check_num_segments = false; }
Status close();
int64_t id() const { return _id; }

Expand All @@ -65,6 +66,7 @@ class TabletStream {
std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
std::atomic<uint32_t> _next_segid;
int64_t _num_segments = 0;
bool _check_num_segments = true;
bthread::Mutex _lock;
Status _status;
PUniqueId _load_id;
Expand Down

0 comments on commit f0333f8

Please sign in to comment.