Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge latest commits in main branch #247

Merged
merged 3 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.17"
version = "2.1.19"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
48 changes: 48 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,54 @@ bool HeapChunkSelector::release_chunk(const pg_id_t pg_id, const chunk_num_t v_c
return true;
}

bool HeapChunkSelector::reset_pg_chunks(pg_id_t pg_id) {
std::shared_lock lock_guard(m_chunk_selector_mtx);
auto pg_it = m_per_pg_chunks.find(pg_id);
if (pg_it == m_per_pg_chunks.end()) {
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
return false;
}
{
auto pg_chunk_collection = pg_it->second;
std::scoped_lock lock(pg_chunk_collection->mtx);
for (auto& chunk : pg_chunk_collection->m_pg_chunks) {
chunk->reset();
}
}
return true;
}

bool HeapChunkSelector::return_pg_chunks_to_dev_heap(const pg_id_t pg_id) {
std::unique_lock lock_guard(m_chunk_selector_mtx);
auto pg_it = m_per_pg_chunks.find(pg_id);
if (pg_it == m_per_pg_chunks.end()) {
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
return false;
}

auto pg_chunk_collection = pg_it->second;
auto pdev_id = pg_chunk_collection->m_pg_chunks[0]->get_pdev_id();
auto pdev_it = m_per_dev_heap.find(pdev_id);
RELEASE_ASSERT(pdev_it != m_per_dev_heap.end(), "pdev {} should in per dev heap", pdev_id);
auto pdev_heap = pdev_it->second;

{
std::scoped_lock lock(pdev_heap->mtx, pg_chunk_collection->mtx);
for (auto& chunk : pg_chunk_collection->m_pg_chunks) {
if (chunk->m_state == ChunkState::INUSE) {
chunk->m_state = ChunkState::AVAILABLE;
} // with shard which should be first
chunk->m_pg_id = std::nullopt;
chunk->m_v_chunk_id = std::nullopt;

pdev_heap->m_heap.emplace(chunk);
pdev_heap->available_blk_count += chunk->available_blks();
}
}
m_per_pg_chunks.erase(pg_it);
return true;
}

