Skip to content

Commit

Permalink
Blob duplication handling
Browse files Browse the repository at this point in the history
This commit addresses blob duplication issues caused by resync, it mainly happens when resend snapshot mesg during baseline resync and apply log after snapshot completion. This helps avoid unnecessary GC due to duplicated data.
This mechanism is effective only for duplicated blobs. Since we do not record the blks of the `shardInfo` stored in the data service, we are unable to skip data writes for duplicated shards.
  • Loading branch information
yuwmao committed Dec 23, 2024
1 parent 1f5b242 commit f743e81
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 23 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def build_requirements(self):

def requirements(self):
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
self.requires("homestore/[^6.5.19]@oss/master")
self.requires("homestore/[^6.6.0]@oss/master")
self.requires("iomgr/[^11.3]@oss/master")
self.requires("lz4/1.9.4", override=True)
self.requires("openssl/3.3.1", override=True)
Expand Down
36 changes: 30 additions & 6 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ BlobError toBlobError(ReplServiceError const& e) {
case ReplServiceError::RESULT_NOT_EXIST_YET:
[[fallthrough]];
case ReplServiceError::TERM_MISMATCH:
[[fallthrough]];
case ReplServiceError::DATA_DUPLICATED:
return BlobError(BlobErrorCode::REPLICATION_ERROR);
case ReplServiceError::NOT_LEADER:
return BlobError(BlobErrorCode::NOT_LEADER);
Expand Down Expand Up @@ -111,6 +113,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
req->header()->payload_crc = 0;
req->header()->shard_id = shard.id;
req->header()->pg_id = pg_id;
req->header()->blob_id = new_blob_id;
req->header()->seal();

// Serialize blob_id as Replication key
Expand Down Expand Up @@ -192,11 +195,11 @@ bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob
auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, exist_already:{}, status:{}, pbas: {}",
blob_info.shard_id, blob_info.blob_id, exist_already, status, blob_info.pbas.to_string());
if (status != homestore::btree_status_t::success) {
LOGE("Failed to insert into index table for blob {} err {}", blob_info.blob_id, enum_name(status));
return false;
}
if (!exist_already) {
if (status != homestore::btree_status_t::success) {
LOGE("Failed to insert into index table for blob {} err {}", blob_info.blob_id, enum_name(status));
return false;
}
// The PG superblock (durable entities) will be persisted as part of HS_CLIENT Checkpoint, which is always
// done ahead of the Index Checkpoint. Hence, if the index already has this entity, whatever durable
// counters updated as part of the update would have been persisted already in PG superblock. So if we were
Expand All @@ -215,6 +218,9 @@ bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob
de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
de.total_occupied_blk_count.fetch_add(blob_info.pbas.blk_count(), std::memory_order_relaxed);
});
} else {
LOGTRACEMOD(blobmgr, "blob already exists in index table, skip it. shard_id: {} blob_id: {}",
blob_info.shard_id, blob_info.blob_id);
}
return true;
}
Expand Down Expand Up @@ -354,6 +360,13 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

auto pg_iter = _pg_map.find(msg_header->pg_id);
if (pg_iter == _pg_map.end()) {
LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry this later",
msg_header->pg_id);
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(msg_header->shard_id);
if (shard_iter == _shard_map.end()) {
Expand All @@ -362,11 +375,22 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

homestore::blk_alloc_hints hints;

auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());
BLOGD(msg_header->shard_id, "n/a", "Picked p_chunk_id={}", hs_shard->sb_->p_chunk_id);

homestore::blk_alloc_hints hints;
hints.chunk_id_hint = hs_shard->sb_->p_chunk_id;

if (msg_header->blob_id != 0) {
//check if the blob already exists, if yes, return the blk id
auto index_table = d_cast< HS_PG* >(pg_iter->second.get())->index_table_;
auto r = get_blob_from_index_table(index_table, msg_header->shard_id, msg_header->blob_id);
if (r.hasValue()) {
LOGT("Blob has already been persisted, blob_id:{}, shard_id:{}", msg_header->blob_id, msg_header->shard_id);
hints.committed_blk_id = r.value();
}
}

return hints;
}

Expand Down
3 changes: 2 additions & 1 deletion src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ PGError toPgError(ReplServiceError const& e) {
[[fallthrough]];
case ReplServiceError::FAILED:
return PGError::UNKNOWN;
default:
return PGError::UNKNOWN;
}
return PGError::UNKNOWN;
}

