Skip to content

Commit

Permalink
[fix](merge-on-write) fix be core and delete unused pending publish i…
Browse files Browse the repository at this point in the history
…nfo for async publsih when tablet dropped
  • Loading branch information
liaoxin01 committed Jul 13, 2023
1 parent 8a42ba5 commit 66c0ce0
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 1 deletion.
6 changes: 5 additions & 1 deletion be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,11 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_
bool is_recovery) {
if (!is_recovery) {
TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
LOG(INFO) << "tablet may be dropped when add async publish task, tablet_id: "
<< tablet_id;
return;
}
PendingPublishInfoPB pending_publish_info_pb;
pending_publish_info_pb.set_partition_id(partition_id);
pending_publish_info_pb.set_transaction_id(transaction_id);
Expand Down Expand Up @@ -1259,7 +1264,6 @@ void StorageEngine::_async_publish_callback() {
if (!tablet) {
LOG(WARNING) << "tablet does not exist when async publush, tablet_id: "
<< tablet_id;
// TODO(liaoxin) remove pending publish info from db
tablet_iter = _async_publish_tasks.erase(tablet_iter);
continue;
}
Expand Down
27 changes: 27 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,9 @@ Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
// cleand unused delete bitmap for deleted tablet
_clean_unused_delete_bitmap();

// cleand unused pending publish info for deleted tablet
_clean_unused_pending_publish_info();

// clean unused rowsets in remote storage backends
for (auto data_dir : get_stores()) {
data_dir->perform_remote_rowset_gc();
Expand Down Expand Up @@ -795,6 +798,30 @@ void StorageEngine::_clean_unused_delete_bitmap() {
}
}

void StorageEngine::_clean_unused_pending_publish_info() {
std::vector<std::pair<int64_t, int64_t>> removed_infos;
auto clean_pending_publish_info_func = [this, &removed_infos](int64_t tablet_id,
int64_t publish_version,
const string& info) -> bool {
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
if (tablet == nullptr) {
removed_infos.emplace_back(tablet_id, publish_version);
}
return true;
};
auto data_dirs = get_stores();
for (auto data_dir : data_dirs) {
TabletMetaManager::traverse_delete_bitmap(data_dir->get_meta(),
clean_pending_publish_info_func);
for (auto& [tablet_id, publish_version] : removed_infos) {
TabletMetaManager::remove_pending_publish_info(data_dir, tablet_id, publish_version);
}
LOG(INFO) << "removed invalid pending publish info from dir: " << data_dir->path()
<< ", deleted pending publish info size: " << removed_infos.size();
removed_infos.clear();
}
}

void StorageEngine::gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos) {
for (auto [tablet_id, version] : gc_tablet_infos) {
LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {}, version: {}", tablet_id,
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ class StorageEngine {

void _clean_unused_delete_bitmap();

void _clean_unused_pending_publish_info();

Status _do_sweep(const std::string& scan_root, const time_t& local_tm_now,
const int32_t expire);

Expand Down

0 comments on commit 66c0ce0

Please sign in to comment.