Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
});
Status status;
if (_sub_txn_ids.empty()) {
// Check empty rowset for non-sub_txn case
if (_engine.txn_delete_bitmap_cache().is_empty_rowset(_transaction_id, _tablet_id)) {
LOG(INFO) << "tablet=" << _tablet_id << ", txn=" << _transaction_id
<< " is empty rowset, skip delete bitmap calculation";
return Status::OK();
}
status = _handle_rowset(tablet, _version);
} else {
std::stringstream ss;
Expand All @@ -237,9 +243,18 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
std::vector<RowsetSharedPtr> invisible_rowsets;
DeleteBitmapPtr tablet_delete_bitmap =
std::make_shared<DeleteBitmap>(tablet->tablet_meta()->delete_bitmap());
for (int i = 0; i < _sub_txn_ids.size(); ++i) {
size_t empty_rowset_count = 0;
for (size_t i = 0; i < _sub_txn_ids.size(); ++i) {
int64_t sub_txn_id = _sub_txn_ids[i];
int64_t version = _version + i;
// Check empty rowset for each sub_txn using sub_txn_id
if (_engine.txn_delete_bitmap_cache().is_empty_rowset(sub_txn_id, _tablet_id)) {
LOG(INFO) << "tablet=" << _tablet_id << ", sub_txn=" << sub_txn_id
<< ", version=" << version
<< " is empty rowset, skip delete bitmap calculation";
empty_rowset_count++;
continue;
}
LOG(INFO) << "start calc delete bitmap for txn_id=" << _transaction_id
<< ", sub_txn_id=" << sub_txn_id << ", table_id=" << tablet->table_id()
<< ", partition_id=" << tablet->partition_id() << ", tablet_id=" << _tablet_id
Expand All @@ -254,7 +269,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
<< ", cur_version=" << version << ", status=" << status;
return status;
}
DCHECK(invisible_rowsets.size() == i + 1);
DCHECK(invisible_rowsets.size() == i + 1 - empty_rowset_count);
}
}
DBUG_EXECUTE_IF("CloudCalcDbmTask.handle.return.block",
Expand Down
9 changes: 9 additions & 0 deletions be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ const RowsetMetaSharedPtr& CloudRowsetBuilder::rowset_meta() {

Status CloudRowsetBuilder::set_txn_related_delete_bitmap() {
if (_tablet->enable_unique_key_merge_on_write()) {
// For empty rowsets when skip_writing_empty_rowset_metadata=true,
// store only a lightweight marker instead of full rowset info.
// This allows CalcDeleteBitmapTask to detect and skip gracefully,
// while using minimal memory (~16 bytes per entry).
if (_skip_writing_rowset_metadata) {
_engine.txn_delete_bitmap_cache().mark_empty_rowset(_req.txn_id, _tablet->tablet_id(),
_req.txn_expiration);
return Status::OK();
}
if (config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0) {
auto st = _tablet->check_delete_bitmap_correctness(
_delete_bitmap, _rowset->end_version() - 1, _req.txn_id, *_rowset_ids);
Expand Down
39 changes: 38 additions & 1 deletion be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
std::unique_lock<std::shared_mutex> wlock(_rwlock);
while (!_expiration_txn.empty()) {
auto iter = _expiration_txn.begin();
if (_txn_map.find(iter->second) == _txn_map.end()) {
bool in_txn_map = _txn_map.find(iter->second) != _txn_map.end();
bool in_markers = _empty_rowset_markers.find(iter->second) != _empty_rowset_markers.end();
if (!in_txn_map && !in_markers) {
_expiration_txn.erase(iter);
continue;
}
Expand All @@ -239,6 +241,7 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
if (iter->first > current_time) {
break;
}
// Clean from _txn_map if exists
auto txn_iter = _txn_map.find(iter->second);
if ((txn_iter != _txn_map.end()) && (iter->first == txn_iter->second.txn_expiration)) {
LOG_INFO("clean expired delete bitmap")
Expand All @@ -251,6 +254,14 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
erase(cache_key);
_txn_map.erase(iter->second);
}
// Clean from _empty_rowset_markers if exists
auto marker_iter = _empty_rowset_markers.find(iter->second);
if (marker_iter != _empty_rowset_markers.end()) {
LOG_INFO("clean expired empty rowset marker")
.tag("txn_id", iter->second.txn_id)
.tag("tablet_id", iter->second.tablet_id);
_empty_rowset_markers.erase(marker_iter);
}
_expiration_txn.erase(iter);
}
}
Expand All @@ -272,6 +283,32 @@ void CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId tra
}
}

void CloudTxnDeleteBitmapCache::mark_empty_rowset(TTransactionId txn_id, int64_t tablet_id,
int64_t txn_expiration) {
int64_t txn_expiration_min =
duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count() +
config::tablet_txn_info_min_expired_seconds;
txn_expiration = std::max(txn_expiration_min, txn_expiration);

if (config::enable_mow_verbose_log) {
LOG_INFO("mark empty rowset")
.tag("txn_id", txn_id)
.tag("tablet_id", tablet_id)
.tag("expiration", txn_expiration);
}
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(txn_id, tablet_id);
_empty_rowset_markers.emplace(txn_key);
_expiration_txn.emplace(txn_expiration, txn_key);
}

bool CloudTxnDeleteBitmapCache::is_empty_rowset(TTransactionId txn_id, int64_t tablet_id) {
std::shared_lock<std::shared_mutex> rlock(_rwlock);
TxnKey txn_key(txn_id, tablet_id);
return _empty_rowset_markers.contains(txn_key);
}

void CloudTxnDeleteBitmapCache::_clean_thread_callback() {
do {
remove_expired_tablet_txn_info();
Expand Down
13 changes: 13 additions & 0 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicy {

void remove_unused_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id);

// Mark a rowset as empty/skipped (lightweight marker, no rowset stored)
// Used for empty rowsets when skip_writing_empty_rowset_metadata is enabled
void mark_empty_rowset(TTransactionId txn_id, int64_t tablet_id, int64_t txn_expiration);

// Check if this is a known empty/skipped rowset
// Returns true if was marked as empty rowset
// Note: Does not remove the marker, as CalcDeleteBitmapTask may retry.
// Cleanup is handled by expiration-based removal in remove_expired_tablet_txn_info()
bool is_empty_rowset(TTransactionId txn_id, int64_t tablet_id);

// !!!ATTENTION!!!: the delete bitmap stored in CloudTxnDeleteBitmapCache contains sentinel marks,
// and the version in BitmapKey is DeleteBitmap::TEMP_VERSION_COMMON.
// when using delete bitmap from this cache, the caller should manually remove these marks if don't need it
Expand Down Expand Up @@ -107,6 +117,9 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicy {

std::map<TxnKey, TxnVal> _txn_map;
std::multimap<int64_t, TxnKey> _expiration_txn;
// Lightweight markers for empty/skipped rowsets (only stores TxnKey, ~16 bytes per entry)
// Used to track empty rowsets that were not committed to meta-service
std::set<TxnKey> _empty_rowset_markers;
std::shared_mutex _rwlock;
std::shared_ptr<Thread> _clean_thread;
CountDownLatch _stop_latch;
Expand Down
Loading