uint32_t HeapChunkSelector::get_chunk_size() const {
const auto chunk = m_chunks.begin()->second;
return chunk->size();
Expand Down
14 changes: 14 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ class HeapChunkSelector : public homestore::ChunkSelector {
// It is used in two scenarios: 1. seal shard 2. create shard rollback
bool release_chunk(const pg_id_t pg_id, const chunk_num_t v_chunk_id);

bool reset_pg_chunks(pg_id_t pg_id);

/**
* Releases all chunks associated with the specified pg_id.
*
* This function is used to return all chunks that are currently associated with a particular
* pg identified by the given pg_id. It is typically used in scenarios where
* all chunks associated with a pg need to be freed, such as pg move out.
*
* @param pg_id The ID of the protection group whose chunks are to be released.
* @return A boolean value indicating whether the operation was successful.
*/
bool return_pg_chunks_to_dev_heap(pg_id_t pg_id);

/**
* select chunks for pg, chunks need to be in same pdev.
*
Expand Down
4 changes: 3 additions & 1 deletion src/lib/homestore_backend/hs_cp_callbacks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ using homestore::CPContext;

namespace homeobject {

std::unique_ptr< CPContext > HSHomeObject::MyCPCallbacks::on_switchover_cp(CP* cur_cp, CP* new_cp) { return nullptr; }
std::unique_ptr< CPContext > HSHomeObject::MyCPCallbacks::on_switchover_cp(CP* cur_cp, CP* new_cp) {
return std::make_unique< CPContext >(new_cp);
}

// when cp_flush is called, it means that all the dirty candidates are already in the dirty list.
// new dirty candidates will arrive on next cp's context.
Expand Down
48 changes: 48 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ static constexpr uint64_t io_align{512};
PGError toPgError(homestore::ReplServiceError const&);
BlobError toBlobError(homestore::ReplServiceError const&);
ShardError toShardError(homestore::ReplServiceError const&);
ENUM(PGState, uint8_t, ALIVE = 0, DESTROYED);

class HSHomeObject : public HomeObjectImpl {
private:
Expand Down Expand Up @@ -83,6 +84,7 @@ class HSHomeObject : public HomeObjectImpl {

struct pg_info_superblk {
pg_id_t id;
PGState state;
uint32_t num_members;
uint32_t num_chunks;
peer_id_t replica_set_uuid;
Expand Down Expand Up @@ -112,6 +114,7 @@ class HSHomeObject : public HomeObjectImpl {

pg_info_superblk& operator=(pg_info_superblk const& rhs) {
id = rhs.id;
state = rhs.state;
num_members = rhs.num_members;
num_chunks = rhs.num_chunks;
pg_size = rhs.pg_size;
Expand Down Expand Up @@ -483,6 +486,20 @@ class HSHomeObject : public HomeObjectImpl {
void on_pg_replace_member(homestore::group_id_t group_id, const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in);

/**
* @brief Cleans up and recycles resources for the PG identified by the given pg_id on the current node.
*
* This function is called when the replication device leaves or when a specific PG is destroyed on the current
* node. Note that this function does not perform Raft synchronization with other nodes.
*
* Possible scenarios for calling this function include:
* - A member-out node cleaning up resources for a specified PG.
* - During baseline rsync to clean up PG resources on the current node.
*
* @param pg_id The ID of the PG to be destroyed.
*/
void pg_destroy(pg_id_t pg_id);

/**
* @brief Callback function invoked when a message is committed on a shard.
*
Expand Down Expand Up @@ -569,6 +586,7 @@ class HSHomeObject : public HomeObjectImpl {
size_t hash_len) const;

std::shared_ptr< BlobIndexTable > recover_index_table(homestore::superblk< homestore::index_table_sb >&& sb);
std::optional< pg_id_t > get_pg_id_with_group_id(homestore::group_id_t group_id) const;

private:
std::shared_ptr< BlobIndexTable > create_index_table();
Expand All @@ -593,6 +611,36 @@ class HSHomeObject : public HomeObjectImpl {
sisl::io_blob_safe& get_pad_buf(uint32_t pad_len);

// void trigger_timed_events();

/**
* @brief Marks the PG as destroyed.
*
* Updates the internal state to indicate that the specified PG is destroyed and ensures its state is persisted.
*
* @param pg_id The ID of the PG to be marked as destroyed.
*/
void mark_pg_destroyed(pg_id_t pg_id);

/**
* @brief Cleans up and recycles resources for shards in the PG located using a PG ID.
*
* @param pg_id The ID of the PG whose shards are to be destroyed.
*/
void destroy_shards(pg_id_t pg_id);

/**
* @brief Resets the chunks for the given PG ID and triggers a checkpoint flush.
*
* @param pg_id The ID of the PG whose chunks are to be reset.
*/
void reset_pg_chunks(pg_id_t pg_id);

/**
* @brief Cleans up and recycles resources for the PG located using a pg_id.
*
* @param pg_id The ID of the PG to be cleaned.
*/
void cleanup_pg_resources(pg_id_t pg_id);
};

class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks {
Expand Down
90 changes: 86 additions & 4 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ PGError toPgError(ReplServiceError const& e) {
case ReplServiceError::OK:
DEBUG_ASSERT(false, "Should not process OK!");
[[fallthrough]];
case ReplServiceError::DATA_DUPLICATED:
[[fallthrough]];
case ReplServiceError::FAILED:
return PGError::UNKNOWN;
default:
return PGError::UNKNOWN;
}
return PGError::UNKNOWN;
}

[[maybe_unused]] static homestore::ReplDev& pg_repl_dev(PG const& pg) {
Expand Down Expand Up @@ -265,6 +268,79 @@ void HSHomeObject::on_pg_replace_member(homestore::group_id_t group_id, const re
boost::uuids::to_string(member_in.id));
}

std::optional< pg_id_t > HSHomeObject::get_pg_id_with_group_id(homestore::group_id_t group_id) const {
auto lg = std::shared_lock(_pg_lock);
auto iter = std::find_if(_pg_map.begin(), _pg_map.end(), [group_id](const auto& entry) {
return pg_repl_dev(*entry.second).group_id() == group_id;
});
if (iter != _pg_map.end()) {
return iter->first;
} else {
return std::nullopt;
}
}

void HSHomeObject::pg_destroy(pg_id_t pg_id) {
mark_pg_destroyed(pg_id);
destroy_shards(pg_id);
reset_pg_chunks(pg_id);
cleanup_pg_resources(pg_id);
LOGI("pg {} is destroyed", pg_id);
}
void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("on pg destroy with unknown pg_id {}", pg_id);
return;
}
auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());
hs_pg->pg_sb_->state = PGState::DESTROYED;
hs_pg->pg_sb_.write();
}

