diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index a382420f5ca079..be092ca51490f3 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -63,8 +63,9 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir, _last_base_compaction_success_millis(0), _cumulative_point(K_INVALID_CUMULATIVE_POINT), _cumulative_compaction_type(cumulative_compaction_type) { - // change _rs_graph to _timestamped_version_tracker - _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas()); + // construct _timestamped_versioned_tracker from rs and stale rs meta + _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas(), + _tablet_meta->all_stale_rs_metas()); } OLAPStatus Tablet::_init_once_action() { @@ -108,6 +109,20 @@ OLAPStatus Tablet::_init_once_action() { _inc_rs_version_map[version] = std::move(rowset); } + // init stale rowset + for (auto& stale_rs_meta : _tablet_meta->all_stale_rs_metas()) { + Version version = stale_rs_meta->version(); + RowsetSharedPtr rowset; + res = RowsetFactory::create_rowset(&_schema, _tablet_path, stale_rs_meta, &rowset); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "fail to init stale rowset. tablet_id:" << tablet_id() + << ", schema_hash:" << schema_hash() << ", version=" << version + << ", res:" << res; + return res; + } + _stale_rs_version_map[version] = std::move(rowset); + } + return res; } @@ -276,6 +291,15 @@ const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version) cons return iter->second; } +const RowsetSharedPtr Tablet::get_stale_rowset_by_version(const Version& version) const { + auto iter = _stale_rs_version_map.find(version); + if (iter == _stale_rs_version_map.end()) { + VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name(); + return nullptr; + } + return iter->second; +} + // This function only be called by SnapshotManager to perform incremental clone. 
// It will be called under protected of _meta_lock(SnapshotManager will fetch it manually), // so it is no need to lock here. @@ -895,6 +919,12 @@ bool Tablet::check_path(const std::string& path_to_check) const { return true; } } + for (auto& stale_version_rowset : _stale_rs_version_map) { + bool ret = stale_version_rowset.second->check_path(path_to_check); + if (ret) { + return true; + } + } return false; } @@ -919,6 +949,11 @@ bool Tablet::check_rowset_id(const RowsetId& rowset_id) { return true; } } + for (auto& stale_version_rowset : _stale_rs_version_map) { + if (stale_version_rowset.second->rowset_id() == rowset_id) { + return true; + } + } if (RowsetMetaManager::check_rowset_meta(_data_dir->get_meta(), tablet_uid(), rowset_id)) { return true; } @@ -1120,6 +1155,14 @@ bool Tablet::rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta) { find_version = true; } } + for (auto& stale_version_rowset : _stale_rs_version_map) { + if (stale_version_rowset.second->rowset_id() == rowset_meta->rowset_id()) { + find_rowset_id = true; + } + if (stale_version_rowset.second->contains_version(rowset_meta->version())) { + find_version = true; + } + } return find_rowset_id || !find_version; } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index e8372fb5c51603..4f1ba22b928087 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -100,6 +100,7 @@ class Tablet : public BaseTablet { // The caller must call hold _meta_lock when call this two function. 
const RowsetSharedPtr get_rowset_by_version(const Version& version) const; const RowsetSharedPtr get_inc_rowset_by_version(const Version& version) const; + const RowsetSharedPtr get_stale_rowset_by_version(const Version& version) const; const RowsetSharedPtr rowset_with_max_version() const; diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 5fb25ce27dfcf1..15801facbdf808 100755 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -381,6 +381,12 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { _inc_rs_metas.push_back(std::move(rs_meta)); } + for (auto& it : tablet_meta_pb.stale_rs_metas()) { + RowsetMetaSharedPtr rs_meta(new AlphaRowsetMeta()); + rs_meta->init_from_pb(it); + _stale_rs_metas.push_back(std::move(rs_meta)); + } + // generate AlterTabletTask if (tablet_meta_pb.has_alter_task()) { AlterTabletTask* alter_tablet_task = new AlterTabletTask(); @@ -431,6 +437,9 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { for (auto rs : _inc_rs_metas) { rs->to_rowset_pb(tablet_meta_pb->add_inc_rs_metas()); } + for (auto rs : _stale_rs_metas) { + rs->to_rowset_pb(tablet_meta_pb->add_stale_rs_metas()); + } _schema.to_schema_pb(tablet_meta_pb->mutable_schema()); if (_alter_task != nullptr) { _alter_task->to_alter_pb(tablet_meta_pb->mutable_alter_task()); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 8ae8ccdd144b91..2058d72dff107e 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -171,8 +171,8 @@ class TabletMeta { const std::vector& to_delete); void revise_rs_metas(std::vector&& rs_metas); - void revise_inc_rs_metas(std::vector&& rs_metas); + inline const std::vector& all_inc_rs_metas() const; inline const std::vector& all_stale_rs_metas() const; OLAPStatus add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta); diff --git a/be/src/olap/version_graph.cpp b/be/src/olap/version_graph.cpp index a64153c7e23ec4..f397dce7f9c335 100644 --- 
a/be/src/olap/version_graph.cpp
+++ b/be/src/olap/version_graph.cpp
@@ -29,7 +29,7 @@ namespace doris {
 void TimestampedVersionTracker::_construct_versioned_tracker(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
     int64_t max_version = 0;
 
-    // construct the roset graph
+    // construct the rowset graph
     _version_graph.reconstruct_version_graph(rs_metas, &max_version);
 }
 
@@ -43,6 +43,167 @@ void TimestampedVersionTracker::construct_versioned_tracker(const std::vector
+/// Rebuild the tracker from the main-path rowset metas and the stale rowset metas.
+///
+/// Does nothing (beyond a verbose log) when rs_metas is empty. Otherwise the stale
+/// path map is cleared, the path id counter restarts at 1, the version graph is
+/// rebuilt from rs_metas, and the stale paths are derived from stale_metas.
+void TimestampedVersionTracker::construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& stale_metas) {
+    if (rs_metas.empty()) {
+        VLOG(3) << "there is no version in the header.";
+        return;
+    }
+    _stale_version_path_map.clear();
+    _next_path_id = 1;
+    _construct_versioned_tracker(rs_metas);
+
+    // init _stale_version_path_map
+    _init_stale_version_path_map(rs_metas, stale_metas);
+}
+
+/// Group the stale rowset metas into stale paths and register every path through
+/// add_stale_path_version(); each stale version is also added to the version graph.
+///
+/// A stale path is a chain of stale rowsets whose versions are contiguous and which
+/// together cover exactly the version range of one other rowset (stale or main path).
+/// Stale rowsets that end up in no such chain each become a single-element path.
+void TimestampedVersionTracker::_init_stale_version_path_map(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& stale_metas) {
+    if (stale_metas.empty()) {
+        return;
+    }
+
+    // 1. Sort the stale metas by version span (end - start) ascending, so that the
+    //    pieces of a path are always visited before the rowset that replaces them.
+    std::list<RowsetMetaSharedPtr> sorted_stale_metas(stale_metas.begin(), stale_metas.end());
+    sorted_stale_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        const int64_t a_diff = a->version().second - a->version().first;
+        const int64_t b_diff = b->version().second - b->version().first;
+        // Compare the 64-bit spans directly: storing their difference in an int
+        // could truncate and produce an inconsistent ordering.
+        if (a_diff != b_diff) {
+            return a_diff < b_diff;
+        }
+        // when the version span is equal, compare rowset creation time
+        return a->creation_time() < b->creation_time();
+    });
+
+    // first_version -> (second_version -> rowset_meta)
+    std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>> stale_map;
+
+    // Register `path` as one stale path and drop its members from stale_map, so a
+    // rowset meta belongs to at most one path. Lookup uses find() rather than
+    // operator[] so that no empty bucket is created just to be erased again.
+    auto commit_stale_path = [this, &stale_map](std::vector<RowsetMetaSharedPtr>& path) {
+        add_stale_path_version(path);
+        for (const auto& item : path) {
+            auto start_iter = stale_map.find(item->start_version());
+            if (start_iter == stale_map.end()) {
+                continue;
+            }
+            start_iter->second.erase(item->end_version());
+            if (start_iter->second.empty()) {
+                stale_map.erase(start_iter);
+            }
+        }
+    };
+
+    // 2. Generate stale paths from stale_metas. Each stale meta is added to stale_map;
+    //    when a path already in stale_map covers the same version range as the current
+    //    stale meta, that path is moved out of stale_map into _stale_version_path_map.
+    for (const auto& stale_meta : sorted_stale_metas) {
+        std::vector<RowsetMetaSharedPtr> stale_path;
+        // 2.1 search BEFORE inserting the current meta, otherwise it would match itself
+        const bool found = _find_path_from_stale_map(stale_map, stale_meta->start_version(),
+                                                     stale_meta->end_version(), &stale_path);
+
+        // 2.2 add stale_meta to stale_map
+        stale_map[stale_meta->start_version()][stale_meta->end_version()] = stale_meta;
+
+        // 2.3 add version to version_graph
+        Version stale_meta_version = stale_meta->version();
+        add_version(stale_meta_version);
+
+        // 2.4 the path replaced by stale_meta becomes a stale path
+        if (found) {
+            commit_stale_path(stale_path);
+        }
+    }
+
+    // 3. Generate stale paths that are replaced by a main-path rowset.
+    for (const auto& rs_meta : rs_metas) {
+        std::vector<RowsetMetaSharedPtr> stale_path;
+        if (_find_path_from_stale_map(stale_map, rs_meta->start_version(),
+                                      rs_meta->end_version(), &stale_path)) {
+            commit_stale_path(stale_path);
+        }
+    }
+
+    // 4. Every stale rowset meta still left in stale_map forms a path of its own.
+    for (const auto& first_entry : stale_map) {
+        for (const auto& second_entry : first_entry.second) {
+            std::vector<RowsetMetaSharedPtr> stale_path;
+            stale_path.push_back(second_entry.second);
+            add_stale_path_version(stale_path);
+        }
+    }
+}
+
+/// Depth-first search for a chain of rowset metas in stale_map that covers exactly
+/// [first_version, second_version].
+///
+/// @param stale_map      first_version -> (second_version -> rowset_meta)
+/// @param first_version  start version the chain must begin with
+/// @param second_version end version the chain must finish on
+/// @param stale_path     in/out: the chain found is appended here; on failure every
+///                       element pushed by this call has been popped again
+/// @return true when a covering chain exists
+bool TimestampedVersionTracker::_find_path_from_stale_map(
+        const std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>>& stale_map,
+        int64_t first_version, int64_t second_version,
+        std::vector<RowsetMetaSharedPtr>* stale_path) {
+    auto first_iter = stale_map.find(first_version);
+    // if first_version not in stale_map, there is no path.
+    if (first_iter == stale_map.end()) {
+        return false;
+    }
+    const auto& second_version_map = first_iter->second;
+
+    // a rowset covering exactly [first_version, second_version] finishes the path
+    auto second_iter = second_version_map.find(second_version);
+    if (second_iter != second_version_map.end()) {
+        stale_path->push_back(second_iter->second);
+        return true;
+    }
+
+    // Backtracking over every rowset that starts at first_version. A range-for is used
+    // so the traversal always advances: the previous while loop hit `continue` without
+    // incrementing its iterator and never terminated once an overshooting entry was met.
+    for (const auto& candidate : second_version_map) {
+        // this rowset ends beyond second_version, it cannot be part of the path
+        if (candidate.first > second_version) {
+            continue;
+        }
+        // try to complete the path from candidate.first + 1 to second_version
+        stale_path->push_back(candidate.second);
+        if (_find_path_from_stale_map(stale_map, candidate.first + 1, second_version, stale_path)) {
+            return true;
+        }
+        // dead end through this candidate, undo and try the next one
+        stale_path->pop_back();
+    }
+
+    return false;
+}
 
 void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) {
 
diff --git a/be/src/olap/version_graph.h b/be/src/olap/version_graph.h
index b8cac9ffd162e5..e051fbc4c5fcf3 100644
--- a/be/src/olap/version_graph.h
+++ b/be/src/olap/version_graph.h
@@ -136,9 +136,13 @@ using PathVersionListSharedPtr = std::shared_ptr& rs_metas);
 
