Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,15 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
// if version or compaction stats can't match, it means that this is a retry and there are
// compaction or other loads finished successfully on the same tablet. So the previous publish
// is stale and we should re-calculate the delete bitmap

// we still need to update delete bitmap KVs to MS when we skip to calcalate delete bitmaps,
// because the pending delete bitmap KVs in MS we wrote before may have been removed and replaced by other txns
int64_t lock_id = txn_info.is_txn_load ? txn_info.lock_id : -1;
RETURN_IF_ERROR(
tablet->save_delete_bitmap_to_ms(version, transaction_id, delete_bitmap, lock_id));

LOG(INFO) << "tablet=" << _tablet_id << ", " << txn_str
<< ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap.";
<< ", publish_status=SUCCEED, not need to re-calculate delete_bitmaps.";
} else {
if (invisible_rowsets == nullptr) {
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, transaction_id,
Expand Down
38 changes: 29 additions & 9 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,35 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
}

RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, lock_id));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
auto sleep_sec = dp->param<int>("sleep", 5);
std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
});

DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
auto retry = dp->param<bool>("retry", false);
if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
"injected DELETE_BITMAP_LOCK_ERROR");
} else {
return Status::InternalError<false>("injected non-retryable error");
}
});

return Status::OK();
}

Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, int64_t lock_id) {
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
for (auto iter = delete_bitmap->delete_bitmap.begin();
iter != delete_bitmap->delete_bitmap.end(); ++iter) {
Expand All @@ -734,18 +763,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
iter->second);
}
}

auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID,
new_delete_bitmap.get()));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ class CloudTablet final : public BaseTablet {
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id = -1) override;

Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, int64_t lock_id);

Status calc_delete_bitmap_for_compaction(const std::vector<RowsetSharedPtr>& input_rowsets,
const RowsetSharedPtr& output_rowset,
const RowIdConversion& rowid_conversion,
Expand Down
1 change: 1 addition & 0 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,7 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont

// 3. store all pending delete bitmap for this txn
PendingDeleteBitmapPB delete_bitmap_keys;
delete_bitmap_keys.set_lock_id(request->lock_id());
for (size_t i = 0; i < request->rowset_ids_size(); ++i) {
MetaDeleteBitmapInfo key_info {instance_id, tablet_id, request->rowset_ids(i),
request->versions(i), request->segment_ids(i)};
Expand Down
31 changes: 31 additions & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,8 +972,39 @@ void process_mow_when_commit_txn(
LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << hex(lock_keys[i])
<< " txn_id=" << txn_id;

int64_t lock_id = lock_info.lock_id();
for (auto tablet_id : table_id_tablet_ids[table_id]) {
std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id});

// check that if the pending info's lock_id is correct
std::string pending_val;
err = txn->get(pending_key, &pending_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
ss << "failed to get delete bitmap pending info, instance_id=" << instance_id
<< " tablet_id=" << tablet_id << " key=" << hex(pending_key) << " err=" << err;
msg = ss.str();
code = cast_as<ErrCategory::READ>(err);
return;
}

if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) continue;

PendingDeleteBitmapPB pending_info;
if (!pending_info.ParseFromString(pending_val)) [[unlikely]] {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "failed to parse PendingDeleteBitmapPB";
return;
}

if (pending_info.has_lock_id() && pending_info.lock_id() != lock_id) {
code = MetaServiceCode::PENDING_DELETE_BITMAP_WRONG;
msg = fmt::format(
"wrong lock_id in pending delete bitmap infos, expect lock_id={}, but "
"found {} tablet_id={} instance_id={}",
lock_id, pending_info.lock_id(), tablet_id, instance_id);
return;
}

txn->remove(pending_key);
LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" << hex(pending_key)
<< " txn_id=" << txn_id;
Expand Down
139 changes: 138 additions & 1 deletion cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5351,7 +5351,7 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) {

// case: first version of rowset
{
int64_t txn_id = -1;
int64_t txn_id = 98765;
int64_t table_id = 123456; // same as table_id of tmp rowset
int64_t db_id = 222;
int64_t tablet_id_base = 8113;
Expand Down Expand Up @@ -5430,11 +5430,16 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) {
std::string lock_val;
auto ret = txn->get(lock_key, &lock_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
DeleteBitmapUpdateLockPB lock_info;
ASSERT_TRUE(lock_info.ParseFromString(lock_val));

std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base});
std::string pending_val;
ret = txn->get(pending_key, &pending_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
PendingDeleteBitmapPB pending_info;
ASSERT_TRUE(pending_info.ParseFromString(pending_val));
ASSERT_EQ(pending_info.lock_id(), lock_info.lock_id());
}

// commit txn
Expand Down Expand Up @@ -5468,6 +5473,138 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) {
}
}