void HSHomeObject::reset_pg_chunks(pg_id_t pg_id) {
bool res = chunk_selector_->reset_pg_chunks(pg_id);
RELEASE_ASSERT(res, "Failed to reset all chunks in pg {}", pg_id);
auto fut = homestore::hs()->cp_mgr().trigger_cp_flush(true /* force */);
auto on_complete = [&](auto success) {
RELEASE_ASSERT(success, "Failed to trigger CP flush");
LOGI("CP Flush completed");
};
on_complete(std::move(fut).get());
}

void HSHomeObject::cleanup_pg_resources(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("on pg resource release with unknown pg_id {}", pg_id);
return;
}

// destroy index table
auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());
if (nullptr != hs_pg->index_table_) {
auto uuid_str = boost::uuids::to_string(hs_pg->index_table_->uuid());
index_table_pg_map_.erase(uuid_str);
hs()->index_service().remove_index_table(hs_pg->index_table_);
hs_pg->index_table_->destroy();
}

// destroy pg super blk
hs_pg->pg_sb_.destroy();

// return pg chunks to dev heap
// which must be done after destroying pg super blk to avoid multiple pg use same chunks
bool res = chunk_selector_->return_pg_chunks_to_dev_heap(pg_id);
RELEASE_ASSERT(res, "Failed to return pg {} chunks to dev_heap", pg_id);

// erase pg in pg map
_pg_map.erase(iter);
}

void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) {
RELEASE_ASSERT(hs_pg->pg_info_.replica_set_uuid == hs_pg->repl_dev_->group_id(),
"PGInfo replica set uuid mismatch with ReplDev instance for {}",
Expand Down Expand Up @@ -332,9 +408,14 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c
// add entry in map, so that index recovery can update the PG.
std::scoped_lock lg(index_lock_);
auto it = index_table_pg_map_.find(uuid_str);
RELEASE_ASSERT(it != index_table_pg_map_.end(), "IndexTable should be recovered before PG");
hs_pg->index_table_ = it->second.index_table;
it->second.pg_id = pg_id;
if (it != index_table_pg_map_.end()) {
hs_pg->index_table_ = it->second.index_table;
it->second.pg_id = pg_id;
} else {
RELEASE_ASSERT(hs_pg->pg_sb_->state == PGState::DESTROYED, "IndexTable should be recovered before PG");
hs_pg->index_table_ = nullptr;
LOGI("Index table not found for destroyed pg_id={}, index_table_uuid={}", pg_id, uuid_str);
}

add_pg_to_map(std::move(hs_pg));
}
Expand Down Expand Up @@ -364,6 +445,7 @@ HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, share
pg_sb_.create(sizeof(pg_info_superblk) - sizeof(char) + pg_info_.members.size() * sizeof(pg_members) +
num_chunks * sizeof(homestore::chunk_num_t));
pg_sb_->id = pg_info_.id;
pg_sb_->state = PGState::ALIVE;
pg_sb_->num_members = pg_info_.members.size();
pg_sb_->num_chunks = num_chunks;
pg_sb_->pg_size = pg_info_.size;
Expand Down
20 changes: 20 additions & 0 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ShardError toShardError(ReplServiceError const& e) {
}
}


