Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2119,13 +2119,13 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
meta_delete_bitmap_key(key_info, &key);
delete_bitmap_keys.add_delete_bitmap_keys(key);
}
// no need to record pending key for compaction or schema change,
// no need to record pending key for compaction,
// because delete bitmap will attach to new rowset, just delete new rowset if failed
// lock_id > 0 : load
// lock_id = -1 : compaction
// lock_id = -2 : schema change
// lock_id = -3 : compaction update delete bitmap without lock
if (request->lock_id() > 0) {
if (request->lock_id() > 0 || request->lock_id() == -2) {
std::string pending_val;
if (!delete_bitmap_keys.SerializeToString(&pending_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
Expand Down
6 changes: 6 additions & 0 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "common/logging.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "keys.h"
#include "meta-service/keys.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/meta_service_tablet_stats.h"
Expand Down Expand Up @@ -1383,6 +1384,11 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
if (!success) {
return;
}

std::string pending_key = meta_pending_delete_bitmap_key({instance_id, new_tablet_id});
txn->remove(pending_key);
LOG(INFO) << "xxx sc remove delete bitmap pending key, pending_key=" << hex(pending_key)
<< " tablet_id=" << new_tablet_id << "job_id=" << schema_change.id();
}

for (size_t i = 0; i < schema_change.txn_ids().size(); ++i) {
Expand Down
59 changes: 59 additions & 0 deletions cloud/test/meta_service_job_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,31 @@ MetaServiceCode remove_delete_bitmap_lock(MetaServiceProxy* meta_service, int64_
return res.status().code();
}

MetaServiceCode update_delete_bitmap(MetaServiceProxy* meta_service, int64_t table_id,
int64_t partition_id, int64_t tablet_id, int64_t lock_id,
int64_t initor,
std::string cloud_unique_id = "test_cloud_unique_id") {
brpc::Controller cntl;
UpdateDeleteBitmapRequest update_delete_bitmap_req;
UpdateDeleteBitmapResponse update_delete_bitmap_res;
update_delete_bitmap_req.set_cloud_unique_id(cloud_unique_id);
update_delete_bitmap_req.set_table_id(table_id);
update_delete_bitmap_req.set_partition_id(partition_id);
update_delete_bitmap_req.set_lock_id(lock_id);
update_delete_bitmap_req.set_initiator(initor);
update_delete_bitmap_req.set_tablet_id(tablet_id);
for (int i = 0; i < 3; i++) {
update_delete_bitmap_req.add_rowset_ids("0200000003ea308a3647dbea83220ed4b8897f2288244a91");
update_delete_bitmap_req.add_segment_ids(0);
update_delete_bitmap_req.add_versions(i);
update_delete_bitmap_req.add_segment_delete_bitmaps("1");
}
meta_service->update_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl),
&update_delete_bitmap_req, &update_delete_bitmap_res,
nullptr);
return update_delete_bitmap_res.status().code();
}

void remove_delete_bitmap_lock(MetaServiceProxy* meta_service, int64_t table_id) {
std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
std::unique_ptr<Transaction> txn;
Expand Down Expand Up @@ -1814,12 +1839,46 @@ TEST(MetaServiceJobTest, SchemaChangeJobWithMoWTest) {

res_code = get_delete_bitmap_lock(meta_service.get(), table_id, -2, 12345);
ASSERT_EQ(res_code, MetaServiceCode::OK);

std::string pending_key = meta_pending_delete_bitmap_key({instance_id, new_tablet_id});
std::string pending_val;
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
ASSERT_EQ(txn->get(pending_key, &pending_val), TxnErrorCode::TXN_KEY_NOT_FOUND);

res_code = update_delete_bitmap(meta_service.get(), table_id, partition_id, new_tablet_id,
-2, 12345);
ASSERT_EQ(res_code, MetaServiceCode::OK);

// schema change job should write pending delete bitmap key
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
ASSERT_EQ(txn->get(pending_key, &pending_val), TxnErrorCode::TXN_OK);
PendingDeleteBitmapPB pending_info;
ASSERT_TRUE(pending_info.ParseFromString(pending_val));
ASSERT_EQ(pending_info.delete_bitmap_keys_size(), 3);
for (int i = 0; i < 3; ++i) {
std::string_view k1 = pending_info.delete_bitmap_keys(i);
k1.remove_prefix(1);
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
decode_key(&k1, &out);
// 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
ASSERT_EQ(std::get<std::int64_t>(std::get<0>(out[3])), new_tablet_id);
ASSERT_EQ(std::get<std::string>(std::get<0>(out[4])),
"0200000003ea308a3647dbea83220ed4b8897f2288244a91");
ASSERT_EQ(std::get<std::int64_t>(std::get<0>(out[5])), i);
ASSERT_EQ(std::get<std::int64_t>(std::get<0>(out[6])), 0);
}

finish_schema_change_job(meta_service.get(), tablet_id, new_tablet_id, "job1", "be1",
output_rowsets, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res_code = remove_delete_bitmap_lock(meta_service.get(), table_id, -2, 12345);
ASSERT_EQ(res_code, MetaServiceCode::LOCK_EXPIRED);
res.Clear();

// pending delete bitmap key on new tablet should be removed after schema change job finishes
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
ASSERT_EQ(txn->get(pending_key, &pending_val), TxnErrorCode::TXN_KEY_NOT_FOUND);
}

{
Expand Down
111 changes: 111 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5529,6 +5529,117 @@ TEST(MetaServiceTest, UpdateDeleteBitmapFailCase) {
ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps(0), data1);
}

TEST(MetaServiceTest, UpdateDeleteBitmapScOverrideExistingKey) {
auto meta_service = get_meta_service();
brpc::Controller cntl;
size_t split_size = 90 * 1000; // see cloud/src/common/util.h

extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr,
const std::string& cloud_unique_id);
auto instance_id = get_instance_id(meta_service->resource_mgr(), "test_cloud_unique_id");

{
// schema change should use pending delete bitmap to clear previous failed trials
int64_t db_id = 99999;
int64_t table_id = 1801;
int64_t index_id = 4801;
int64_t t1p1 = 2001;
int64_t tablet_id = 3001;
int64_t txn_id;
ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id,
index_id, t1p1, tablet_id));
begin_txn_and_commit_rowset(meta_service.get(), "label11", db_id, table_id, t1p1, tablet_id,
&txn_id);
int64_t lock_id = -2;
int64_t initiator = 1009;
int64_t version = 100;

get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator);

{
UpdateDeleteBitmapRequest update_delete_bitmap_req;
UpdateDeleteBitmapResponse update_delete_bitmap_res;
// will be splited and stored in 5 KVs
std::string data1(split_size * 5, 'c');
update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res, table_id, t1p1, lock_id, initiator,
tablet_id, txn_id, version, data1);
ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK);

GetDeleteBitmapRequest get_delete_bitmap_req;
GetDeleteBitmapResponse get_delete_bitmap_res;
get_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
get_delete_bitmap_req.set_tablet_id(tablet_id);
get_delete_bitmap_req.add_rowset_ids("123");
get_delete_bitmap_req.add_begin_versions(0);
get_delete_bitmap_req.add_end_versions(version);
meta_service->get_delete_bitmap(
reinterpret_cast<google::protobuf::RpcController*>(&cntl),
&get_delete_bitmap_req, &get_delete_bitmap_res, nullptr);
ASSERT_EQ(get_delete_bitmap_res.status().code(), MetaServiceCode::OK);
ASSERT_EQ(get_delete_bitmap_res.rowset_ids_size(), 1);
ASSERT_EQ(get_delete_bitmap_res.versions_size(), 1);
ASSERT_EQ(get_delete_bitmap_res.segment_ids_size(), 1);
ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps_size(), 1);
ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps(0), data1);
}

