Merge branch 'main' into baseline_resync
yuwmao committed Dec 23, 2024
2 parents f743e81 + b9d3fbf commit 013ba18
Showing 14 changed files with 318 additions and 28 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.17"
version = "2.1.18"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
48 changes: 48 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
@@ -118,6 +118,54 @@ bool HeapChunkSelector::release_chunk(const pg_id_t pg_id, const chunk_num_t v_c
return true;
}

bool HeapChunkSelector::reset_pg_chunks(pg_id_t pg_id) {
std::shared_lock lock_guard(m_chunk_selector_mtx);
auto pg_it = m_per_pg_chunks.find(pg_id);
if (pg_it == m_per_pg_chunks.end()) {
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
return false;
}
{
auto pg_chunk_collection = pg_it->second;
std::scoped_lock lock(pg_chunk_collection->mtx);
for (auto& chunk : pg_chunk_collection->m_pg_chunks) {
chunk->reset();
}
}
return true;
}

bool HeapChunkSelector::return_pg_chunks_to_dev_heap(const pg_id_t pg_id) {
std::unique_lock lock_guard(m_chunk_selector_mtx);
auto pg_it = m_per_pg_chunks.find(pg_id);
if (pg_it == m_per_pg_chunks.end()) {
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
return false;
}

auto pg_chunk_collection = pg_it->second;
auto pdev_id = pg_chunk_collection->m_pg_chunks[0]->get_pdev_id();
auto pdev_it = m_per_dev_heap.find(pdev_id);
RELEASE_ASSERT(pdev_it != m_per_dev_heap.end(), "pdev {} should be in per dev heap", pdev_id);
auto pdev_heap = pdev_it->second;

{
std::scoped_lock lock(pdev_heap->mtx, pg_chunk_collection->mtx);
for (auto& chunk : pg_chunk_collection->m_pg_chunks) {
if (chunk->m_state == ChunkState::INUSE) {
chunk->m_state = ChunkState::AVAILABLE;
} // an INUSE chunk still backs an open shard; shards are expected to have been destroyed first
chunk->m_pg_id = std::nullopt;
chunk->m_v_chunk_id = std::nullopt;

pdev_heap->m_heap.emplace(chunk);
pdev_heap->available_blk_count += chunk->available_blks();
}
}
m_per_pg_chunks.erase(pg_it);
return true;
}

uint32_t HeapChunkSelector::get_chunk_size() const {
const auto chunk = m_chunks.begin()->second;
return chunk->size();
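To make the intended sequencing concrete, here is a minimal sketch of how the two new HeapChunkSelector entry points compose during pg teardown (the wrapper below is hypothetical; only reset_pg_chunks and return_pg_chunks_to_dev_heap come from this diff):

// Hypothetical wrapper around the two teardown steps added above.
void teardown_pg_chunks(HeapChunkSelector& selector, pg_id_t pg_id) {
    // Step 1: reset each chunk in place while it is still owned by the pg,
    // so stale state cannot leak to the chunk's next owner.
    if (!selector.reset_pg_chunks(pg_id)) { return; } // unknown pg: nothing to do
    // Step 2: detach the chunks from the pg and push them back onto the
    // per-pdev heap, where selection for a new pg can find them.
    selector.return_pg_chunks_to_dev_heap(pg_id);
}

In the destroy path below, the two steps are deliberately split: the reset runs before the pg superblock is destroyed, and the return to the dev heap after it.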
14 changes: 14 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.h
@@ -79,6 +79,20 @@ class HeapChunkSelector : public homestore::ChunkSelector {
// It is used in two scenarios: 1. seal shard 2. create shard rollback
bool release_chunk(const pg_id_t pg_id, const chunk_num_t v_chunk_id);

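/**
 * Resets, in place, every chunk currently owned by the pg identified by pg_id; the chunks stay
 * attached to the pg (see return_pg_chunks_to_dev_heap for the detach step).
 *
 * @param pg_id The ID of the pg whose chunks are to be reset.
 * @return A boolean value indicating whether the operation was successful.
 */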
bool reset_pg_chunks(pg_id_t pg_id);

/**
* Releases all chunks associated with the specified pg_id.
*
* This function is used to return all chunks currently associated with the
* pg identified by the given pg_id. It is typically used when all chunks of
* a pg need to be freed, for example when the pg is moved off this node.
*
* @param pg_id The ID of the protection group whose chunks are to be released.
* @return A boolean value indicating whether the operation was successful.
*/
bool return_pg_chunks_to_dev_heap(pg_id_t pg_id);

/**
* select chunks for pg, chunks need to be in same pdev.
*
48 changes: 48 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
@@ -33,6 +33,7 @@ static constexpr uint64_t io_align{512};
PGError toPgError(homestore::ReplServiceError const&);
BlobError toBlobError(homestore::ReplServiceError const&);
ShardError toShardError(homestore::ReplServiceError const&);
ENUM(PGState, uint8_t, ALIVE = 0, DESTROYED);

class HSHomeObject : public HomeObjectImpl {
private:
@@ -83,6 +84,7 @@ class HSHomeObject : public HomeObjectImpl {

struct pg_info_superblk {
pg_id_t id;
PGState state;
uint32_t num_members;
uint32_t num_chunks;
peer_id_t replica_set_uuid;
@@ -112,6 +114,7 @@

pg_info_superblk& operator=(pg_info_superblk const& rhs) {
id = rhs.id;
state = rhs.state;
num_members = rhs.num_members;
num_chunks = rhs.num_chunks;
pg_size = rhs.pg_size;
@@ -483,6 +486,20 @@ class HSHomeObject : public HomeObjectImpl {
void on_pg_replace_member(homestore::group_id_t group_id, const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in);

/**
* @brief Cleans up and recycles resources for the PG identified by the given pg_id on the current node.
*
* This function is called when this replica leaves the replication group, or when a specific PG is destroyed
* on the current node. Note that it does not perform any Raft synchronization with other nodes.
*
* Possible scenarios for calling this function include:
* - A member-out node cleaning up resources for the specified PG.
* - Baseline resync cleaning up stale PG resources on the current node.
*
* @param pg_id The ID of the PG to be destroyed.
*/
void pg_destroy(pg_id_t pg_id);

/**
* @brief Callback function invoked when a message is committed on a shard.
*
@@ -569,6 +586,7 @@
size_t hash_len) const;

std::shared_ptr< BlobIndexTable > recover_index_table(homestore::superblk< homestore::index_table_sb >&& sb);
std::optional< pg_id_t > get_pg_id_with_group_id(homestore::group_id_t group_id) const;

private:
std::shared_ptr< BlobIndexTable > create_index_table();
@@ -593,6 +611,36 @@
sisl::io_blob_safe& get_pad_buf(uint32_t pad_len);

// void trigger_timed_events();

/**
* @brief Marks the PG as destroyed.
*
* Updates the internal state to indicate that the specified PG is destroyed and ensures its state is persisted.
*
* @param pg_id The ID of the PG to be marked as destroyed.
*/
void mark_pg_destroyed(pg_id_t pg_id);

/**
* @brief Cleans up and recycles resources for all shards in the PG identified by the given pg_id.
*
* @param pg_id The ID of the PG whose shards are to be destroyed.
*/
void destroy_shards(pg_id_t pg_id);

/**
* @brief Resets the chunks for the given PG ID and triggers a checkpoint flush.
*
* @param pg_id The ID of the PG whose chunks are to be reset.
*/
void reset_pg_chunks(pg_id_t pg_id);

/**
* @brief Cleans up and recycles resources for the PG identified by the given pg_id.
*
* @param pg_id The ID of the PG to be cleaned.
*/
void cleanup_pg_resources(pg_id_t pg_id);
};

class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks {
85 changes: 82 additions & 3 deletions src/lib/homestore_backend/hs_pg_manager.cpp
@@ -266,6 +266,79 @@ void HSHomeObject::on_pg_replace_member(homestore::group_id_t group_id, const re
boost::uuids::to_string(member_in.id));
}

std::optional< pg_id_t > HSHomeObject::get_pg_id_with_group_id(homestore::group_id_t group_id) const {
auto lg = std::shared_lock(_pg_lock);
auto iter = std::find_if(_pg_map.begin(), _pg_map.end(), [group_id](const auto& entry) {
return pg_repl_dev(*entry.second).group_id() == group_id;
});
if (iter != _pg_map.end()) {
return iter->first;
} else {
return std::nullopt;
}
}

void HSHomeObject::pg_destroy(pg_id_t pg_id) {
mark_pg_destroyed(pg_id);
destroy_shards(pg_id);
reset_pg_chunks(pg_id);
cleanup_pg_resources(pg_id);
LOGI("pg {} is destroyed", pg_id);
}

void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("on pg destroy with unknown pg_id {}", pg_id);
return;
}
auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());
hs_pg->pg_sb_->state = PGState::DESTROYED;
hs_pg->pg_sb_.write();
}

void HSHomeObject::reset_pg_chunks(pg_id_t pg_id) {
bool res = chunk_selector_->reset_pg_chunks(pg_id);
RELEASE_ASSERT(res, "Failed to reset all chunks in pg {}", pg_id);
auto fut = homestore::hs()->cp_mgr().trigger_cp_flush(true /* force */);
auto on_complete = [&](auto success) {
RELEASE_ASSERT(success, "Failed to trigger CP flush");
LOGI("CP Flush completed");
};
on_complete(std::move(fut).get());
}

void HSHomeObject::cleanup_pg_resources(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("on pg resource release with unknown pg_id {}", pg_id);
return;
}

// destroy index table
auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());
if (nullptr != hs_pg->index_table_) {
auto uuid_str = boost::uuids::to_string(hs_pg->index_table_->uuid());
index_table_pg_map_.erase(uuid_str);
hs()->index_service().remove_index_table(hs_pg->index_table_);
hs_pg->index_table_->destroy();
}

// destroy pg super blk
hs_pg->pg_sb_.destroy();

// Return the pg's chunks to the dev heap. This must be done after destroying the pg superblock;
// otherwise a crash in between could leave two pgs claiming the same chunks on recovery.
bool res = chunk_selector_->return_pg_chunks_to_dev_heap(pg_id);
RELEASE_ASSERT(res, "Failed to return pg {} chunks to dev_heap", pg_id);

// erase pg in pg map
_pg_map.erase(iter);
}

void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) {
RELEASE_ASSERT(hs_pg->pg_info_.replica_set_uuid == hs_pg->repl_dev_->group_id(),
"PGInfo replica set uuid mismatch with ReplDev instance for {}",
@@ -333,9 +406,14 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c
// add entry in map, so that index recovery can update the PG.
std::scoped_lock lg(index_lock_);
auto it = index_table_pg_map_.find(uuid_str);
-    RELEASE_ASSERT(it != index_table_pg_map_.end(), "IndexTable should be recovered before PG");
-    hs_pg->index_table_ = it->second.index_table;
-    it->second.pg_id = pg_id;
+    if (it != index_table_pg_map_.end()) {
+        hs_pg->index_table_ = it->second.index_table;
+        it->second.pg_id = pg_id;
+    } else {
+        RELEASE_ASSERT(hs_pg->pg_sb_->state == PGState::DESTROYED, "IndexTable should be recovered before PG");
+        hs_pg->index_table_ = nullptr;
+        LOGI("Index table not found for destroyed pg_id={}, index_table_uuid={}", pg_id, uuid_str);
+    }

add_pg_to_map(std::move(hs_pg));
}
@@ -365,6 +443,7 @@ HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, share
pg_sb_.create(sizeof(pg_info_superblk) - sizeof(char) + pg_info_.members.size() * sizeof(pg_members) +
num_chunks * sizeof(homestore::chunk_num_t));
pg_sb_->id = pg_info_.id;
pg_sb_->state = PGState::ALIVE;
pg_sb_->num_members = pg_info_.members.size();
pg_sb_->num_chunks = num_chunks;
pg_sb_->pg_size = pg_info_.size;
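Taken together, the new destroy pipeline runs in a fixed order. An annotated restatement of pg_destroy from above, with the apparent rationale for each step as comments (the crash-recovery reading is inferred from the DESTROYED-state handling in on_pg_meta_blk_found):

void HSHomeObject::pg_destroy(pg_id_t pg_id) {
    mark_pg_destroyed(pg_id);    // persist PGState::DESTROYED first, so recovery can recognize a
                                 // half-destroyed pg and skip the index-table assert
    destroy_shards(pg_id);       // drop shard superblocks and shard-map entries
    reset_pg_chunks(pg_id);      // zero chunk state, then force a CP flush as a durability barrier
    cleanup_pg_resources(pg_id); // index table, pg superblock, then chunks back to the dev heap
    LOGI("pg {} is destroyed", pg_id);
}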
20 changes: 20 additions & 0 deletions src/lib/homestore_backend/hs_shard_manager.cpp
@@ -44,6 +44,7 @@ ShardError toShardError(ReplServiceError const& e) {
}
}


uint64_t ShardManager::max_shard_size() { return Gi; }

uint64_t ShardManager::max_shard_num_in_pg() { return ((uint64_t)0x01) << shard_width; }
@@ -533,6 +534,25 @@ bool HSHomeObject::release_chunk_based_on_create_shard_message(sisl::blob const&
}
}

void HSHomeObject::destroy_shards(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock, _shard_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("on shards destroy with unknown pg_id {}", pg_id);
return;
}

auto& pg = iter->second;
for (auto& shard : pg->shards_) {
// release the open shard's v_chunk
auto hs_shard = s_cast< HS_Shard* >(shard.get());
// destroy shard super blk
hs_shard->sb_.destroy();
// erase shard in shard map
_shard_map.erase(shard->info.id);
}
}

HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t p_chunk_id,
homestore::chunk_num_t v_chunk_id) :
Shard(std::move(shard_info)), sb_(_shard_meta_name) {
10 changes: 8 additions & 2 deletions src/lib/homestore_backend/replication_state_machine.cpp
@@ -193,8 +193,14 @@ void ReplicationStateMachine::on_replace_member(const homestore::replica_member_
}

void ReplicationStateMachine::on_destroy(const homestore::group_id_t& group_id) {
-    // TODO:: add the logic to handle destroy
-    LOGI("replica destroyed");
+    auto pg_id = home_object_->get_pg_id_with_group_id(group_id);
+    if (!pg_id.has_value()) {
+        LOGW("no pg is mapped to group_id {}", boost::uuids::to_string(group_id));
+        return;
+    }
+    home_object_->pg_destroy(pg_id.value());
+    LOGI("replica destroyed, cleared PG {} resources with group_id {}", pg_id.value(),
+         boost::uuids::to_string(group_id));
}

homestore::AsyncReplResult<>
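A design note on the lookup that feeds on_destroy: get_pg_id_with_group_id scans _pg_map linearly under a shared lock, which keeps the change minimal and is cheap at typical pg counts. If the mapping were ever queried on a hot path, a reverse index would avoid the scan; a hedged sketch of that alternative (none of this is in the diff):

// Hypothetical reverse index, maintained next to _pg_map under the same _pg_lock.
std::map< homestore::group_id_t, pg_id_t > _group_to_pg;

std::optional< pg_id_t > get_pg_id_with_group_id_fast(homestore::group_id_t group_id) const {
    auto lg = std::shared_lock(_pg_lock);
    if (auto it = _group_to_pg.find(group_id); it != _group_to_pg.end()) { return it->second; }
    return std::nullopt;
}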
20 changes: 18 additions & 2 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
@@ -131,11 +131,11 @@ class HomeObjectFixture : public ::testing::Test {
run_on_pg_leader(pg_id, [&]() {
auto v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(v_chunkID.has_value(), "failed to get shard v_chunk_id");
-g_helper->set_v_chunk_id(v_chunkID.value());
+g_helper->set_auxiliary_uint64_id(v_chunkID.value());
});

// get v_chunk_id from IPC and compare with local
-auto leader_v_chunk_id = g_helper->get_v_chunk_id();
+auto leader_v_chunk_id = g_helper->get_auxiliary_uint64_id();
auto local_v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(local_v_chunkID.has_value(), "failed to get shard v_chunk_id");
RELEASE_ASSERT(leader_v_chunk_id == local_v_chunkID, "v_chunk_id supposed to be identical");
@@ -332,6 +332,22 @@
}
}

void verify_pg_destroy(pg_id_t pg_id, const string& index_table_uuid_str,
const std::vector< shard_id_t >& shard_id_vec) {
// check pg
ASSERT_FALSE(pg_exist(pg_id));
ASSERT_EQ(_obj_inst->index_table_pg_map_.find(index_table_uuid_str), _obj_inst->index_table_pg_map_.end());
// check shards
auto e = _obj_inst->shard_manager()->list_shards(pg_id).get();
ASSERT_EQ(e.error(), ShardError::UNKNOWN_PG);
for (const auto& shard_id : shard_id_vec) {
ASSERT_FALSE(shard_exist(shard_id));
}
// check chunk_selector
const auto& chunk_selector = _obj_inst->chunk_selector();
ASSERT_EQ(chunk_selector->m_per_pg_chunks.find(pg_id), chunk_selector->m_per_pg_chunks.end());
}

void verify_hs_pg(HSHomeObject::HS_PG* lhs_pg, HSHomeObject::HS_PG* rhs_pg) {
// verify index table
EXPECT_EQ(lhs_pg->index_table_->uuid(), rhs_pg->index_table_->uuid());
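For orientation, a hedged sketch of how verify_pg_destroy might be driven end to end (create_pg, create_shard, and the Mi size constant are assumptions in the style of this fixture; only pg_destroy and verify_pg_destroy come from this diff):

TEST_F(HomeObjectFixture, PGDestroyAndVerify) {
    constexpr pg_id_t pg_id{1};
    create_pg(pg_id);                               // assumed fixture helper
    auto shard_info = create_shard(pg_id, 64 * Mi); // assumed fixture helper
    // index_table_pg_map_ is keyed by the index table uuid string; fine for a single-pg test
    auto uuid_str = _obj_inst->index_table_pg_map_.begin()->first;
    _obj_inst->pg_destroy(pg_id);
    verify_pg_destroy(pg_id, uuid_str, {shard_info.id});
}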
3 changes: 3 additions & 0 deletions src/lib/homestore_backend/tests/hs_blob_tests.cpp
@@ -15,6 +15,9 @@ TEST_F(HomeObjectFixture, BasicEquivalence) {
}

TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) {
// test recovery from a pristine state first
restart();

auto num_pgs = SISL_OPTIONS["num_pgs"].as< uint64_t >();
auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >() / num_pgs;

Expand Down