Skip to content

Commit

Permalink
[enhancement](merge-on-write) split delete bitmap from tablet meta
Browse files Browse the repository at this point in the history
  • Loading branch information
liaoxin01 committed Jul 4, 2023
1 parent 11e18f4 commit 3302bd9
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 3 deletions.
19 changes: 19 additions & 0 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <filesystem>
#include <memory>
#include <new>
#include <roaring/roaring.hh>
#include <set>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -539,6 +540,24 @@ Status DataDir::load() {
}
}

// Callback handed to TabletMetaManager::traverse_delete_bitmap: merges one persisted
// delete bitmap entry into the in-memory delete bitmap of the tablet it belongs to.
//
// Entries are skipped when the tablet is unknown to the tablet manager or when the
// rowset id is not among the tablet's rowset metas. The callback always returns true.
//
// The value is deserialized with the size-bounded readSafe() so that a truncated or
// corrupted value cannot make the decoder read past the end of `val`.
// NOTE(review): CRoaring's C++ wrapper reports a failed deserialization by terminating /
// throwing — confirm this is acceptable during startup.
auto load_delete_bitmap_func = [this](int64_t tablet_id, RowsetId rowset_id, int64_t segment_id,
                                      int64_t version, const string& val) {
    TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
    if (!tablet) {
        return true;
    }
    const std::vector<RowsetMetaSharedPtr>& all_rowsets = tablet->tablet_meta()->all_rs_metas();
    for (auto& rowset_meta : all_rowsets) {
        // only process the rowset in _rs_metas
        if (rowset_meta->rowset_id() == rowset_id) {
            tablet->tablet_meta()->delete_bitmap().delete_bitmap[{rowset_id, segment_id,
                                                                  version}] |=
                    roaring::Roaring::readSafe(val.data(), val.size());
            // The entry has been merged; further matches would only repeat the same OR.
            break;
        }
    }
    return true;
};
TabletMetaManager::traverse_delete_bitmap(_meta, load_delete_bitmap_func);

// At startup, we only count these invalid rowset, but do not actually delete it.
// The actual delete operation is in StorageEngine::_clean_unused_rowset_metas,
// which is cleaned up uniformly by the background cleanup thread.
Expand Down
23 changes: 23 additions & 0 deletions be/src/olap/olap_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,29 @@ Status OlapMeta::put(const int column_family_index, const std::vector<BatchEntry
return Status::OK();
}

// Applies a prepared RocksDB write batch to the meta database.
//
// Increments the write-request counter, times the write and adds the elapsed time (in
// microseconds) to the write-duration metric. The write is synced according to
// config::sync_tablet_meta. Returns META_PUT_ERROR (after logging the RocksDB reason)
// when the write fails, Status::OK() otherwise.
Status OlapMeta::put(rocksdb::WriteBatch* batch) {
    DorisMetrics::instance()->meta_write_request_total->increment(1);

    rocksdb::Status write_status;
    {
        int64_t elapsed_ns = 0;
        // Declared before the timer on purpose: locals are destroyed in reverse order, so
        // this callback runs after the timer object is gone (presumably that is when
        // elapsed_ns gets its final value — confirm against SCOPED_RAW_TIMER).
        Defer report_duration([&] {
            DorisMetrics::instance()->meta_write_request_duration_us->increment(elapsed_ns / 1000);
        });
        SCOPED_RAW_TIMER(&elapsed_ns);

        WriteOptions options;
        options.sync = config::sync_tablet_meta;
        write_status = _db->Write(options, batch);
    }

    if (write_status.ok()) {
        return Status::OK();
    }
    LOG(WARNING) << "rocks db put batch failed, reason:" << write_status.ToString();
    return Status::Error<META_PUT_ERROR>();
}