uint64_t ShardManager::max_shard_size() { return Gi; }

uint64_t ShardManager::max_shard_num_in_pg() { return ((uint64_t)0x01) << shard_width; }
Expand Down Expand Up @@ -544,6 +545,25 @@ bool HSHomeObject::release_chunk_based_on_create_shard_message(sisl::blob const&
}
}

void HSHomeObject::destroy_shards(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock, _shard_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("on shards destroy with unknown pg_id {}", pg_id);
return;
}

auto& pg = iter->second;
for (auto& shard : pg->shards_) {
// release open shard v_chunk
auto hs_shard = s_cast< HS_Shard* >(shard.get());
// destroy shard super blk
hs_shard->sb_.destroy();
// erase shard in shard map
_shard_map.erase(shard->info.id);
}
}

HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t p_chunk_id,
homestore::chunk_num_t v_chunk_id) :
Shard(std::move(shard_info)), sb_(_shard_meta_name) {
Expand Down
10 changes: 8 additions & 2 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,14 @@ void ReplicationStateMachine::on_replace_member(const homestore::replica_member_
}

void ReplicationStateMachine::on_destroy(const homestore::group_id_t& group_id) {
// TODO:: add the logic to handle destroy
LOGI("replica destroyed");
auto PG_ID = home_object_->get_pg_id_with_group_id(group_id);
if (!PG_ID.has_value()) {
LOGW("do not have pg mapped by group_id {}", boost::uuids::to_string(group_id));
return;
}
home_object_->pg_destroy(PG_ID.value());
LOGI("replica destroyed, cleared PG {} resources with group_id {}", PG_ID.value(),
boost::uuids::to_string(group_id));
}

homestore::AsyncReplResult<>
Expand Down
20 changes: 18 additions & 2 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ class HomeObjectFixture : public ::testing::Test {
run_on_pg_leader(pg_id, [&]() {
auto v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(v_chunkID.has_value(), "failed to get shard v_chunk_id");
g_helper->set_v_chunk_id(v_chunkID.value());
g_helper->set_auxiliary_uint64_id(v_chunkID.value());
});

// get v_chunk_id from IPC and compare with local
auto leader_v_chunk_id = g_helper->get_v_chunk_id();
auto leader_v_chunk_id = g_helper->get_auxiliary_uint64_id();
auto local_v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(local_v_chunkID.has_value(), "failed to get shard v_chunk_id");
RELEASE_ASSERT(leader_v_chunk_id == local_v_chunkID, "v_chunk_id supposed to be identical");
Expand Down Expand Up @@ -332,6 +332,22 @@ class HomeObjectFixture : public ::testing::Test {
}
}

void verify_pg_destroy(pg_id_t pg_id, const string& index_table_uuid_str,
const std::vector< shard_id_t >& shard_id_vec) {
// check pg
ASSERT_FALSE(pg_exist(pg_id));
ASSERT_EQ(_obj_inst->index_table_pg_map_.find(index_table_uuid_str), _obj_inst->index_table_pg_map_.end());
// check shards
auto e = _obj_inst->shard_manager()->list_shards(pg_id).get();
ASSERT_EQ(e.error(), ShardError::UNKNOWN_PG);
for (const auto& shard_id : shard_id_vec) {
ASSERT_FALSE(shard_exist(shard_id));
}
// check chunk_selector
const auto& chunk_selector = _obj_inst->chunk_selector();
ASSERT_EQ(chunk_selector->m_per_pg_chunks.find(pg_id), chunk_selector->m_per_pg_chunks.end());
}

void verify_hs_pg(HSHomeObject::HS_PG* lhs_pg, HSHomeObject::HS_PG* rhs_pg) {
// verify index table
EXPECT_EQ(lhs_pg->index_table_->uuid(), rhs_pg->index_table_->uuid());
Expand Down
Loading
Loading