diff --git a/conanfile.py b/conanfile.py
index 4d79d90..607a8e1 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -49,7 +49,7 @@ def build_requirements(self):
 
     def requirements(self):
         self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
-        self.requires("homestore/[^6.5.19]@oss/master")
+        self.requires("homestore/[^6.6.0]@oss/master")
         self.requires("iomgr/[^11.3]@oss/master")
         self.requires("lz4/1.9.4", override=True)
         self.requires("openssl/3.3.1", override=True)
diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp
index e5da7ef..ea5bf8e 100644
--- a/src/lib/homestore_backend/hs_blob_manager.cpp
+++ b/src/lib/homestore_backend/hs_blob_manager.cpp
@@ -38,6 +38,8 @@ BlobError toBlobError(ReplServiceError const& e) {
     case ReplServiceError::RESULT_NOT_EXIST_YET:
         [[fallthrough]];
     case ReplServiceError::TERM_MISMATCH:
+        [[fallthrough]];
+    case ReplServiceError::DATA_DUPLICATED:
         return BlobError(BlobErrorCode::REPLICATION_ERROR);
     case ReplServiceError::NOT_LEADER:
         return BlobError(BlobErrorCode::NOT_LEADER);
@@ -111,6 +113,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
     req->header()->payload_crc = 0;
     req->header()->shard_id = shard.id;
     req->header()->pg_id = pg_id;
+    req->header()->blob_id = new_blob_id;
     req->header()->seal();
 
     // Serialize blob_id as Replication key
@@ -192,11 +195,11 @@ bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob
     auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
     LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, exist_already:{}, status:{}, pbas: {}",
                 blob_info.shard_id, blob_info.blob_id, exist_already, status, blob_info.pbas.to_string());
+    if (status != homestore::btree_status_t::success) {
+        LOGE("Failed to insert into index table for blob {} err {}", blob_info.blob_id, enum_name(status));
+        return false;
+    }
     if (!exist_already) {
-        if (status != homestore::btree_status_t::success) {
-            LOGE("Failed to insert into index table for blob {} err {}", blob_info.blob_id, enum_name(status));
-            return false;
-        }
         // The PG superblock (durable entities) will be persisted as part of HS_CLIENT Checkpoint, which is always
         // done ahead of the Index Checkpoint. Hence, if the index already has this entity, whatever durable
         // counters updated as part of the update would have been persisted already in PG superblock. So if we were
@@ -215,6 +218,9 @@ bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob
             de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
             de.total_occupied_blk_count.fetch_add(blob_info.pbas.blk_count(), std::memory_order_relaxed);
         });
+    } else {
+        LOGTRACEMOD(blobmgr, "blob already exists in index table, skip it. shard_id: {} blob_id: {}",
+                    blob_info.shard_id, blob_info.blob_id);
     }
     return true;
 }
@@ -354,6 +360,13 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
         return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
     }
 
+    auto pg_iter = _pg_map.find(msg_header->pg_id);
+    if (pg_iter == _pg_map.end()) {
+        LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry this later",
+             msg_header->pg_id);
+        return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
+    }
+
     std::scoped_lock lock_guard(_shard_lock);
     auto shard_iter = _shard_map.find(msg_header->shard_id);
     if (shard_iter == _shard_map.end()) {
@@ -362,11 +375,22 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
         return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
     }
 
+    homestore::blk_alloc_hints hints;
+
     auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());
     BLOGD(msg_header->shard_id, "n/a", "Picked p_chunk_id={}", hs_shard->sb_->p_chunk_id);
-
-    homestore::blk_alloc_hints hints;
     hints.chunk_id_hint = hs_shard->sb_->p_chunk_id;
+
+    if (msg_header->blob_id != 0) {
+        // check if the blob already exists; if so, return the committed blk id
+        auto index_table = d_cast< HS_PG* >(pg_iter->second.get())->index_table_;
+        auto r = get_blob_from_index_table(index_table, msg_header->shard_id, msg_header->blob_id);
+        if (r.hasValue()) {
+            LOGT("Blob has already been persisted, blob_id:{}, shard_id:{}", msg_header->blob_id, msg_header->shard_id);
+            hints.committed_blk_id = r.value();
+        }
+    }
+
     return hints;
 }
 
diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp
index 62d593e..fa8a365 100644
--- a/src/lib/homestore_backend/hs_pg_manager.cpp
+++ b/src/lib/homestore_backend/hs_pg_manager.cpp
@@ -49,8 +49,9 @@ PGError toPgError(ReplServiceError const& e) {
         [[fallthrough]];
     case ReplServiceError::FAILED:
         return PGError::UNKNOWN;
+    default:
+        return PGError::UNKNOWN;
     }
-    return PGError::UNKNOWN;
 }
 
 [[maybe_unused]] static homestore::ReplDev& pg_repl_dev(PG const& pg) {
diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp
index 6542e8b..433d6a7 100644
--- a/src/lib/homestore_backend/hs_shard_manager.cpp
+++ b/src/lib/homestore_backend/hs_shard_manager.cpp
@@ -318,7 +318,7 @@ void HSHomeObject::local_create_shard(ShardInfo shard_info, homestore::chunk_num
         auto pg_id = shard_info.placement_group;
         auto chunk = chunk_selector_->select_specific_chunk(pg_id, v_chunk_id);
         RELEASE_ASSERT(chunk != nullptr, "chunk selection failed with v_chunk_id: {} in PG: {}", v_chunk_id, pg_id);
-    }
+    } else { LOGD("shard {} already exists, skip creating shard", shard_info.id); }
 
     // update pg's total_occupied_blk_count
     HS_PG* hs_pg{nullptr};
@@ -379,17 +379,6 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
         local_create_shard(shard_info, v_chunk_id, blkids.chunk_num(), blkids.blk_count());
         if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
 
-        // update pg's total_occupied_blk_count
-        HS_PG* hs_pg{nullptr};
-        {
-            std::shared_lock lock_guard(_pg_lock);
-            auto iter = _pg_map.find(shard_info.placement_group);
-            RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
-            hs_pg = static_cast< HS_PG* >(iter->second.get());
-        }
-        hs_pg->durable_entities_update([&blkids](auto& de) {
-            de.total_occupied_blk_count.fetch_add(blkids.blk_count(), std::memory_order_relaxed);
-        });
         LOGI("Commit done for CREATE_SHARD_MSG for shard {}", shard_info.id);
 
         break;
diff --git a/src/lib/homestore_backend/index_kv.cpp b/src/lib/homestore_backend/index_kv.cpp
index d02e941..3428c09 100644
--- a/src/lib/homestore_backend/index_kv.cpp
+++ b/src/lib/homestore_backend/index_kv.cpp
@@ -40,6 +40,7 @@ HSHomeObject::recover_index_table(homestore::superblk< homestore::index_table_sb
     return index_table;
 }
 
+// The bool result indicates whether the blob already exists; if the existing pbas is the same as the new pbas, homestore::btree_status_t::success is returned.
 std::pair< bool, homestore::btree_status_t > HSHomeObject::add_to_index_table(shared< BlobIndexTable > index_table,
                                                                               const BlobInfo& blob_info) {
     BlobRouteKey index_key{BlobRoute{blob_info.shard_id, blob_info.blob_id}};
@@ -48,8 +49,17 @@ std::pair< bool, homestore::btree_status_t > HSHomeObject::add_to_index_table(sh
                                              &existing_value};
     auto status = index_table->put(put_req);
     if (status != homestore::btree_status_t::success) {
+        if (existing_value.pbas().is_valid() && existing_value.pbas() == blob_info.pbas) {
+            LOGT(
+                "blob already exists but the existing pbas is the same as the new pbas, ignore it, blob_id={}, pbas={}, status={}",
+                blob_info.blob_id, blob_info.pbas.to_string(), status);
+            return {true, homestore::btree_status_t::success};
+        }
         if (existing_value.pbas().is_valid() || existing_value.pbas() == tombstone_pbas) {
             // Check if the blob id already exists in the index or its tombstone.
+            LOGW(
+                "blob already exists and a conflict occurs, blob_id={}, existing pbas={}, new pbas={}, status={}",
+                blob_info.blob_id, existing_value.pbas().to_string(), blob_info.pbas.to_string(), status);
             return {true, status};
         }
         LOGE("Failed to put to index table error {}", status);
diff --git a/src/lib/homestore_backend/replication_message.hpp b/src/lib/homestore_backend/replication_message.hpp
index 10bc83d..2acc511 100644
--- a/src/lib/homestore_backend/replication_message.hpp
+++ b/src/lib/homestore_backend/replication_message.hpp
@@ -67,6 +67,7 @@ struct ReplicationMessageHeader : public BaseMessageHeader
                        BlobManager::AsyncResult< blob_id_t > {
                            // TODO: do we need to update repl_dev metrics?
                            if (err) {
-                               LOGE("Failed to write shard info to blk_id: {}", blk_id.to_string());
+                               LOGE("Failed to write blob info to blk_id: {}", blk_id.to_string());
                                return folly::makeUnexpected(BlobError(BlobErrorCode::REPLICATION_ERROR));
                            }
-                           LOGD("Shard info written to blk_id:{}", blk_id.to_string());
+                           LOGD("Blob info written to blk_id:{}", blk_id.to_string());
                            return 0;
                        })
                        .get();
         if (ret.hasError()) {
-            LOGE("Failed to write shard info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string());
+            LOGE("Failed to write blob info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string());
             free_allocated_blks();
             return WRITE_DATA_ERR;
         }
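
For orientation, below is a self-contained C++ sketch approximating the idempotent blob-put flow this patch introduces. The names in it (FakeIndex, Pbas, Hints, put_once, make_hints) are hypothetical stand-ins, not HomeObject or HomeStore APIs; it only models the control flow suggested by the diff: _put_blob() stamps blob_id into the replication header, blob_put_get_blk_alloc_hints() reports the already-committed blocks when that blob_id is found in the index, and add_to_index_table() treats a re-insert with an identical pbas as success while a differing pbas stays a conflict.

// Illustrative sketch only: FakeIndex, Pbas, Hints, put_once() and make_hints()
// are invented stand-ins, not HomeObject/HomeStore types.
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <utility>

using blob_id_t = uint64_t;

struct Pbas { // stand-in for a committed block location
    uint64_t blk{0};
    bool operator==(Pbas const& o) const { return blk == o.blk; }
};

struct FakeIndex { // stand-in for the per-PG blob index table
    std::map< blob_id_t, Pbas > kv;

    // Mirrors the add_to_index_table() idea: returns {exist_already, ok}. A re-insert
    // with the same pbas is accepted; a different pbas is a real conflict.
    std::pair< bool, bool > put_once(blob_id_t id, Pbas pbas) {
        auto [it, inserted] = kv.try_emplace(id, pbas);
        if (inserted) { return {false, true}; }
        if (it->second == pbas) { return {true, true}; }
        return {true, false};
    }

    std::optional< Pbas > get(blob_id_t id) const {
        auto it = kv.find(id);
        if (it == kv.end()) { return std::nullopt; }
        return it->second;
    }
};

struct Hints { // stand-in for homestore::blk_alloc_hints
    std::optional< Pbas > committed_blk_id;
};

// Mirrors the blob_put_get_blk_alloc_hints() idea: blob_id == 0 means "not set".
Hints make_hints(FakeIndex const& index, blob_id_t header_blob_id) {
    Hints hints;
    if (header_blob_id != 0) {
        if (auto r = index.get(header_blob_id)) { hints.committed_blk_id = *r; }
    }
    return hints;
}

int main() {
    std::cout << std::boolalpha;
    FakeIndex index;
    blob_id_t const blob_id = 42;
    Pbas const pbas{1001};

    // First PUT: no committed blocks yet, normal allocation then commit.
    std::cout << "hint set: " << make_hints(index, blob_id).committed_blk_id.has_value() << '\n'; // false
    auto [existed, ok] = index.put_once(blob_id, pbas);
    std::cout << "first commit: existed=" << existed << " ok=" << ok << '\n';                     // false true

    // Replayed PUT (retry / log replay): the hint points at the committed blocks
    // and the duplicate commit is treated as success instead of an index error.
    std::cout << "hint set: " << make_hints(index, blob_id).committed_blk_id.has_value() << '\n'; // true
    auto [existed2, ok2] = index.put_once(blob_id, pbas);
    std::cout << "replay commit: existed=" << existed2 << " ok=" << ok2 << '\n';                  // true true
}

In the patched backend the analogous outcome appears to be hints.committed_blk_id being filled from get_blob_from_index_table() so a replayed PUT need not rewrite data (the new DATA_DUPLICATED mapping in toBlobError suggests that path), and add_to_index_table() returning {true, btree_status_t::success} when the existing pbas matches the new one.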