+    /// Construct rowsets version tracker by main path rowset meta and stale rowset meta.
+    void construct_versioned_tracker(const std::vector<RowsetMetaSharedPtr>& rs_metas,
+                                     const std::vector<RowsetMetaSharedPtr>& stale_metas);
+
     /// Recover rowsets version tracker from stale version path map. When delete operation fails, the
     /// tracker can be recovered from deleted stale_version_path_map.
     void recover_versioned_tracker(const std::map& stale_version_path_map);
@@ -180,9 +184,19 @@ class TimestampedVersionTracker {
     void get_stale_version_path_json_doc(rapidjson::Document& path_arr);
 
 private:
-    /// Construct rowsets version tracker with stale rowsets.
+    /// Construct rowsets version tracker with main path rowset meta.
void _construct_versioned_tracker(const std::vector& rs_metas); + /// init stale_version_path_map by main path rowset meta and stale rowset meta. + void _init_stale_version_path_map(const std::vector& rs_metas, + const std::vector& stale_metas); + + /// find a path in stale_map from first_version to second_version, stale_path is used as result. + bool _find_path_from_stale_map( + const std::unordered_map>& stale_map, + int64_t first_version, int64_t second_version, + std::vector* stale_path); + private: // This variable records the id of path version which will be dispatched to next path version, // it is not persisted. diff --git a/be/test/olap/timestamped_version_tracker_test.cpp b/be/test/olap/timestamped_version_tracker_test.cpp index 9b4ce966483b3b..3fb3f9ed39c4f4 100644 --- a/be/test/olap/timestamped_version_tracker_test.cpp +++ b/be/test/olap/timestamped_version_tracker_test.cpp @@ -480,6 +480,23 @@ TEST_F(TestTimestampedVersionTracker, construct_versioned_tracker) { ASSERT_EQ(1, tracker._next_path_id); } +TEST_F(TestTimestampedVersionTracker, construct_version_tracker_by_stale_meta) { + + std::vector rs_metas; + std::vector expried_rs_metas; + std::vector version_path; + + init_all_rs_meta(&rs_metas); + init_expried_row_rs_meta(&expried_rs_metas); + + TimestampedVersionTracker tracker; + tracker.construct_versioned_tracker(rs_metas, expried_rs_metas); + + ASSERT_EQ(10, tracker._version_graph._version_graph.size()); + ASSERT_EQ(4, tracker._stale_version_path_map.size()); + ASSERT_EQ(5, tracker._next_path_id); +} + TEST_F(TestTimestampedVersionTracker, construct_versioned_tracker_with_same_rowset) { std::vector rs_metas; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index d4f9cf758523de..72805b8a04671a 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -308,6 +308,7 @@ message TabletMetaPB { optional int64 end_rowset_id = 15; optional RowsetTypePB preferred_rowset_type = 16; optional TabletTypePB 
tablet_type = 17; + repeated RowsetMetaPB stale_rs_metas = 18; } message OLAPIndexHeaderMessage {