Blob duplication handling
This commit addresses blob duplication caused by resync. Duplication mainly happens when a snapshot message is resent during baseline resync, or when log entries are replayed after snapshot completion. Skipping the data write for an already-committed blob avoids unnecessary GC of the duplicated data.
This mechanism is effective only for duplicated blobs: since we do not record the blk IDs of the `shardInfo` stored in the data service, we cannot skip data writes for duplicated shards.
yuwmao committed Dec 17, 2024
1 parent 1f5b242 commit 2a7e47a
Showing 7 changed files with 43 additions and 24 deletions.
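
For orientation, here is a minimal, self-contained sketch of the duplicate-blob check this commit adds on the follower's write path. The type names (MsgHeader, AllocHints, BlobIndex) are simplified stand-ins, not the real homestore/homeobject types; the actual logic lives in blob_put_get_blk_alloc_hints and add_to_index_table in the diffs below.

```cpp
// Sketch only: models the duplicate check added by this commit. If a replayed
// put_blob carries a blob_id that is already in the per-PG index, the follower
// returns the committed blk id as an allocation hint so the data is not written
// (and later GC'd) a second time.
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <utility>

using shard_id_t = uint64_t;
using blob_id_t = uint64_t;
using blk_id_t = uint64_t; // stand-in for homestore::MultiBlkId

struct MsgHeader { // stand-in for ReplicationMessageHeader (now carries blob_id)
    shard_id_t shard_id{0};
    blob_id_t blob_id{0}; // 0 means "not set" (older senders)
};

struct AllocHints { // stand-in for homestore::blk_alloc_hints
    std::optional< blk_id_t > committed_blk_id; // set => data already persisted
};

// Stand-in for the per-PG blob index table: {shard_id, blob_id} -> committed blk id.
using BlobIndex = std::map< std::pair< shard_id_t, blob_id_t >, blk_id_t >;

AllocHints blob_put_get_blk_alloc_hints(BlobIndex const& index, MsgHeader const& hdr) {
    AllocHints hints;
    if (hdr.blob_id != 0) {
        // Resync can replay a put we already committed; if the blob is indexed,
        // point the allocator at the existing blocks instead of writing again.
        auto it = index.find({hdr.shard_id, hdr.blob_id});
        if (it != index.end()) { hints.committed_blk_id = it->second; }
    }
    return hints;
}

int main() {
    BlobIndex index{{{1 /*shard*/, 42 /*blob*/}, 777 /*blk id*/}};

    auto dup = blob_put_get_blk_alloc_hints(index, MsgHeader{1, 42});   // replayed put
    auto fresh = blob_put_get_blk_alloc_hints(index, MsgHeader{1, 43}); // new blob

    std::cout << "replayed put reuses committed blks: " << dup.committed_blk_id.has_value() << '\n';
    std::cout << "new blob allocates normally: " << !fresh.committed_blk_id.has_value() << '\n';
}
```

The toBlobError() change below suggests the replication layer can report such duplicated writes as ReplServiceError::DATA_DUPLICATED, which is mapped to a generic REPLICATION_ERROR for callers.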
2 changes: 1 addition & 1 deletion conanfile.py
@@ -49,7 +49,7 @@ def build_requirements(self):

def requirements(self):
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
self.requires("homestore/[^6.5.19]@oss/master")
self.requires("homestore/[^6.6.0]@oss/master")
self.requires("iomgr/[^11.3]@oss/master")
self.requires("lz4/1.9.4", override=True)
self.requires("openssl/3.3.1", override=True)
38 changes: 32 additions & 6 deletions src/lib/homestore_backend/hs_blob_manager.cpp
@@ -38,6 +38,8 @@ BlobError toBlobError(ReplServiceError const& e) {
case ReplServiceError::RESULT_NOT_EXIST_YET:
[[fallthrough]];
case ReplServiceError::TERM_MISMATCH:
[[fallthrough]];
case ReplServiceError::DATA_DUPLICATED:
return BlobError(BlobErrorCode::REPLICATION_ERROR);
case ReplServiceError::NOT_LEADER:
return BlobError(BlobErrorCode::NOT_LEADER);
@@ -111,6 +113,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
req->header()->payload_crc = 0;
req->header()->shard_id = shard.id;
req->header()->pg_id = pg_id;
req->header()->blob_id = new_blob_id;
req->header()->seal();

// Serialize blob_id as Replication key
@@ -189,10 +192,10 @@ bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob
RELEASE_ASSERT(index_table != nullptr, "Index table not initialized");

// Write to index table with key {shard id, blob id} and value {pba}.
auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, exist_already:{}, status:{}, pbas: {}",
blob_info.shard_id, blob_info.blob_id, exist_already, status, blob_info.pbas.to_string());
if (!exist_already) {
auto const [conflict, status] = add_to_index_table(index_table, blob_info);
LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, conflict:{}, status:{}, pbas: {}",
blob_info.shard_id, blob_info.blob_id, conflict, status, blob_info.pbas.to_string());
if (!conflict) {
if (status != homestore::btree_status_t::success) {
LOGE("Failed to insert into index table for blob {} err {}", blob_info.blob_id, enum_name(status));
return false;
@@ -215,6 +218,9 @@ bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob
de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
de.total_occupied_blk_count.fetch_add(blob_info.pbas.blk_count(), std::memory_order_relaxed);
});
} else {
LOGTRACEMOD(blobmgr, "blob already exists in index table, skip it. shard_id: {} blob_id: {}",
blob_info.shard_id, blob_info.blob_id);
}
return true;
}
@@ -354,6 +360,13 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

auto pg_iter = _pg_map.find(msg_header->pg_id);
if (pg_iter == _pg_map.end()) {
LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry this later",
msg_header->pg_id);
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(msg_header->shard_id);
if (shard_iter == _shard_map.end()) {
@@ -362,11 +375,24 @@
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

homestore::blk_alloc_hints hints;

auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());
BLOGD(msg_header->shard_id, "n/a", "Picked p_chunk_id={}", hs_shard->sb_->p_chunk_id);

homestore::blk_alloc_hints hints;
hints.chunk_id_hint = hs_shard->sb_->p_chunk_id;

if (msg_header->blob_id != 0) {
// Check if the blob already exists; if so, return the committed blk id.
auto index_table = d_cast< HS_PG* >(pg_iter->second.get())->index_table_;
auto r = get_blob_from_index_table(index_table, msg_header->shard_id, msg_header->blob_id);
if (r.hasValue()) {
LOGD("Blob has already been persisted, blob_id:{}, shard_id:{}", msg_header->blob_id, msg_header->shard_id);
hints.committed_blk_id = r.value();
} else {
LOGT("blob not found in index table, shard_id:{}, blob_id:{}", msg_header->shard_id, msg_header->blob_id);
}
}

return hints;
}

3 changes: 2 additions & 1 deletion src/lib/homestore_backend/hs_pg_manager.cpp
@@ -49,8 +49,9 @@ PGError toPgError(ReplServiceError const& e) {
[[fallthrough]];
case ReplServiceError::FAILED:
return PGError::UNKNOWN;
default:
return PGError::UNKNOWN;
}
return PGError::UNKNOWN;
}

[[maybe_unused]] static homestore::ReplDev& pg_repl_dev(PG const& pg) {
13 changes: 1 addition & 12 deletions src/lib/homestore_backend/hs_shard_manager.cpp
@@ -318,7 +318,7 @@ void HSHomeObject::local_create_shard(ShardInfo shard_info, homestore::chunk_num
auto pg_id = shard_info.placement_group;
auto chunk = chunk_selector_->select_specific_chunk(pg_id, v_chunk_id);
RELEASE_ASSERT(chunk != nullptr, "chunk selection failed with v_chunk_id: {} in PG: {}", v_chunk_id, pg_id);
}
} else { LOGD("shard {} already exists, skip creating shard", shard_info.id); }

// update pg's total_occupied_blk_count
HS_PG* hs_pg{nullptr};
@@ -379,17 +379,6 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
local_create_shard(shard_info, v_chunk_id, blkids.chunk_num(), blkids.blk_count());
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }

// update pg's total_occupied_blk_count
HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(shard_info.placement_group);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HS_PG* >(iter->second.get());
}
hs_pg->durable_entities_update([&blkids](auto& de) {
de.total_occupied_blk_count.fetch_add(blkids.blk_count(), std::memory_order_relaxed);
});
LOGI("Commit done for CREATE_SHARD_MSG for shard {}", shard_info.id);

break;
4 changes: 3 additions & 1 deletion src/lib/homestore_backend/index_kv.cpp
@@ -40,6 +40,7 @@ HSHomeObject::recover_index_table(homestore::superblk< homestore::index_table_sb
return index_table;
}

// The bool result indicates whether the blob already exists and conflicts with the input value.
std::pair< bool, homestore::btree_status_t > HSHomeObject::add_to_index_table(shared< BlobIndexTable > index_table,
const BlobInfo& blob_info) {
BlobRouteKey index_key{BlobRoute{blob_info.shard_id, blob_info.blob_id}};
@@ -48,7 +49,8 @@ std::pair< bool, homestore::btree_status_t > HSHomeObject::add_to_index_table(sh
&existing_value};
auto status = index_table->put(put_req);
if (status != homestore::btree_status_t::success) {
if (existing_value.pbas().is_valid() || existing_value.pbas() == tombstone_pbas) {
if (existing_value.pbas().is_valid() || existing_value.pbas() == tombstone_pbas
|| existing_value.pbas() != blob_info.pbas) {
// Check if the blob id already exists in the index or its tombstone.
return {true, status};
}
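To make the new add_to_index_table contract concrete, here is a toy model of the {conflict, status} return value, using std::map in place of the real btree. It deliberately ignores tombstones and pba comparison, which the hunk above handles.

```cpp
// Toy model only: the bool in the returned pair means "key already present with a
// conflicting value", which callers such as local_add_blob_info() treat as
// "duplicate blob, skip" rather than as an index failure.
#include <cstdint>
#include <iostream>
#include <map>
#include <utility>

using shard_id_t = uint64_t;
using blob_id_t = uint64_t;
using pba_t = uint64_t; // stand-in for the persisted block address value

enum class btree_status { success, put_failed };

using Index = std::map< std::pair< shard_id_t, blob_id_t >, pba_t >;

// Returns {conflict, status}; conflict == true when the blob is already indexed.
std::pair< bool, btree_status > add_to_index_table(Index& index, shard_id_t shard,
                                                   blob_id_t blob, pba_t pba) {
    auto res = index.try_emplace({shard, blob}, pba);
    if (res.second) { return {false, btree_status::success}; }
    return {true, btree_status::put_failed}; // duplicate from resync: conflict, not error
}

int main() {
    Index index;
    auto first = add_to_index_table(index, 1, 42, 777);  // fresh insert
    auto replay = add_to_index_table(index, 1, 42, 777); // replayed during resync
    std::cout << "first insert conflict: " << first.first << '\n';  // 0
    std::cout << "replayed put conflict: " << replay.first << '\n'; // 1
}
```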
1 change: 1 addition & 0 deletions src/lib/homestore_backend/replication_message.hpp
@@ -67,6 +67,7 @@ struct ReplicationMessageHeader : public BaseMessageHeader<ReplicationMessageHea
pg_id_t pg_id{0};
uint8_t reserved_pad[4]{};
shard_id_t shard_id{0};
blob_id_t blob_id{0};

bool corrupted() const{
if (magic_num != HOMEOBJECT_REPLICATION_MAGIC || protocol_version != HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1) {
6 changes: 3 additions & 3 deletions src/lib/homestore_backend/snapshot_receive_handler.cpp
@@ -183,15 +183,15 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
.thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
// TODO: do we need to update repl_dev metrics?
if (err) {
LOGE("Failed to write shard info to blk_id: {}", blk_id.to_string());
LOGE("Failed to write blob info to blk_id: {}", blk_id.to_string());
return folly::makeUnexpected(BlobError(BlobErrorCode::REPLICATION_ERROR));
}
LOGD("Shard info written to blk_id:{}", blk_id.to_string());
LOGD("Blob info written to blk_id:{}", blk_id.to_string());
return 0;
})
.get();
if (ret.hasError()) {
LOGE("Failed to write shard info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string());
LOGE("Failed to write blob info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string());
free_allocated_blks();
return WRITE_DATA_ERR;
}
