Skip to content

Commit

Permalink
[feature](merge-cloud) compaction for mow table
Browse files Browse the repository at this point in the history
  • Loading branch information
liaoxin01 committed Mar 5, 2024
1 parent 207aba4 commit ad0bc1e
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 137 deletions.
13 changes: 3 additions & 10 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,19 +278,16 @@ Status CloudBaseCompaction::modify_rowsets() {
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());

/*
DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t missed_rows = _stats.merged_rows;
int64_t initiator =
boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(_tablet->cloud_calc_delete_bitmap_for_compaciton(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(), missed_rows,
initiator, output_rowset_delete_bitmap));
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaciton(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap));
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}
*/

cloud::FinishTabletJobResponse resp;
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
Expand Down Expand Up @@ -320,11 +317,9 @@ Status CloudBaseCompaction::modify_rowsets() {
// ATTN: MUST NOT update `cumu_compaction_cnt` or `cumu_point` which are used when sync rowsets, otherwise may cause
// the tablet to be unable to synchronize the rowset meta changes generated by cumu compaction.
cloud_tablet()->set_base_compaction_cnt(cloud_tablet()->base_compaction_cnt() + 1);
/*
if (output_rowset_delete_bitmap) {
_tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap);
}
*/
if (stats.cumulative_compaction_cnt() >= cloud_tablet()->cumulative_compaction_cnt()) {
cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
stats.num_rows(), stats.data_size());
Expand All @@ -347,14 +342,12 @@ void CloudBaseCompaction::garbage_collection() {
compaction_job->set_initiator(BackendOptions::get_localhost() + ':' +
std::to_string(config::heartbeat_service_port));
compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
/*
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator =
boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits<int64_t>::max();
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}
*/
auto st = _engine.meta_mgr().abort_tablet_job(job);
if (!st.ok()) {
LOG_WARNING("failed to abort compaction job")
Expand Down
13 changes: 3 additions & 10 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,19 +227,16 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());

/*
DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t missed_rows = merger_stats ? merger_stats->merged_rows : -1;
int64_t initiator =
boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(_tablet->cloud_calc_delete_bitmap_for_compaciton(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(), missed_rows,
initiator, output_rowset_delete_bitmap));
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaciton(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap));
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}
*/

cloud::FinishTabletJobResponse resp;
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
Expand Down Expand Up @@ -285,11 +282,9 @@ Status CloudCumulativeCompaction::modify_rowsets() {
cloud_tablet()->set_cumulative_compaction_cnt(cloud_tablet()->cumulative_compaction_cnt() +
1);
cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
/*
if (output_rowset_delete_bitmap) {
_tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap);
}
*/
if (stats.base_compaction_cnt() >= cloud_tablet()->base_compaction_cnt()) {
cloud_tablet()->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
stats.num_rows(), stats.data_size());
Expand All @@ -311,14 +306,12 @@ void CloudCumulativeCompaction::garbage_collection() {
compaction_job->set_initiator(BackendOptions::get_localhost() + ':' +
std::to_string(config::heartbeat_service_port));
compaction_job->set_type(cloud::TabletCompactionJobPB::CUMULATIVE);
/*
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator =
boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits<int64_t>::max();
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}
*/
auto st = _engine.meta_mgr().abort_tablet_job(job);
if (!st.ok()) {
LOG_WARNING("failed to abort compaction job")
Expand Down
55 changes: 55 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,4 +543,59 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
return Status::OK();
}

Status CloudTablet::calc_delete_bitmap_for_compaciton(
const std::vector<RowsetSharedPtr>& input_rowsets, const RowsetSharedPtr& output_rowset,
const RowIdConversion& rowid_conversion, ReaderType compaction_type, int64_t merged_rows,
int64_t initiator, DeleteBitmapPtr& output_rowset_delete_bitmap) {
output_rowset_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
std::set<RowLocation> missed_rows;
std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map;

// 1. calc delete bitmap for historical data
RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
Version version = max_version();
calc_compaction_output_rowset_delete_bitmap(
input_rowsets, rowid_conversion, 0, version.second + 1, &missed_rows, &location_map,
tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
std::size_t missed_rows_size = missed_rows.size();
if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) {
if (merged_rows >= 0 && merged_rows != missed_rows_size) {
std::string err_msg = fmt::format(
"cumulative compaction: the merged rows({}) is not equal to missed "
"rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
merged_rows, missed_rows_size, tablet_id(), table_id());
DCHECK(false) << err_msg;
LOG(WARNING) << err_msg;
}
}
if (config::enable_rowid_conversion_correctness_check) {
RETURN_IF_ERROR(check_rowid_conversion(output_rowset, location_map));
}
location_map.clear();

// 2. calc delete bimap for incremental data
RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
*this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));

calc_compaction_output_rowset_delete_bitmap(
input_rowsets, rowid_conversion, version.second, UINT64_MAX, &missed_rows,
&location_map, tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
if (config::enable_rowid_conversion_correctness_check) {
RETURN_IF_ERROR(check_rowid_conversion(output_rowset, location_map));
}
if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) {
DCHECK_EQ(missed_rows.size(), missed_rows_size);
if (missed_rows.size() != missed_rows_size) {
LOG(WARNING) << "missed rows don't match, before: " << missed_rows_size
<< " after: " << missed_rows.size();
}
}

// 3. store delete bitmap
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, -1, initiator,
output_rowset_delete_bitmap.get()));
return Status::OK();
}

} // namespace doris
7 changes: 7 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ class CloudTablet final : public BaseTablet {
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet& cur_rowset_ids) override;