[[maybe_unused]] static homestore::ReplDev& pg_repl_dev(PG const& pg) {
Expand Down
13 changes: 1 addition & 12 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ void HSHomeObject::local_create_shard(ShardInfo shard_info, homestore::chunk_num
auto pg_id = shard_info.placement_group;
auto chunk = chunk_selector_->select_specific_chunk(pg_id, v_chunk_id);
RELEASE_ASSERT(chunk != nullptr, "chunk selection failed with v_chunk_id: {} in PG: {}", v_chunk_id, pg_id);
}
} else { LOGD("shard {} already exist, skip creating shard", shard_info.id); }

// update pg's total_occupied_blk_count
HS_PG* hs_pg{nullptr};
Expand Down Expand Up @@ -379,17 +379,6 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
local_create_shard(shard_info, v_chunk_id, blkids.chunk_num(), blkids.blk_count());
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }

// update pg's total_occupied_blk_count
HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(shard_info.placement_group);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HS_PG* >(iter->second.get());
}
hs_pg->durable_entities_update([&blkids](auto& de) {
de.total_occupied_blk_count.fetch_add(blkids.blk_count(), std::memory_order_relaxed);
});
LOGI("Commit done for CREATE_SHARD_MSG for shard {}", shard_info.id);

break;
Expand Down
10 changes: 10 additions & 0 deletions src/lib/homestore_backend/index_kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ HSHomeObject::recover_index_table(homestore::superblk< homestore::index_table_sb
return index_table;
}

//The bool result indicates if the blob already exists, but if the existing pbas is the same as the new pbas, it will return homestore::btree_status_t::success.
std::pair< bool, homestore::btree_status_t > HSHomeObject::add_to_index_table(shared< BlobIndexTable > index_table,
const BlobInfo& blob_info) {
BlobRouteKey index_key{BlobRoute{blob_info.shard_id, blob_info.blob_id}};
Expand All @@ -48,8 +49,17 @@ std::pair< bool, homestore::btree_status_t > HSHomeObject::add_to_index_table(sh
&existing_value};
auto status = index_table->put(put_req);
if (status != homestore::btree_status_t::success) {
if (existing_value.pbas().is_valid() && existing_value.pbas() == blob_info.pbas) {
LOGT(
"blob already exists, but existing pbas is the same as the new pbas, ignore it, blob_id={}, pbas={}, status={}",
blob_info.blob_id, blob_info.pbas.to_string(), status);
return {true, homestore::btree_status_t::success};
}
if (existing_value.pbas().is_valid() || existing_value.pbas() == tombstone_pbas) {
// Check if the blob id already exists in the index or its tombstone.
LOGW(
"blob already exists, and conflict occurs, blob_id={}, existing pbas={}, new pbas={}, status={}",
blob_info.blob_id, existing_value.pbas().to_string(), blob_info.pbas.to_string(), status);
return {true, status};
}
LOGE("Failed to put to index table error {}", status);
Expand Down
1 change: 1 addition & 0 deletions src/lib/homestore_backend/replication_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ struct ReplicationMessageHeader : public BaseMessageHeader<ReplicationMessageHea
pg_id_t pg_id{0};
uint8_t reserved_pad[4]{};
shard_id_t shard_id{0};
blob_id_t blob_id{0};

bool corrupted() const{
if (magic_num != HOMEOBJECT_REPLICATION_MAGIC || protocol_version != HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1) {
Expand Down
6 changes: 3 additions & 3 deletions src/lib/homestore_backend/snapshot_receive_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,15 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
.thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
// TODO: do we need to update repl_dev metrics?
if (err) {
LOGE("Failed to write shard info to blk_id: {}", blk_id.to_string());
LOGE("Failed to write blob info to blk_id: {}", blk_id.to_string());
return folly::makeUnexpected(BlobError(BlobErrorCode::REPLICATION_ERROR));
}
LOGD("Shard info written to blk_id:{}", blk_id.to_string());
LOGD("Blob info written to blk_id:{}", blk_id.to_string());
return 0;
})
.get();
if (ret.hasError()) {
LOGE("Failed to write shard info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string());
LOGE("Failed to write blob info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string());
free_allocated_blks();
return WRITE_DATA_ERR;
}
Expand Down

0 comments on commit f743e81

Please sign in to comment.