TEST(MetaServiceTest, WrongPendingBitmapTest) {
auto meta_service = get_meta_service();
extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr,
const std::string& cloud_unique_id);
auto instance_id = get_instance_id(meta_service->resource_mgr(), "test_cloud_unique_id");

// case: first version of rowset
{
int64_t txn_id = 56789;
int64_t table_id = 123456; // same as table_id of tmp rowset
int64_t db_id = 222;
int64_t tablet_id_base = 8113;
int64_t partition_id = 1234;
// begin txn
{
brpc::Controller cntl;
BeginTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
TxnInfoPB txn_info_pb;
txn_info_pb.set_db_id(db_id);
txn_info_pb.set_label("test_label");
txn_info_pb.add_table_ids(table_id);
txn_info_pb.set_timeout_ms(36000);
req.mutable_txn_info()->CopyFrom(txn_info_pb);
BeginTxnResponse res;
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
txn_id = res.txn_id();
}

// mock rowset and tablet
for (int i = 0; i < 5; ++i) {
create_tablet(meta_service.get(), table_id, 1235, partition_id, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
tmp_rowset.set_partition_id(partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}

// update delete bitmap
{
// get delete bitmap update lock
brpc::Controller cntl;
GetDeleteBitmapUpdateLockRequest get_lock_req;
GetDeleteBitmapUpdateLockResponse get_lock_res;
get_lock_req.set_cloud_unique_id("test_cloud_unique_id");
get_lock_req.set_table_id(table_id);
get_lock_req.add_partition_ids(partition_id);
get_lock_req.set_expiration(5);
get_lock_req.set_lock_id(txn_id);
get_lock_req.set_initiator(-1);
meta_service->get_delete_bitmap_update_lock(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &get_lock_req,
&get_lock_res, nullptr);
ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK);

// first update delete bitmap
UpdateDeleteBitmapRequest update_delete_bitmap_req;
UpdateDeleteBitmapResponse update_delete_bitmap_res;
update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
update_delete_bitmap_req.set_table_id(table_id);
update_delete_bitmap_req.set_partition_id(partition_id);
update_delete_bitmap_req.set_lock_id(txn_id);
update_delete_bitmap_req.set_initiator(-1);
update_delete_bitmap_req.set_tablet_id(tablet_id_base);

update_delete_bitmap_req.add_rowset_ids("123");
update_delete_bitmap_req.add_segment_ids(1);
update_delete_bitmap_req.add_versions(2);
update_delete_bitmap_req.add_segment_delete_bitmaps("abc0");

meta_service->update_delete_bitmap(
reinterpret_cast<google::protobuf::RpcController*>(&cntl),
&update_delete_bitmap_req, &update_delete_bitmap_res, nullptr);
ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK);
}

// check delete bitmap update lock and pending delete bitmap
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
std::string lock_val;
auto ret = txn->get(lock_key, &lock_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
DeleteBitmapUpdateLockPB lock_info;
ASSERT_TRUE(lock_info.ParseFromString(lock_val));

std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base});
std::string pending_val;
ret = txn->get(pending_key, &pending_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
PendingDeleteBitmapPB pending_info;
ASSERT_TRUE(pending_info.ParseFromString(pending_val));
ASSERT_EQ(pending_info.lock_id(), lock_info.lock_id());
}

{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
// pending bitmap have been modified by other txn
std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base});
std::string pending_val;
auto ret = txn->get(pending_key, &pending_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
PendingDeleteBitmapPB pending_info;
ASSERT_TRUE(pending_info.ParseFromString(pending_val));
// change pending bitmap's lock_id
pending_info.set_lock_id(pending_info.lock_id() + 1);
ASSERT_TRUE(pending_info.SerializeToString(&pending_val));
txn->put(pending_key, pending_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}

// commit txn
{
brpc::Controller cntl;
CommitTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_db_id(db_id);
req.set_txn_id(txn_id);
req.add_mow_table_ids(table_id);
CommitTxnResponse res;
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::PENDING_DELETE_BITMAP_WRONG);
}
}
}

TEST(MetaServiceTest, GetDeleteBitmapWithRetryTest1) {
auto meta_service = get_meta_service();
SyncPoint::get_instance()->enable_processing();
Expand Down
2 changes: 2 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,7 @@ enum MetaServiceCode {
LOCK_EXPIRED = 8001;
LOCK_CONFLICT = 8002;
ROWSETS_EXPIRED = 8003;
PENDING_DELETE_BITMAP_WRONG = 8004;

// partial update
ROWSET_META_NOT_FOUND = 9001;
Expand Down Expand Up @@ -1446,6 +1447,7 @@ message RemoveDeleteBitmapResponse {

message PendingDeleteBitmapPB {
repeated bytes delete_bitmap_keys = 1;
optional int64 lock_id = 2;
}

message DeleteBitmapUpdateLockPB {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1
2 2 2
3 3 3

-- !sql --
1 999 999
2 2 2
3 3 3

Loading
Loading