Status calc_delete_bitmap_for_compaciton(const std::vector<RowsetSharedPtr>& input_rowsets,
const RowsetSharedPtr& output_rowset,
const RowIdConversion& rowid_conversion,
ReaderType compaction_type, int64_t merged_rows,
int64_t initiator,
DeleteBitmapPtr& output_rowset_delete_bitmap);

int64_t last_sync_time_s = 0;
int64_t last_load_time_ms = 0;
int64_t last_base_compaction_success_time_ms = 0;
Expand Down
105 changes: 105 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "olap/delete_bitmap_calculator.h"
#include "olap/memtable.h"
#include "olap/primary_key_index.h"
#include "olap/rowid_conversion.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
Expand Down Expand Up @@ -1270,4 +1271,108 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, const Tablet
return Status::OK();
}

void BaseTablet::calc_compaction_output_rowset_delete_bitmap(
const std::vector<RowsetSharedPtr>& input_rowsets, const RowIdConversion& rowid_conversion,
uint64_t start_version, uint64_t end_version, std::set<RowLocation>* missed_rows,
std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>* location_map,
const DeleteBitmap& input_delete_bitmap, DeleteBitmap* output_rowset_delete_bitmap) {
RowLocation src;
RowLocation dst;
for (auto& rowset : input_rowsets) {
src.rowset_id = rowset->rowset_id();
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
src.segment_id = seg_id;
DeleteBitmap subset_map(tablet_id());
input_delete_bitmap.subset({rowset->rowset_id(), seg_id, start_version},
{rowset->rowset_id(), seg_id, end_version}, &subset_map);
// traverse all versions and convert rowid
for (auto iter = subset_map.delete_bitmap.begin();
iter != subset_map.delete_bitmap.end(); ++iter) {
auto cur_version = std::get<2>(iter->first);
for (auto index = iter->second.begin(); index != iter->second.end(); ++index) {
src.row_id = *index;
if (rowid_conversion.get(src, &dst) != 0) {
VLOG_CRITICAL << "Can't find rowid, may be deleted by the delete_handler, "
<< " src loaction: |" << src.rowset_id << "|"
<< src.segment_id << "|" << src.row_id
<< " version: " << cur_version;
missed_rows->insert(src);
continue;
}
VLOG_DEBUG << "calc_compaction_output_rowset_delete_bitmap dst location: |"
<< dst.rowset_id << "|" << dst.segment_id << "|" << dst.row_id
<< " src location: |" << src.rowset_id << "|" << src.segment_id
<< "|" << src.row_id << " start version: " << start_version
<< "end version" << end_version;
(*location_map)[rowset].emplace_back(src, dst);
output_rowset_delete_bitmap->add({dst.rowset_id, dst.segment_id, cur_version},
dst.row_id);
}
}
}
}
}

