
Add UT for PGBlobIterator
yuwmao committed Nov 19, 2024
1 parent 7b9819d commit 355c16b
Showing 2 changed files with 126 additions and 71 deletions.
7 changes: 3 additions & 4 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
@@ -55,12 +55,12 @@ bool HSHomeObject::PGBlobIterator::update_cursor(objId id) {

 objId HSHomeObject::PGBlobIterator::expected_next_obj_id() {
     //next batch
-    if (last_end_blob_idx_ + cur_batch_blob_count_ < cur_blob_list_.size() - 1) {
+    if (last_end_blob_idx_ + static_cast<int64_t>(cur_batch_blob_count_) < static_cast<int64_t>(cur_blob_list_.size() - 1)) {
         return objId(cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id + 1);
     }
     //next shard
     if (cur_shard_idx_ < shard_list_.size() - 1) {
-        auto next_shard_seq_num = shard_list_[cur_shard_idx_++].id & 0xFFFFFFFFFFFF;
+        auto next_shard_seq_num = shard_list_[cur_shard_idx_+1].id & 0xFFFFFFFFFFFF;
         return objId(next_shard_seq_num, 0);
     }
     return objId(LAST_OBJ_ID);
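
Note on the hunk above: `expected_next_obj_id()` is meant to be a pure lookahead, but the old code peeked at the next shard with `shard_list_[cur_shard_idx_++]`, which returned the current shard's id rather than the next one and advanced the cursor as a side effect; the batch check also mixed signed and unsigned operands, which the new `static_cast<int64_t>` makes explicit. A minimal standalone sketch of the peek bug, using a hypothetical simplified cursor type rather than the real PGBlobIterator:

    // Hypothetical simplified stand-in for the shard cursor; it only
    // illustrates why a post-increment inside a peek is wrong.
    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct ShardCursor {
        std::vector<uint64_t> shard_ids{101, 102, 103};
        size_t cur_shard_idx{0};

        // Old shape: yields the CURRENT id and mutates the cursor.
        uint64_t peek_next_buggy() { return shard_ids[cur_shard_idx++]; }
        // New shape: yields the NEXT id and leaves the cursor untouched.
        uint64_t peek_next_fixed() const { return shard_ids[cur_shard_idx + 1]; }
    };

    int main() {
        ShardCursor c;
        // Repeated peeks should agree; the buggy form drifts: prints 101 then 102.
        std::cout << c.peek_next_buggy() << ' ' << c.peek_next_buggy() << '\n';
        c.cur_shard_idx = 0;
        // The fixed form is stable: prints 102 then 102.
        std::cout << c.peek_next_fixed() << ' ' << c.peek_next_fixed() << '\n';
        return 0;
    }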
@@ -197,7 +197,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
     auto idx = (uint64_t)(last_end_blob_idx_ + 1);
 
     while (total_bytes < max_batch_size_ && idx < cur_blob_list_.size()) {
-        auto info = cur_blob_list_[idx];
+        auto info = cur_blob_list_[idx++];
         ResyncBlobState state = ResyncBlobState::NORMAL;
         //handle deleted object
         if (info.pbas == tombstone_pbas) {
@@ -231,7 +231,6 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
         blob_entries.push_back(CreateResyncBlobDataDirect(builder_, info.blob_id, (uint8_t)state, &data));
         total_bytes += blob.size();
     }
-    idx++;
 
     if (idx == cur_blob_list_.size()) { end_of_shard = true; }
     cur_batch_blob_count_ = blob_entries.size();
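
Note on the two hunks above: the index advance moves into the element read (`cur_blob_list_[idx++]`) and the detached `idx++;` is deleted. A hedged sketch of why the fused form is safer, using a simplified stand-in loop rather than the real snapshot batching code: any early `continue` added to the body skips a detached increment and the loop respins on the same entry, while the fused read advances unconditionally.

    #include <cstddef>
    #include <vector>

    // Counts loop iterations; the guard caps a runaway loop for demonstration.
    static std::size_t scan(const std::vector<int>& blobs, bool fused) {
        std::size_t idx = 0, iters = 0;
        while (idx < blobs.size() && iters < 100) {
            ++iters;
            int info = fused ? blobs[idx++] : blobs[idx];
            if (info < 0) continue;  // skip entry; the detached variant never advances here
            if (!fused) idx++;       // detached increment, skipped by `continue`
        }
        return iters;
    }

    int main() {
        std::vector<int> blobs{1, -1, 3};     // -1 stands in for a skippable entry
        bool ok = scan(blobs, true) == 3      // fused: visits each entry once
            && scan(blobs, false) == 100;     // detached: spins until the guard trips
        return ok ? 0 : 1;
    }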
190 changes: 123 additions & 67 deletions src/lib/homestore_backend/tests/hs_blob_tests.cpp
@@ -139,71 +139,127 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) {
 }
 
 TEST_F(HomeObjectFixture, PGBlobIterator) {
-    // uint64_t num_shards_per_pg = 3;
-    // uint64_t num_blobs_per_shard = 5;
-    // std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec;
-    // // pg -> next blob_id in this pg
-    // std::map< pg_id_t, blob_id_t > pg_blob_id;
-    //
-    // pg_id_t pg_id{1};
-    // create_pg(pg_id);
-    // for (uint64_t i = 0; i < num_shards_per_pg; i++) {
-    //     auto shard = create_shard(1, 64 * Mi);
-    //     pg_shard_id_vec[1].emplace_back(shard.id);
-    //     pg_blob_id[i] = 0;
-    //     LOGINFO("pg {} shard {}", pg_id, shard.id);
-    // }
-    //
-    // // Put blob for all shards in all pg's.
-    // put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id);
-    //
-    // PG* pg1;
-    // {
-    //     auto lg = std::shared_lock(_obj_inst->_pg_lock);
-    //     auto iter = _obj_inst->_pg_map.find(pg_id);
-    //     ASSERT_TRUE(iter != _obj_inst->_pg_map.end());
-    //     pg1 = iter->second.get();
-    // }
-    //
-    // auto pg1_iter =
-    //     std::make_shared< homeobject::HSHomeObject::PGBlobIterator >(*_obj_inst, pg1->pg_info_.replica_set_uuid);
-    // ASSERT_EQ(pg1_iter->end_of_scan(), false);
-    //
-    // // Verify PG shard meta data.
-    // sisl::io_blob_safe meta_blob;
-    // pg1_iter->create_pg_shard_snapshot_data(meta_blob);
-    // ASSERT_TRUE(meta_blob.size() > 0);
-    //
-    // auto pg_req = GetSizePrefixedResyncPGShardInfo(meta_blob.bytes());
-    // ASSERT_EQ(pg_req->pg()->pg_id(), pg1->pg_info_.id);
-    // auto u1 = pg_req->pg()->replica_set_uuid();
-    // auto u2 = pg1->pg_info_.replica_set_uuid;
-    // ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));
-    //
-    // // Verify get blobs for pg.
-    // uint64_t max_num_blobs_in_batch = 3, max_batch_size_bytes = 128 * Mi;
-    // std::vector< HSHomeObject::BlobInfoData > blob_data_vec;
-    // while (!pg1_iter->end_of_scan()) {
-    //     std::vector< HSHomeObject::BlobInfoData > vec;
-    //     bool end_of_shard;
-    //     auto result = pg1_iter->get_next_blobs(max_num_blobs_in_batch, max_batch_size_bytes, vec, end_of_shard);
-    //     ASSERT_EQ(result, 0);
-    //     for (auto& v : vec) {
-    //         blob_data_vec.push_back(std::move(v));
-    //     }
-    // }
-    //
-    // ASSERT_EQ(blob_data_vec.size(), num_shards_per_pg * num_blobs_per_shard);
-    // for (auto& b : blob_data_vec) {
-    //     auto g = _obj_inst->blob_manager()->get(b.shard_id, b.blob_id, 0, 0).get();
-    //     ASSERT_TRUE(!!g);
-    //     auto result = std::move(g.value());
-    //     LOGINFO("Get blob pg {} shard {} blob {} len {} data {}", 1, b.shard_id, b.blob_id, b.blob.body.size(),
-    //             hex_bytes(result.body.cbytes(), 5));
-    //     EXPECT_EQ(result.body.size(), b.blob.body.size());
-    //     EXPECT_EQ(std::memcmp(result.body.bytes(), b.blob.body.cbytes(), result.body.size()), 0);
-    //     EXPECT_EQ(result.user_key.size(), b.blob.user_key.size());
-    //     EXPECT_EQ(result.user_key, b.blob.user_key);
-    //     EXPECT_EQ(result.object_off, b.blob.object_off);
-    // }
+    // Generate test data
+    uint64_t num_shards_per_pg = 3;
+    uint64_t num_blobs_per_shard = 5;
+    std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec;
+    std::map< pg_id_t, blob_id_t > pg_blob_id;
+
+    pg_id_t pg_id{1};
+    create_pg(pg_id);
+    for (uint64_t i = 0; i < num_shards_per_pg; i++) {
+        auto shard = create_shard(1, 64 * Mi);
+        pg_shard_id_vec[1].emplace_back(shard.id);
+        pg_blob_id[i] = 0;
+        LOGINFO("pg {} shard {}", pg_id, shard.id);
+    }
+    put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id);
+
+    PG* pg;
+    {
+        auto lg = std::shared_lock(_obj_inst->_pg_lock);
+        auto iter = _obj_inst->_pg_map.find(pg_id);
+        ASSERT_TRUE(iter != _obj_inst->_pg_map.end());
+        pg = iter->second.get();
+    }
+
+    //Construct shards as [sealed, open, filtered]
+    seal_shard(pg->shards_.front()->info.id);
+    ASSERT_EQ(pg->shards_.front()->info.state, homeobject::ShardInfo::State::SEALED);
+    // Filter out the last shard
+    auto snp_lsn = pg->shards_.back()->info.lsn - 1;
+    auto pg_iter =
+        std::make_shared< HSHomeObject::PGBlobIterator >(*_obj_inst, pg->pg_info_.replica_set_uuid, snp_lsn);
+    ASSERT_EQ(pg_iter->shard_list_.size(), num_shards_per_pg - 1);
+
+    // Verify PG meta data.
+    sisl::io_blob_safe meta_blob;
+    pg_iter->create_pg_snapshot_data(meta_blob);
+    ASSERT_TRUE(meta_blob.size() > 0);
+
+    SyncMessageHeader* header = r_cast< SyncMessageHeader* >(meta_blob.bytes());
+    ASSERT_EQ(header->msg_type, SyncMessageType::PG_META);
+    auto pg_msg = GetSizePrefixedResyncPGMetaData(meta_blob.cbytes() + sizeof(SyncMessageHeader));
+    ASSERT_EQ(pg_msg->pg_id(), pg->pg_info_.id);
+    auto u1 = pg_msg->replica_set_uuid();
+    auto u2 = pg->pg_info_.replica_set_uuid;
+    ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));
+    ASSERT_EQ(pg_msg->blob_seq_num(), pg->durable_entities().blob_sequence_num.load());
+    ASSERT_EQ(pg_msg->shard_seq_num(), pg->shard_sequence_num_);
+
+    auto msg_members = pg_msg->members();
+    ASSERT_EQ(msg_members->size(), pg->pg_info_.members.size());
+    for (auto m : *msg_members) {
+        uuids::uuid id{};
+        std::copy_n(m->uuid()->data(), 16, id.begin());
+        auto it = pg->pg_info_.members.find(PGMember{id});
+        ASSERT_TRUE(it != pg->pg_info_.members.end());
+        ASSERT_EQ(m->name()->str(), it->name);
+        ASSERT_EQ(m->priority(), it->priority);
+    }
+
+    ASSERT_EQ(pg->shards_.size()-1, pg_msg->shard_ids()->size());
+    auto idx = 0;
+    for (auto& shard : pg->shards_) {
+        if (shard->info.lsn > snp_lsn) { continue; }
+        ASSERT_EQ(shard->info.id, pg_msg->shard_ids()->Get(idx++));
+    }
+
+    //Verify shard meta data
+    auto shard_list = pg_shard_id_vec[1];
+    blob_id_t current_blob_id{0};
+    for (auto& shard : pg->shards_) {
+        auto shard_seq_num = shard->info.id & 0xFFFFFFFFFFFF;
+        auto batch_id = 0;
+        objId oid(shard_seq_num, batch_id++);
+        if (shard->info.lsn > snp_lsn) {
+            ASSERT_FALSE(pg_iter->update_cursor(oid));
+            continue;
+        }
+        ASSERT_TRUE(pg_iter->update_cursor(oid));
+        ASSERT_TRUE(pg_iter->generate_shard_blob_list());
+        ASSERT_EQ(pg_iter->cur_blob_list_.size(), num_blobs_per_shard);
+        sisl::io_blob_safe meta_data;
+        ASSERT_TRUE(pg_iter->create_shard_snapshot_data(meta_data));
+
+        SyncMessageHeader* header = r_cast< SyncMessageHeader* >(meta_data.bytes());
+        ASSERT_EQ(header->msg_type, SyncMessageType::SHARD_META);
+        auto shard_msg = GetSizePrefixedResyncShardMetaData(meta_data.cbytes() + sizeof(SyncMessageHeader));
+        ASSERT_EQ(shard_msg->shard_id(), shard->info.id);
+        ASSERT_EQ(shard_msg->pg_id(), pg->pg_info_.id);
+        ASSERT_EQ(shard_msg->state(), static_cast<uint8_t>(shard->info.state));
+        ASSERT_EQ(shard_msg->created_lsn(), shard->info.lsn);
+        ASSERT_EQ(shard_msg->created_time(), shard->info.created_time);
+        ASSERT_EQ(shard_msg->last_modified_time(), shard->info.last_modified_time);
+        ASSERT_EQ(shard_msg->total_capacity_bytes(), shard->info.total_capacity_bytes);
+
+        //Verify blob data
+        ASSERT_TRUE(pg_iter->update_cursor(objId(shard_seq_num, batch_id++)));
+        sisl::io_blob_safe blob_batch;
+        ASSERT_TRUE(pg_iter->create_blobs_snapshot_data(blob_batch));
+        header = r_cast< SyncMessageHeader* >(blob_batch.bytes());
+        ASSERT_EQ(header->msg_type, SyncMessageType::SHARD_BATCH);
+        auto blob_msg = GetSizePrefixedResyncBlobDataBatch(blob_batch.cbytes() + sizeof(SyncMessageHeader));
+        ASSERT_EQ(blob_msg->blob_list()->size(), num_blobs_per_shard);
+        for (auto i = 0; i < static_cast< int >(num_blobs_per_shard); i++) {
+            auto b = blob_msg->blob_list()->Get(i);
+            ASSERT_EQ(b->blob_id(), current_blob_id++);
+            ASSERT_EQ(b->state(), static_cast<uint8_t>(ResyncBlobState::NORMAL));
+            auto blob_data = b->data()->Data();
+            auto header = r_cast< HSHomeObject::BlobHeader const* >(blob_data);
+            ASSERT_TRUE(header->valid());
+            auto g = _obj_inst->blob_manager()->get(shard->info.id, b->blob_id(), 0, 0).get();
+            ASSERT_TRUE(!!g);
+            auto result = std::move(g.value());
+            EXPECT_EQ(result.body.size(), header->blob_size);
+            ASSERT_TRUE(
+                memcmp(result.body.cbytes(), blob_data+header->data_offset, header->blob_size) == 0);
+            LOGDEBUG(
+                "Get blob pg {}, shard {}, blob {}, data_len {}, blob_len {}, header_len {}, user_key_len {}, data {}",
+                pg->pg_info_.id, shard->info.id, b->blob_id(),
+                b->data()->size(), header->blob_size, sizeof(HSHomeObject::BlobHeader), header->user_key_size,
+                hex_bytes(result.body.cbytes(), 5));
+        }
+        ASSERT_EQ(blob_msg->is_last_batch(), true);
+    }
 }
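
A note on the id arithmetic the new test leans on: `shard_seq_num` is recovered as `shard->info.id & 0xFFFFFFFFFFFF`, and the cursor walks `objId(shard_seq_num, batch_id)` with batch 0 carrying shard metadata and batch 1 onward carrying blob batches, which is why `batch_id++` appears between the two checks. A hedged sketch of that arithmetic; the 48-bit mask is taken from the code above, while placing the pg id in the high 16 bits is an illustrative assumption, not something this commit states:

    #include <cassert>
    #include <cstdint>

    constexpr uint64_t kSeqMask = 0xFFFFFFFFFFFFULL; // low 48 bits: shard sequence number

    // Assumed layout for illustration only: high 16 bits identify the pg.
    constexpr uint64_t make_shard_id(uint16_t pg, uint64_t seq) {
        return (uint64_t{pg} << 48) | (seq & kSeqMask);
    }

    int main() {
        uint64_t shard_id = make_shard_id(1, 2);
        // Exactly how the test recovers shard_seq_num from a shard id.
        assert((shard_id & kSeqMask) == 2);
        return 0;
    }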