{
std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id});
std::string pending_val;
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
ASSERT_EQ(txn->get(pending_key, &pending_val), TxnErrorCode::TXN_OK);
PendingDeleteBitmapPB pending_info;
ASSERT_TRUE(pending_info.ParseFromString(pending_val));
ASSERT_EQ(pending_info.delete_bitmap_keys_size(), 1);

std::string_view k1 = pending_info.delete_bitmap_keys(0);
k1.remove_prefix(1);
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
decode_key(&k1, &out);
// 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
auto encoded_tablet_id = std::get<std::int64_t>(std::get<0>(out[3]));
ASSERT_EQ(encoded_tablet_id, tablet_id);
auto encoded_rowset_id = std::get<std::string>(std::get<0>(out[4]));
ASSERT_EQ(encoded_rowset_id, "123");
auto encoded_version = std::get<std::int64_t>(std::get<0>(out[5]));
ASSERT_EQ(encoded_version, version);
auto encoded_segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
ASSERT_EQ(encoded_segment_id, 0);
}

{
UpdateDeleteBitmapRequest update_delete_bitmap_req;
UpdateDeleteBitmapResponse update_delete_bitmap_res;
// will be splited and stored in 3 KVs
// if we don't remove previous splited KVs, will crash when reading
std::string data2(split_size * 3, 'a');
update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res, table_id, t1p1, lock_id, initiator,
tablet_id, txn_id, version, data2);
ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK);

GetDeleteBitmapRequest get_delete_bitmap_req;
GetDeleteBitmapResponse get_delete_bitmap_res;
get_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
get_delete_bitmap_req.set_tablet_id(tablet_id);
get_delete_bitmap_req.add_rowset_ids("123");
get_delete_bitmap_req.add_begin_versions(0);
get_delete_bitmap_req.add_end_versions(version);
meta_service->get_delete_bitmap(
reinterpret_cast<google::protobuf::RpcController*>(&cntl),
&get_delete_bitmap_req, &get_delete_bitmap_res, nullptr);
ASSERT_EQ(get_delete_bitmap_res.status().code(), MetaServiceCode::OK);
ASSERT_EQ(get_delete_bitmap_res.rowset_ids_size(), 1);
ASSERT_EQ(get_delete_bitmap_res.versions_size(), 1);
ASSERT_EQ(get_delete_bitmap_res.segment_ids_size(), 1);
ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps_size(), 1);
ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps(0), data2);
}
}
}

TEST(MetaServiceTest, UpdateDeleteBitmap) {
auto meta_service = get_meta_service();
remove_delete_bitmap_lock(meta_service.get(), 112);
Expand Down
Loading