Status BaseTablet::check_rowid_conversion(
RowsetSharedPtr dst_rowset,
const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>&
location_map) {
if (location_map.empty()) {
VLOG_DEBUG << "check_rowid_conversion, location_map is empty";
return Status::OK();
}
std::vector<segment_v2::SegmentSharedPtr> dst_segments;

RETURN_IF_ERROR(
std::dynamic_pointer_cast<BetaRowset>(dst_rowset)->load_segments(&dst_segments));
std::unordered_map<RowsetId, std::vector<segment_v2::SegmentSharedPtr>> input_rowsets_segment;

VLOG_DEBUG << "check_rowid_conversion, dst_segments size: " << dst_segments.size();
for (auto [src_rowset, locations] : location_map) {
std::vector<segment_v2::SegmentSharedPtr>& segments =
input_rowsets_segment[src_rowset->rowset_id()];
if (segments.empty()) {
RETURN_IF_ERROR(
std::dynamic_pointer_cast<BetaRowset>(src_rowset)->load_segments(&segments));
}
for (auto& [src, dst] : locations) {
std::string src_key;
std::string dst_key;
Status s = segments[src.segment_id]->read_key_by_rowid(src.row_id, &src_key);
if (UNLIKELY(s.is<NOT_IMPLEMENTED_ERROR>())) {
LOG(INFO) << "primary key index of old version does not "
"support reading key by rowid";
break;
}
if (UNLIKELY(!s)) {
LOG(WARNING) << "failed to get src key: |" << src.rowset_id << "|" << src.segment_id
<< "|" << src.row_id << " status: " << s;
DCHECK(false);
return s;
}

s = dst_segments[dst.segment_id]->read_key_by_rowid(dst.row_id, &dst_key);
if (UNLIKELY(!s)) {
LOG(WARNING) << "failed to get dst key: |" << dst.rowset_id << "|" << dst.segment_id
<< "|" << dst.row_id << " status: " << s;
DCHECK(false);
return s;
}

VLOG_DEBUG << "check_rowid_conversion, src: |" << src.rowset_id << "|" << src.segment_id
<< "|" << src.row_id << "|" << src_key << " dst: |" << dst.rowset_id << "|"
<< dst.segment_id << "|" << dst.row_id << "|" << dst_key;
if (UNLIKELY(src_key.compare(dst_key) != 0)) {
LOG(WARNING) << "failed to check key, src key: |" << src.rowset_id << "|"
<< src.segment_id << "|" << src.row_id << "|" << src_key
<< " dst key: |" << dst.rowset_id << "|" << dst.segment_id << "|"
<< dst.row_id << "|" << dst_key;
DCHECK(false);
return Status::InternalError("failed to check rowid conversion");
}
}
}
return Status::OK();
}

} // namespace doris
14 changes: 14 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct RowsetWriterContext;
class RowsetWriter;
class CalcDeleteBitmapToken;
class SegmentCacheHandle;
class RowIdConversion;

struct TabletWithVersion {
BaseTabletSPtr tablet;
Expand Down Expand Up @@ -215,6 +216,19 @@ class BaseTablet {
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet& cur_rowset_ids) = 0;
virtual CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() = 0;

void calc_compaction_output_rowset_delete_bitmap(
const std::vector<RowsetSharedPtr>& input_rowsets,
const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version,
std::set<RowLocation>* missed_rows,
std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>* location_map,
const DeleteBitmap& input_delete_bitmap, DeleteBitmap* output_rowset_delete_bitmap);

Status check_rowid_conversion(
RowsetSharedPtr dst_rowset,
const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>&
location_map);

////////////////////////////////////////////////////////////////////////////
// end MoW functions
////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit ad0bc1e

Please sign in to comment.