Status OlapMeta::remove(const int column_family_index, const std::string& key) {
DorisMetrics::instance()->meta_write_request_total->increment(1);
auto& handle = _handles[column_family_index];
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/olap_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
// Forward declarations of the RocksDB types referenced by the declarations in this header.
namespace rocksdb {
class ColumnFamilyHandle;
class DB;
class WriteBatch;
} // namespace rocksdb

namespace doris {
Expand All @@ -41,7 +42,6 @@ class OlapMeta final {
: key(key_arg), value(value_arg) {}
};

public:
OlapMeta(const std::string& root_path);
~OlapMeta();

Expand All @@ -53,6 +53,7 @@ class OlapMeta final {

Status put(const int column_family_index, const std::string& key, const std::string& value);
Status put(const int column_family_index, const std::vector<BatchEntry>& entries);
Status put(rocksdb::WriteBatch* batch);

Status remove(const int column_family_index, const std::string& key);
Status remove(const int column_family_index, const std::vector<std::string>& keys);
Expand All @@ -62,6 +63,10 @@ class OlapMeta final {

// Returns a copy of the root path stored in this object.
std::string get_root_path() const { return _root_path; }

// Returns the raw, non-owning column family handle stored at `column_family_index`.
// The index is not range-checked here.
rocksdb::ColumnFamilyHandle* get_handle(const int column_family_index) {
    auto& handle = _handles[column_family_index];
    return handle.get();
}

private:
std::string _root_path;
// keep order of _db && _handles, we need destroy _handles before _db
Expand Down
31 changes: 31 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <random>
#include <set>
#include <thread>
#include <unordered_set>
#include <utility>

#include "agent/task_worker_pool.h"
Expand All @@ -64,6 +65,7 @@
#include "olap/single_replica_compaction.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
#include "olap/task/engine_task.h"
#include "olap/txn_manager.h"
#include "runtime/memory/mem_tracker.h"
Expand Down Expand Up @@ -696,6 +698,9 @@ Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
// clean unused rowset metas in OlapMeta
_clean_unused_rowset_metas();

// clean unused delete bitmaps of deleted tablets
_clean_unused_delete_bitmap();

// clean unused rowsets in remote storage backends
for (auto data_dir : get_stores()) {
data_dir->perform_remote_rowset_gc();
Expand Down Expand Up @@ -771,6 +776,32 @@ void StorageEngine::_clean_unused_rowset_metas() {
}
}

void StorageEngine::_clean_unused_delete_bitmap() {
std::unordered_set<int64_t> removed_tablets;
auto clean_delete_bitmap_func = [this, &removed_tablets](int64_t tablet_id, RowsetId rowset_id,
int64_t segment_id, int64_t version,
const std::string& val) -> bool {
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
if (tablet == nullptr) {
if (removed_tablets.insert(tablet_id).second) {
LOG(INFO) << "clean ununsed delete bitmap for deleted tablet, tablet_id: "
<< tablet_id;
}
}
return true;
};
auto data_dirs = get_stores();
for (auto data_dir : data_dirs) {
TabletMetaManager::traverse_delete_bitmap(data_dir->get_meta(), clean_delete_bitmap_func);
for (auto id : removed_tablets) {
TabletMetaManager::remove_delete_bitmap_by_tablet_id(data_dir, id);
}
LOG(INFO) << "removed invalid delete bitmap from dir: " << data_dir->path()
<< ", deleted tablets size: " << removed_tablets.size();
removed_tablets.clear();
}
}

void StorageEngine::gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos) {
for (auto [tablet_id, version] : gc_tablet_infos) {
LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {}, version: {}", tablet_id,
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class StorageEngine {

void _clean_unused_rowset_metas();

void _clean_unused_delete_bitmap();

Status _do_sweep(const std::string& scan_root, const time_t& local_tm_now,
const int32_t expire);

Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
#include "olap/storage_policy.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "olap/types.h"
Expand Down Expand Up @@ -1514,6 +1515,9 @@ bool Tablet::do_tablet_meta_checkpoint() {
rs_meta->set_remove_from_rowset_meta();
}

TabletMetaManager::remove_old_version_delete_bitmap(_data_dir, tablet_id(),
max_version_unlocked().second);

_newly_created_rowset_num = 0;
_last_checkpoint_time = UnixMillis();
return true;
Expand Down
91 changes: 91 additions & 0 deletions be/src/olap/tablet_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>

#include <boost/algorithm/string/trim.hpp>
#include <fstream>
Expand All @@ -28,6 +29,7 @@
#include <vector>

#include "common/logging.h"
#include "gutil/endian.h"
#include "json2pb/json_to_pb.h"
#include "json2pb/pb_to_json.h"
#include "olap/data_dir.h"
Expand Down Expand Up @@ -214,4 +216,93 @@ Status TabletMetaManager::traverse_pending_publish(
return status;
}

// Builds the meta-store key of one delete bitmap entry.
//
// Layout (56 bytes in total):
//   DELETE_BITMAP prefix (4) | tablet_id (8) | version (8) | rowset_id.version (4) |
//   rowset_id.hi (8) | rowset_id.mi (8) | rowset_id.lo (8) | segment_id (8)
// Every integer is passed through BigEndian::FromHost* before being appended, presumably
// so that the byte-wise key order follows the numeric order of the fields — confirm.
std::string TabletMetaManager::encode_delete_bitmap_key(TTabletId tablet_id, int64_t version,
                                                        const RowsetId& rowset_id,
                                                        int64_t segment_id) {
    std::string encoded;
    encoded.reserve(56);
    encoded.append(DELETE_BITMAP);

    // Appends one 64-bit field in the key's byte order.
    auto append_u64 = [&encoded](uint64_t field) {
        put_fixed64_le(&encoded, BigEndian::FromHost64(field));
    };

    append_u64(tablet_id);
    append_u64(version);
    put_fixed32_le(&encoded, BigEndian::FromHost32(rowset_id.version));
    append_u64(rowset_id.hi);
    append_u64(rowset_id.mi);
    append_u64(rowset_id.lo);
    append_u64(segment_id);
    return encoded;
}

// Splits a key produced by encode_delete_bitmap_key back into its fields.
//
// The key must be exactly 56 bytes long; this is only enforced by a DCHECK. The fields
// are read in the order tablet_id, version, rowset_id.{version, hi, mi, lo}, segment_id,
// starting right after the 4-byte prefix.
void TabletMetaManager::decode_delete_bitmap_key(const string& enc_key, TTabletId* tablet_id,
                                                 int64_t* version, RowsetId* rowset_id,
                                                 int64_t* segment_id) {
    DCHECK_EQ(enc_key.size(), 56);

    // Read position inside the key; starts just past the 4-byte prefix.
    const char* cursor = enc_key.data() + 4;

    *tablet_id = BigEndian::ToHost64(UNALIGNED_LOAD64(cursor));
    cursor += 8;
    *version = BigEndian::ToHost64(UNALIGNED_LOAD64(cursor));
    cursor += 8;
    rowset_id->version = BigEndian::ToHost32(UNALIGNED_LOAD32(cursor));
    cursor += 4;
    rowset_id->hi = BigEndian::ToHost64(UNALIGNED_LOAD64(cursor));
    cursor += 8;
    rowset_id->mi = BigEndian::ToHost64(UNALIGNED_LOAD64(cursor));
    cursor += 8;
    rowset_id->lo = BigEndian::ToHost64(UNALIGNED_LOAD64(cursor));
    cursor += 8;
    *segment_id = BigEndian::ToHost64(UNALIGNED_LOAD64(cursor));
}

// Persists every bitmap held by `delete_bimap` into the meta store of `store`.
//
// One key/value pair is written per (rowset id, segment id) entry; all pairs go into a
// single write batch. The key uses the `version` argument, not the version stored in the
// entry's own map key. Returns OK without touching the store when there are no entries,
// otherwise the result of OlapMeta::put for the batch.
Status TabletMetaManager::save_delete_bitmap(DataDir* store, TTabletId tablet_id,
                                             DeleteBitmapPtr delete_bimap, int64_t version) {
    const auto& entries = delete_bimap->delete_bitmap;
    if (entries.empty()) {
        return Status::OK();
    }

    OlapMeta* meta = store->get_meta();
    rocksdb::WriteBatch batch;
    rocksdb::ColumnFamilyHandle* cf = meta->get_handle(META_COLUMN_FAMILY_INDEX);
    for (const auto& entry : entries) {
        const auto& bitmap_key = entry.first;
        const auto& bitmap = entry.second;

        // Serialize the bitmap into a buffer sized by the bitmap itself.
        std::string serialized(bitmap.getSizeInBytes(), '\0');
        bitmap.write(serialized.data());

        const int64_t segment_id = std::get<1>(bitmap_key);
        batch.Put(cf,
                  encode_delete_bitmap_key(tablet_id, version, std::get<0>(bitmap_key),
                                           segment_id),
                  serialized);
    }
    return meta->put(&batch);
}

// Iterates over all entries in `meta` whose key starts with the DELETE_BITMAP prefix,
// decodes each key and invokes `func(tablet_id, rowset_id, segment_id, version, value)`.
//
// The boolean returned by `func` is forwarded to OlapMeta::iterate (presumably `false`
// stops the iteration — confirm against OlapMeta::iterate). Returns iterate()'s status.
Status TabletMetaManager::traverse_delete_bitmap(
        OlapMeta* meta,
        std::function<bool(int64_t, RowsetId, int64_t, int64_t, const std::string&)> const& func) {
    auto visit_entry = [&func](const std::string& key, const std::string& value) -> bool {
        TTabletId tablet_id;
        RowsetId rowset_id;
        int64_t segment_id;
        int64_t version;
        decode_delete_bitmap_key(key, &tablet_id, &version, &rowset_id, &segment_id);
        VLOG_NOTICE << "traverse delete bitmap, key: |" << tablet_id << "|" << rowset_id << "|"
                    << segment_id << "|" << version;
        return func(tablet_id, rowset_id, segment_id, version, value);
    };
    return meta->iterate(META_COLUMN_FAMILY_INDEX, DELETE_BITMAP, visit_entry);
}

// Deletes the persisted delete bitmaps of `tablet_id` with a version up to and including
// `version`.
//
// Issues one range delete from the smallest key of the tablet (version 0, default
// RowsetId, segment 0) to the smallest key of `version + 1`. RocksDB documents
// DeleteRange as covering [begin, end), so the end key itself is not removed.
Status TabletMetaManager::remove_old_version_delete_bitmap(DataDir* store, TTabletId tablet_id,
                                                           int64_t version) {
    OlapMeta* meta = store->get_meta();

    const std::string range_begin = encode_delete_bitmap_key(tablet_id, 0, RowsetId(), 0);
    const std::string range_end = encode_delete_bitmap_key(tablet_id, version + 1, RowsetId(), 0);

    rocksdb::WriteBatch batch;
    batch.DeleteRange(meta->get_handle(META_COLUMN_FAMILY_INDEX), range_begin, range_end);
    LOG(INFO) << "remove old version delete bitmap, tablet_id: " << tablet_id
              << " version: " << version;
    return meta->put(&batch);
}

// Deletes the persisted delete bitmaps of `tablet_id` from the meta store of `store`.
//
// Issues one range delete from the tablet's smallest key (version 0) to the key with
// version INT64_MAX, default RowsetId and segment 0.
// NOTE(review): RocksDB documents the end key of DeleteRange as exclusive, so entries
// whose version equals INT64_MAX would not be covered — presumably never occurs; confirm.
Status TabletMetaManager::remove_delete_bitmap_by_tablet_id(DataDir* store, TTabletId tablet_id) {
    OlapMeta* meta = store->get_meta();

    const std::string range_begin = encode_delete_bitmap_key(tablet_id, 0, RowsetId(), 0);
    const std::string range_end = encode_delete_bitmap_key(tablet_id, INT64_MAX, RowsetId(), 0);

    rocksdb::WriteBatch batch;
    batch.DeleteRange(meta->get_handle(META_COLUMN_FAMILY_INDEX), range_begin, range_end);
    LOG(INFO) << "remove delete bitmap by tablet_id, tablet_id: " << tablet_id;
    return meta->put(&batch);
}

} // namespace doris
20 changes: 20 additions & 0 deletions be/src/olap/tablet_meta_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const std::string HEADER_PREFIX = "tabletmeta_";

const std::string PENDING_PUBLISH_INFO = "ppi_";

// Key prefix of the delete bitmap entries stored in the meta column family.
const std::string DELETE_BITMAP = "dlb_";

// Helper Class for managing tablet headers of one root path.
class TabletMetaManager {
public:
Expand Down Expand Up @@ -69,6 +71,24 @@ class TabletMetaManager {

static Status traverse_pending_publish(
OlapMeta* meta, std::function<bool(int64_t, int64_t, const std::string&)> const& func);

// Writes every bitmap in `delete_bimap` to the meta store of `store` in one write batch,
// keyed by `tablet_id`, `version` and each entry's rowset id and segment id.
// Returns OK without writing when the delete bitmap has no entries.
static Status save_delete_bitmap(DataDir* store, TTabletId tablet_id,
DeleteBitmapPtr delete_bimap, int64_t version);

// Visits all delete bitmap entries in `meta`. `func` receives
// (tablet_id, rowset_id, segment_id, version, serialized value).
static Status traverse_delete_bitmap(OlapMeta* meta,
std::function<bool(int64_t, RowsetId, int64_t, int64_t,
const std::string&)> const& func);

// Builds the 56-byte meta-store key: DELETE_BITMAP prefix, tablet_id, version,
// rowset_id (version, hi, mi, lo) and segment_id, in that order.
static std::string encode_delete_bitmap_key(TTabletId tablet_id, int64_t version,
const RowsetId& rowset_id, int64_t segment_id);

// Inverse of encode_delete_bitmap_key. `enc_key` is expected to be exactly 56 bytes
// long (checked with DCHECK only).
static void decode_delete_bitmap_key(const string& enc_key, TTabletId* tablet_id,
int64_t* version, RowsetId* rowset_id,
int64_t* segment_id);
// Range-deletes the delete bitmap entries of `tablet_id` from version 0 up to, but not
// including, the first key of `version + 1`.
static Status remove_old_version_delete_bitmap(DataDir* store, TTabletId tablet_id,
int64_t version);

// Range-deletes the delete bitmap entries of `tablet_id` from version 0 up to the key
// with version INT64_MAX.
static Status remove_delete_bitmap_by_tablet_id(DataDir* store, TTabletId tablet_id);
};

} // namespace doris
6 changes: 4 additions & 2 deletions be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
#include "olap/task/engine_publish_version_task.h"
#include "util/time.h"

Expand Down Expand Up @@ -391,8 +392,9 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
}
stats->partial_update_write_segment_us = MonotonicMicros() - t3;
int64_t t4 = MonotonicMicros();
std::shared_lock rlock(tablet->get_header_lock());
tablet->save_meta();
RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap(
tablet->data_dir(), tablet->tablet_id(), tablet_txn_info.delete_bitmap,
version.second));
stats->save_meta_time_us = MonotonicMicros() - t4;
}

Expand Down
Loading

0 comments on commit 3302bd9

Please sign in to comment.