Baseline Resync Enhancement #244

Merged
merged 3 commits on Dec 26, 2024
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.20"
version = "2.1.21"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
36 changes: 30 additions & 6 deletions src/lib/homestore_backend/hs_blob_manager.cpp
@@ -38,6 +38,8 @@ BlobError toBlobError(ReplServiceError const& e) {
case ReplServiceError::RESULT_NOT_EXIST_YET:
[[fallthrough]];
case ReplServiceError::TERM_MISMATCH:
+[[fallthrough]];
+case ReplServiceError::DATA_DUPLICATED:
return BlobError(BlobErrorCode::REPLICATION_ERROR);
case ReplServiceError::NOT_LEADER:
return BlobError(BlobErrorCode::NOT_LEADER);
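
A minimal sketch of the new mapping from a caller's perspective (the `code` member on BlobError is an assumption for illustration; it is not shown in this PR):

// During baseline resync a follower may answer a put with DATA_DUPLICATED
// when the blob was already committed; callers now see it folded into the
// generic, retryable REPLICATION_ERROR instead of an unhandled enum value.
BlobError err = toBlobError(ReplServiceError::DATA_DUPLICATED);
assert(err.code == BlobErrorCode::REPLICATION_ERROR); // hypothetical `code` member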
@@ -111,6 +113,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
req->header()->payload_crc = 0;
req->header()->shard_id = shard.id;
req->header()->pg_id = pg_id;
+req->header()->blob_id = new_blob_id;
req->header()->seal();

// Serialize blob_id as Replication key
@@ -192,11 +195,11 @@ bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob
auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, exist_already:{}, status:{}, pbas: {}",
blob_info.shard_id, blob_info.blob_id, exist_already, status, blob_info.pbas.to_string());
-if (status != homestore::btree_status_t::success) {
-LOGE("Failed to insert into index table for blob {} err {}", blob_info.blob_id, enum_name(status));
-return false;
-}
if (!exist_already) {
+if (status != homestore::btree_status_t::success) {
+LOGE("Failed to insert into index table for blob {} err {}", blob_info.blob_id, enum_name(status));
+return false;
+}
// The PG superblock (durable entities) will be persisted as part of HS_CLIENT Checkpoint, which is always
// done ahead of the Index Checkpoint. Hence, if the index already has this entity, whatever durable
// counters updated as part of the update would have been persisted already in PG superblock. So if we were
@@ -215,6 +218,9 @@
de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
de.total_occupied_blk_count.fetch_add(blob_info.pbas.blk_count(), std::memory_order_relaxed);
});
+} else {
+LOGTRACEMOD(blobmgr, "blob already exists in index table, skip it. shard_id: {} blob_id: {}",
+blob_info.shard_id, blob_info.blob_id);
}
return true;
}
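
With the status check moved inside the `!exist_already` branch, a replayed put during baseline resync becomes a clean no-op for the durable counters. A usage sketch (the aggregate construction of BlobInfo here is hypothetical, inferred from the fields referenced above):

// First application inserts into the index and bumps active_blob_count /
// total_occupied_blk_count; the replay finds the entry and skips both.
BlobInfo info{shard.id, new_blob_id, pbas};      // hypothetical construction
bool first  = local_add_blob_info(pg_id, info);  // exist_already == false
bool replay = local_add_blob_info(pg_id, info);  // exist_already == true, counters untouched
assert(first && replay);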
@@ -354,6 +360,13 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

+auto pg_iter = _pg_map.find(msg_header->pg_id);
+if (pg_iter == _pg_map.end()) {
+LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry this later",
+msg_header->pg_id);
+return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
+}
+
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(msg_header->shard_id);
if (shard_iter == _shard_map.end()) {
@@ -362,11 +375,22 @@
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

-homestore::blk_alloc_hints hints;
-
auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());
BLOGD(msg_header->shard_id, "n/a", "Picked p_chunk_id={}", hs_shard->sb_->p_chunk_id);

+homestore::blk_alloc_hints hints;
+hints.chunk_id_hint = hs_shard->sb_->p_chunk_id;
+
+if (msg_header->blob_id != 0) {
+// Check if the blob already exists; if so, return the committed blk id.
+auto index_table = d_cast< HS_PG* >(pg_iter->second.get())->index_table_;
+auto r = get_blob_from_index_table(index_table, msg_header->shard_id, msg_header->blob_id);
+if (r.hasValue()) {
+LOGT("Blob has already been persisted, blob_id:{}, shard_id:{}", msg_header->blob_id, msg_header->shard_id);
+hints.committed_blk_id = r.value();
+}
+}
+
return hints;
}

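Taken together with the blob_id that _put_blob now stamps into the replication header, this gives a follower a dedup path before block allocation. A condensed sketch of the flow (the engine-side reaction to committed_blk_id is an assumption based on the DATA_DUPLICATED mapping above, not code from this PR; index_table, shard_id, blob_id and p_chunk_id are free-standing names for illustration):

// On a resync replay, blob_put_get_blk_alloc_hints consults the PG index:
//   miss -> allocate normally on the shard's pinned chunk (chunk_id_hint)
//   hit  -> hints.committed_blk_id names the already-persisted data, so the
//           underlying engine can skip the write and report DATA_DUPLICATED,
//           which toBlobError surfaces as a retryable REPLICATION_ERROR.
homestore::blk_alloc_hints hints;
hints.chunk_id_hint = p_chunk_id;            // always pin to the shard's chunk
if (auto r = get_blob_from_index_table(index_table, shard_id, blob_id); r.hasValue()) {
    hints.committed_blk_id = r.value();      // dedup: data already on disk
}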
13 changes: 1 addition & 12 deletions src/lib/homestore_backend/hs_shard_manager.cpp
@@ -319,7 +319,7 @@ void HSHomeObject::local_create_shard(ShardInfo shard_info, homestore::chunk_num
auto pg_id = shard_info.placement_group;
auto chunk = chunk_selector_->select_specific_chunk(pg_id, v_chunk_id);
RELEASE_ASSERT(chunk != nullptr, "chunk selection failed with v_chunk_id: {} in PG: {}", v_chunk_id, pg_id);
-}
+} else { LOGD("shard {} already exists, skip creating shard", shard_info.id); }

// update pg's total_occupied_blk_count
HS_PG* hs_pg{nullptr};
@@ -380,17 +380,6 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
local_create_shard(shard_info, v_chunk_id, blkids.chunk_num(), blkids.blk_count());
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }

-// update pg's total_occupied_blk_count
Contributor Author: Redundant; this code has been moved into local_create_shard.
-HS_PG* hs_pg{nullptr};
-{
-std::shared_lock lock_guard(_pg_lock);
-auto iter = _pg_map.find(shard_info.placement_group);
-RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
-hs_pg = static_cast< HS_PG* >(iter->second.get());
-}
-hs_pg->durable_entities_update([&blkids](auto& de) {
-de.total_occupied_blk_count.fetch_add(blkids.blk_count(), std::memory_order_relaxed);
-});
LOGI("Commit done for CREATE_SHARD_MSG for shard {}", shard_info.id);

break;
14 changes: 12 additions & 2 deletions src/lib/homestore_backend/index_kv.cpp
@@ -40,6 +40,7 @@ HSHomeObject::recover_index_table(homestore::superblk< homestore::index_table_sb
return index_table;
}

+// The bool result indicates whether the blob already exists. If the existing pbas matches the new pbas, the returned status is homestore::btree_status_t::success.
std::pair< bool, homestore::btree_status_t > HSHomeObject::add_to_index_table(shared< BlobIndexTable > index_table,
const BlobInfo& blob_info) {
BlobRouteKey index_key{BlobRoute{blob_info.shard_id, blob_info.blob_id}};
@@ -48,8 +49,17 @@
&existing_value};
auto status = index_table->put(put_req);
if (status != homestore::btree_status_t::success) {
-if (existing_value.pbas().is_valid() || existing_value.pbas() == tombstone_pbas) {
+// Check if the blob id already exists in the index or is a tombstone.
+if ((existing_value.pbas().is_valid() && existing_value.pbas() == blob_info.pbas)
+|| existing_value.pbas() == tombstone_pbas) {
+LOGT(
+"blob already exists, but the existing pbas matches the new pbas or the blob has been deleted, ignore it, blob_id={}, pbas={}, status={}",
+blob_info.blob_id, blob_info.pbas.to_string(), status);
+return {true, homestore::btree_status_t::success};
+}
+if (existing_value.pbas().is_valid()) {
+LOGE(
+"blob already exists, and a conflict occurs, blob_id={}, existing pbas={}, new pbas={}, status={}",
+blob_info.blob_id, existing_value.pbas().to_string(), blob_info.pbas.to_string(), status);
return {true, status};
}
LOGE("Failed to put to index table error {}", status);
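The return pair of add_to_index_table now distinguishes three cases; a summary sketch (outcomes inferred from the logic above):

// auto [exists, status] = add_to_index_table(index_table, blob_info);
//   fresh blob                                  -> {false, success}   counters will be updated
//   duplicate with identical pbas, or tombstone -> {true,  success}   idempotent resync replay
//   duplicate with a different valid pbas       -> {true,  failure}   genuine conflict, logged as an error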
1 change: 1 addition & 0 deletions src/lib/homestore_backend/replication_message.hpp
@@ -67,6 +67,7 @@ struct ReplicationMessageHeader : public BaseMessageHeader<ReplicationMessageHea
pg_id_t pg_id{0};
uint8_t reserved_pad[4]{};
shard_id_t shard_id{0};
+blob_id_t blob_id{0};

bool corrupted() const{
if (magic_num != HOMEOBJECT_REPLICATION_MAGIC || protocol_version != HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1) {
8 changes: 7 additions & 1 deletion src/lib/homestore_backend/replication_state_machine.cpp
@@ -363,12 +363,18 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna

auto pg_data = GetSizePrefixedResyncPGMetaData(data_buf);

+// Check if the PG already exists; if so, clean up its stale resources (possibly left over from a previous snapshot failure) so we resync on a pristine base.
+if (home_object_->pg_exists(pg_data->pg_id())) {
+LOGI("pg already exists, clean pg resources before snapshot, pg_id:{} {}", pg_data->pg_id(), log_suffix);
+home_object_->pg_destroy(pg_data->pg_id());
+LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn());
+m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
+}
// Check if the snapshot context is the same as the current snapshot context.
// If not, drop the previous context and re-init a new one.
if (m_snp_rcv_handler->get_context_lsn() != context->get_lsn()) {
LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn());
m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
-// TODO: Reset all data of current PG - let's resync on a pristine base
}

auto ret = m_snp_rcv_handler->process_pg_snapshot_data(*pg_data);
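Condensing the bootstrap logic above, write_snapshot_obj now distinguishes a stale PG from a stale handler context. A sketch (assuming reset_context leaves the handler at the new LSN, which makes this else-if form behaviorally equivalent to the two sequential ifs in the diff):

// Case 1: the PG already exists, i.e. leftovers from an earlier failed
//         snapshot -> destroy it and re-init the receive context, so the
//         resync starts from a pristine base.
// Case 2: the PG is absent but the handler still holds a context for a
//         different snapshot LSN -> just re-init the context.
if (home_object_->pg_exists(pg_data->pg_id())) {
    home_object_->pg_destroy(pg_data->pg_id());
    m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
} else if (m_snp_rcv_handler->get_context_lsn() != context->get_lsn()) {
    m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
}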
6 changes: 3 additions & 3 deletions src/lib/homestore_backend/snapshot_receive_handler.cpp
@@ -212,15 +212,15 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
.thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
// TODO: do we need to update repl_dev metrics?
if (err) {
LOGE("Failed to write shard info to blk_id: {}", blk_id.to_string());
LOGE("Failed to write blob info to blk_id: {}", blk_id.to_string());
return folly::makeUnexpected(BlobError(BlobErrorCode::REPLICATION_ERROR));
}
LOGD("Shard info written to blk_id:{}", blk_id.to_string());
LOGD("Blob info written to blk_id:{}", blk_id.to_string());
return 0;
})
.get();
if (ret.hasError()) {
LOGE("Failed to write shard info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string());
LOGE("Failed to write blob info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string());
free_allocated_blks();
return WRITE_DATA_ERR;
}