From 6c9e9a92dff3fd1e8bf26054150a016e7d06aab4 Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Wed, 24 Jan 2024 22:24:27 -0700 Subject: [PATCH 01/10] replicate createPG message across raft group --- src/include/homeobject/pg_manager.hpp | 3 +- src/lib/homestore_backend/hs_homeobject.hpp | 20 +++ src/lib/homestore_backend/hs_pg_manager.cpp | 170 +++++++++++++++--- .../homestore_backend/replication_message.hpp | 4 +- .../replication_state_machine.cpp | 4 + 5 files changed, 169 insertions(+), 32 deletions(-) diff --git a/src/include/homeobject/pg_manager.hpp b/src/include/homeobject/pg_manager.hpp index 808d9d55..8df7746c 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -9,7 +9,8 @@ namespace homeobject { -ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP); +ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP, PG_ALREADY_EXISTS, + CRC_MISMATCH); struct PGMember { explicit PGMember(peer_id_t _id) : id(_id) {} diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index d04fac5d..488defa8 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -219,7 +219,15 @@ class HSHomeObject : public HomeObjectImpl { private: static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); } + // create pg related + PGManager::NullAsyncResult replicate_create_pg_msg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info); + static std::string serialize_pg_info(const PGInfo& info); + static PGInfo deserialize_pg_info(const char* pg_info_str, size_t size); void add_pg_to_map(unique< HS_PG > hs_pg); + void do_create_pg_message_commit(int64_t lsn, ReplicationMessageHeader& header, homestore::MultiBlkId const& blkids, + sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx); + + // create shard related shard_id_t generate_new_shard_id(pg_id_t pg); uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t); @@ -260,6 +268,18 @@ class HSHomeObject : public HomeObjectImpl { */ void init_cp(); + /** + * @brief Callback function invoked when createPG message is committed on a shard. + * + * @param lsn The logical sequence number of the message. + * @param header The header of the message. + * @param blkids The IDs of the blocks associated with the message. + * @param repl_dev The replication device. + * @param hs_ctx The replication request context. + */ + void on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids, + homestore::ReplDev* repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx); + /** * @brief Callback function invoked when a message is committed on a shard. 
* diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 3279234b..ca8286b6 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -1,4 +1,5 @@ #include +#include #include #include "hs_homeobject.hpp" #include "replication_state_machine.hpp" @@ -34,6 +35,12 @@ PGError toPgError(ReplServiceError const& e) { return PGError::TIMEOUT; case ReplServiceError::SERVER_NOT_FOUND: return PGError::UNKNOWN_PG; + /* TODO: enable this after add erro type to homestore + case ReplServiceError::ALREADY_EXISTS: + return PGError::PG_ALREADY_EXISTS; + case ReplServiceError::CRC_MISMATCH: + return PGError::CRC_MISMATCH; + */ case ReplServiceError::OK: DEBUG_ASSERT(false, "Should not process OK!"); [[fallthrough]]; @@ -48,32 +55,140 @@ PGError toPgError(ReplServiceError const& e) { } PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) { + auto pg_id = pg_info.id; + { + auto lg = std::shared_lock(_pg_lock); + auto it = _pg_map.find(pg_id); + if (_pg_map.end() != it) return folly::makeUnexpected(PGError::PG_ALREADY_EXISTS); + } + pg_info.replica_set_uuid = boost::uuids::random_generator()(); return hs_repl_service() .create_repl_dev(pg_info.replica_set_uuid, peers) .via(executor_) - .thenValue([this, pg_info = std::move(pg_info)](auto&& v) mutable -> PGManager::NullResult { + .thenValue([this, pg_info = std::move(pg_info)](auto&& v) mutable -> PGManager::NullAsyncResult { if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); } + // we will write a PGHeader to the raft group and commit it when PGHeader is commited on raft + // group. so that all raft members will create the PGinfo and index table for this PG. + return replicate_create_pg_msg(v.value(), std::move(pg_info)); + }); +} + +PGManager::NullAsyncResult HSHomeObject::replicate_create_pg_msg(cshared< homestore::ReplDev > repl_dev, + PGInfo&& pg_info) { + auto serailized_pg_info = HSHomeObject::serialize_pg_info(pg_info); + auto msg_size = sisl::round_up(serailized_pg_info.size(), repl_dev->get_blk_size()); + auto req = repl_result_ctx< PGManager::NullResult >::make(msg_size, 512 /*alignment*/); + + auto buf_ptr = req->hdr_buf_.bytes(); + std::memset(buf_ptr, 0, msg_size); + std::memcpy(buf_ptr, serailized_pg_info.c_str(), serailized_pg_info.size()); + + req->header_.msg_type = ReplicationMessageType::CREATE_PG_MSG; + req->header_.pg_id = pg_info.id; + req->header_.shard_id = 0; + req->header_.payload_size = msg_size; + req->header_.payload_crc = crc32_ieee(init_crc32, buf_ptr, msg_size); + req->header_.seal(); + sisl::blob header; + header.set_bytes(r_cast< uint8_t* >(&req->header_)); + header.set_size(sizeof(req->header_)); + + sisl::sg_list value; + value.size = msg_size; + value.iovs.push_back(iovec(buf_ptr, msg_size)); + // replicate this create pg message to all raft members of this group + repl_dev->async_alloc_write(header, sisl::blob{}, value, req); + return req->result().deferValue([this](auto const& e) -> PGManager::NullAsyncResult { + if (!e) { return folly::makeUnexpected(e.error()); } + return folly::Unit(); + }); +} - // TODO create index table during create shard. 
- auto index_table = create_index_table(); - auto uuid_str = boost::uuids::to_string(index_table->uuid()); - - auto pg_id = pg_info.id; - auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(v.value()), index_table); - std::scoped_lock lock_guard(index_lock_); - RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found"); - index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table}; - - LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str); - hs_pg->index_table_ = index_table; - // Add to index service, so that it gets cleaned up when index service is shutdown. - homestore::hs()->index_service().add_index_table(index_table); - add_pg_to_map(std::move(hs_pg)); - return folly::Unit(); +void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, + homestore::MultiBlkId const& blkids, homestore::ReplDev* repl_dev, + cintrusive< homestore::repl_req_ctx >& hs_ctx) { + if (hs_ctx != nullptr) { + auto ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(hs_ctx).get(); + do_create_pg_message_commit(lsn, *r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())), + blkids, ctx->hdr_buf_, hs_ctx); + return; + } + + // when homeobject restarts and replay the committed log, hs_ctx will be nullptr + sisl::sg_list value; + value.size = blkids.blk_count() * repl_dev->get_blk_size(); + auto value_buf = iomanager.iobuf_alloc(512, value.size); + value.iovs.push_back(iovec{.iov_base = value_buf, .iov_len = value.size}); + // header will be released when this function returns, but we still need the header when async_read() finished. + auto header_ptr = r_cast< const ReplicationMessageHeader* >(header.cbytes()); + repl_dev->async_read(blkids, value, value.size) + .thenValue([this, lsn, msg_header = *header_ptr, blkids, value](auto&& err) mutable { + if (err) { + LOGW("failed to read data from homestore pba, lsn:{}", lsn); + } else { + sisl::blob value_blob(r_cast< uint8_t* >(value.iovs[0].iov_base), value.size); + do_create_pg_message_commit(lsn, msg_header, blkids, value_blob, nullptr); + } + iomanager.iobuf_free(r_cast< uint8_t* >(value.iovs[0].iov_base)); }); } +void HSHomeObject::do_create_pg_message_commit(int64_t lsn, ReplicationMessageHeader& header, + homestore::MultiBlkId const& blkids, sisl::blob value, + cintrusive< homestore::repl_req_ctx >& hs_ctx) { + repl_result_ctx< PGManager::NullResult >* ctx{nullptr}; + if (hs_ctx != nullptr) { + ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(hs_ctx).get(); + } + + if (header.corrupted()) { + LOGW("replication message header is corrupted with crc error, lsn:{}", lsn); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); } + return; + } + + if (crc32_ieee(init_crc32, value.cbytes(), value.size()) != header.payload_crc) { + // header & value is inconsistent; + LOGW("replication message header is inconsistent with value, lsn:{}", lsn); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); } + return; + } + + auto pg_info = deserialize_pg_info(r_cast< const char* >(value.cbytes()), value.size()); + auto pg_id = pg_info.id; + { + auto lg = std::shared_lock(_pg_lock); + auto it = _pg_map.find(pg_id); + if (_pg_map.end() != it) { + LOGW("PG already exists, lsn:{}", lsn); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::PG_ALREADY_EXISTS)); } + return; + } + } + + // create index table and pg + // TODO create index table during create 
shard. + auto index_table = create_index_table(); + auto uuid_str = boost::uuids::to_string(index_table->uuid()); + + auto repl_dev = hs_repl_service().get_repl_dev(pg_info.replica_set_uuid); + RELEASE_ASSERT(repl_dev.hasValue(), "Failed to get ReplDev for group_id={}", + boost::uuids::to_string(pg_info.replica_set_uuid)); + auto repl_dev_ptr = repl_dev.value(); + auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev_ptr), index_table); + std::scoped_lock lock_guard(index_lock_); + RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found"); + index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table}; + + LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str); + hs_pg->index_table_ = index_table; + // Add to index service, so that it gets cleaned up when index service is shutdown. + homestore::hs()->index_service().add_index_table(index_table); + add_pg_to_map(std::move(hs_pg)); + if (ctx) ctx->promise_.setValue(folly::Unit()); +} + PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member) { return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP)); @@ -89,16 +204,15 @@ void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) { RELEASE_ASSERT(_pg_map.end() != it1, "Unknown map insert error!"); } -#if 0 -std::string HSHomeObject::serialize_pg_info(PGInfo const& pginfo) { +std::string HSHomeObject::serialize_pg_info(const PGInfo& pginfo) { nlohmann::json j; j["pg_info"]["pg_id_t"] = pginfo.id; j["pg_info"]["repl_uuid"] = boost::uuids::to_string(pginfo.replica_set_uuid); - nlohmann::json members_j = {}; + nlohmann::json members_j{}; for (auto const& member : pginfo.members) { nlohmann::json member_j; - member_j["member_id"] = member.id; + member_j["member_id"] = boost::uuids::to_string(member.id); member_j["name"] = member.name; member_j["priority"] = member.priority; members_j.push_back(member_j); @@ -107,23 +221,21 @@ std::string HSHomeObject::serialize_pg_info(PGInfo const& pginfo) { return j.dump(); } -PGInfo HSHomeObject::deserialize_pg_info(std::string const& json_str) { - auto pg_json = nlohmann::json::parse(json_str); +PGInfo HSHomeObject::deserialize_pg_info(const char* json_str, size_t size) { + auto pg_json = nlohmann::json::parse(json_str, json_str + size); - PGInfo pg_info; - pg_info.id = pg_json["pg_info"]["pg_id_t"].get< pg_id_t >(); + PGInfo pg_info(pg_json["pg_info"]["pg_id_t"].get< pg_id_t >()); pg_info.replica_set_uuid = boost::uuids::string_generator()(pg_json["pg_info"]["repl_uuid"].get< std::string >()); - for (auto const& m : pg_info["pg_info"]["members"]) { - PGMember member; - member.id = m["member_id"].get< pg_id_t >(); + for (auto const& m : pg_json["pg_info"]["members"]) { + auto uuid_str = m["member_id"].get< std::string >(); + PGMember member(boost::uuids::string_generator()(uuid_str)); member.name = m["name"].get< std::string >(); member.priority = m["priority"].get< int32_t >(); pg_info.members.emplace(std::move(member)); } return pg_info; } -#endif void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) { homestore::superblk< pg_info_superblk > pg_sb(_pg_meta_name); diff --git a/src/lib/homestore_backend/replication_message.hpp b/src/lib/homestore_backend/replication_message.hpp index 3fed4d06..c36e81f0 100644 --- a/src/lib/homestore_backend/replication_message.hpp +++ b/src/lib/homestore_backend/replication_message.hpp @@ -7,8 +7,8 @@ namespace 
homeobject { -VENUM(ReplicationMessageType, uint16_t, CREATE_SHARD_MSG = 0, SEAL_SHARD_MSG = 1, PUT_BLOB_MSG = 2, DEL_BLOB_MSG = 3, - UNKNOWN_MSG = 4); +VENUM(ReplicationMessageType, uint16_t, CREATE_PG_MSG = 0, CREATE_SHARD_MSG = 1, SEAL_SHARD_MSG = 2, PUT_BLOB_MSG = 3, + DEL_BLOB_MSG = 4, UNKNOWN_MSG = 5); // magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum' static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34; diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index af671f10..6dfd7448 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -7,6 +7,10 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c LOGI("applying raft log commit with lsn:{}", lsn); const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); switch (msg_header->msg_type) { + case ReplicationMessageType::CREATE_PG_MSG: { + home_object_->on_create_pg_message_commit(lsn, header, pbas, repl_dev(), ctx); + break; + } case ReplicationMessageType::CREATE_SHARD_MSG: case ReplicationMessageType::SEAL_SHARD_MSG: { home_object_->on_shard_message_commit(lsn, header, pbas, repl_dev(), ctx); From 48ac71d485b191e82cc14be039e28698d319bc0e Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Thu, 25 Jan 2024 04:35:25 -0700 Subject: [PATCH 02/10] update --- src/lib/homestore_backend/hs_homeobject.hpp | 2 +- src/lib/homestore_backend/hs_pg_manager.cpp | 16 +++++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 488defa8..63b42bba 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -220,7 +220,7 @@ class HSHomeObject : public HomeObjectImpl { static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); } // create pg related - PGManager::NullAsyncResult replicate_create_pg_msg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info); + PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info); static std::string serialize_pg_info(const PGInfo& info); static PGInfo deserialize_pg_info(const char* pg_info_str, size_t size); void add_pg_to_map(unique< HS_PG > hs_pg); diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index ca8286b6..e294dc54 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -70,13 +70,12 @@ PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); } // we will write a PGHeader to the raft group and commit it when PGHeader is commited on raft // group. so that all raft members will create the PGinfo and index table for this PG. 
- return replicate_create_pg_msg(v.value(), std::move(pg_info)); + return do_create_pg(v.value(), std::move(pg_info)); }); } -PGManager::NullAsyncResult HSHomeObject::replicate_create_pg_msg(cshared< homestore::ReplDev > repl_dev, - PGInfo&& pg_info) { - auto serailized_pg_info = HSHomeObject::serialize_pg_info(pg_info); +PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info) { + auto serailized_pg_info = serialize_pg_info(pg_info); auto msg_size = sisl::round_up(serailized_pg_info.size(), repl_dev->get_blk_size()); auto req = repl_result_ctx< PGManager::NullResult >::make(msg_size, 512 /*alignment*/); @@ -109,7 +108,7 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he homestore::MultiBlkId const& blkids, homestore::ReplDev* repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx) { if (hs_ctx != nullptr) { - auto ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(hs_ctx).get(); + auto ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(hs_ctx); do_create_pg_message_commit(lsn, *r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())), blkids, ctx->hdr_buf_, hs_ctx); return; @@ -120,7 +119,6 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he value.size = blkids.blk_count() * repl_dev->get_blk_size(); auto value_buf = iomanager.iobuf_alloc(512, value.size); value.iovs.push_back(iovec{.iov_base = value_buf, .iov_len = value.size}); - // header will be released when this function returns, but we still need the header when async_read() finished. auto header_ptr = r_cast< const ReplicationMessageHeader* >(header.cbytes()); repl_dev->async_read(blkids, value, value.size) .thenValue([this, lsn, msg_header = *header_ptr, blkids, value](auto&& err) mutable { @@ -143,14 +141,14 @@ void HSHomeObject::do_create_pg_message_commit(int64_t lsn, ReplicationMessageHe } if (header.corrupted()) { - LOGW("replication message header is corrupted with crc error, lsn:{}", lsn); + LOGE("create PG message header is corrupted , lsn:{}", lsn); if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); } return; } if (crc32_ieee(init_crc32, value.cbytes(), value.size()) != header.payload_crc) { // header & value is inconsistent; - LOGW("replication message header is inconsistent with value, lsn:{}", lsn); + LOGE("create PG message header is inconsistent with value, lsn:{}", lsn); if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); } return; } @@ -161,7 +159,7 @@ void HSHomeObject::do_create_pg_message_commit(int64_t lsn, ReplicationMessageHe auto lg = std::shared_lock(_pg_lock); auto it = _pg_map.find(pg_id); if (_pg_map.end() != it) { - LOGW("PG already exists, lsn:{}", lsn); + LOGW("PG already exists, lsn:{}, pg_id {}", lsn, pg_id); if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::PG_ALREADY_EXISTS)); } return; } From ec557fa61be2ef2484bca7ed7fa55dec9d57ca04 Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Fri, 26 Jan 2024 08:29:09 -0700 Subject: [PATCH 03/10] fix comments --- src/include/homeobject/pg_manager.hpp | 3 +-- src/lib/homestore_backend/hs_pg_manager.cpp | 25 +++++++-------------- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/src/include/homeobject/pg_manager.hpp b/src/include/homeobject/pg_manager.hpp index 8df7746c..35fa39b5 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -9,8 +9,7 @@ 
namespace homeobject { -ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP, PG_ALREADY_EXISTS, - CRC_MISMATCH); +ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP, CRC_MISMATCH); struct PGMember { explicit PGMember(peer_id_t _id) : id(_id) {} diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index e294dc54..b219812e 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -36,8 +36,6 @@ PGError toPgError(ReplServiceError const& e) { case ReplServiceError::SERVER_NOT_FOUND: return PGError::UNKNOWN_PG; /* TODO: enable this after add erro type to homestore - case ReplServiceError::ALREADY_EXISTS: - return PGError::PG_ALREADY_EXISTS; case ReplServiceError::CRC_MISMATCH: return PGError::CRC_MISMATCH; */ @@ -56,11 +54,7 @@ PGError toPgError(ReplServiceError const& e) { PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) { auto pg_id = pg_info.id; - { - auto lg = std::shared_lock(_pg_lock); - auto it = _pg_map.find(pg_id); - if (_pg_map.end() != it) return folly::makeUnexpected(PGError::PG_ALREADY_EXISTS); - } + if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) return folly::Unit(); pg_info.replica_set_uuid = boost::uuids::random_generator()(); return hs_repl_service() @@ -68,8 +62,8 @@ PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< .via(executor_) .thenValue([this, pg_info = std::move(pg_info)](auto&& v) mutable -> PGManager::NullAsyncResult { if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); } - // we will write a PGHeader to the raft group and commit it when PGHeader is commited on raft - // group. so that all raft members will create the PGinfo and index table for this PG. + // we will write a PGHeader across the raft group and when it is committed + // all raft members will create PGinfo and index table for this PG. 
return do_create_pg(v.value(), std::move(pg_info)); }); } @@ -155,14 +149,11 @@ void HSHomeObject::do_create_pg_message_commit(int64_t lsn, ReplicationMessageHe auto pg_info = deserialize_pg_info(r_cast< const char* >(value.cbytes()), value.size()); auto pg_id = pg_info.id; - { - auto lg = std::shared_lock(_pg_lock); - auto it = _pg_map.find(pg_id); - if (_pg_map.end() != it) { - LOGW("PG already exists, lsn:{}, pg_id {}", lsn, pg_id); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::PG_ALREADY_EXISTS)); } - return; - } + + if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) { + LOGW("PG already exists, lsn:{}, pg_id {}", lsn, pg_id); + if (ctx) { ctx->promise_.setValue(folly::Unit()); } + return; } // create index table and pg From ed7da9765f430c22d1445b11cd661e6092cd0f0d Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Sat, 27 Jan 2024 06:12:27 -0700 Subject: [PATCH 04/10] fix error --- src/lib/homestore_backend/hs_pg_manager.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index b219812e..2191230e 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -91,7 +91,7 @@ PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDe value.size = msg_size; value.iovs.push_back(iovec(buf_ptr, msg_size)); // replicate this create pg message to all raft members of this group - repl_dev->async_alloc_write(header, sisl::blob{}, value, req); + repl_dev->async_alloc_write(header, sisl::blob{buf_ptr, (uint32_t)msg_size}, value, req); return req->result().deferValue([this](auto const& e) -> PGManager::NullAsyncResult { if (!e) { return folly::makeUnexpected(e.error()); } return folly::Unit(); @@ -102,9 +102,8 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he homestore::MultiBlkId const& blkids, homestore::ReplDev* repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx) { if (hs_ctx != nullptr) { - auto ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(hs_ctx); do_create_pg_message_commit(lsn, *r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())), - blkids, ctx->hdr_buf_, hs_ctx); + blkids, hs_ctx->key, hs_ctx); return; } @@ -130,7 +129,11 @@ void HSHomeObject::do_create_pg_message_commit(int64_t lsn, ReplicationMessageHe homestore::MultiBlkId const& blkids, sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< PGManager::NullResult >* ctx{nullptr}; - if (hs_ctx != nullptr) { + // if this is a follower, the promise_ of hs_ctx will be nullptr, so need to setValue. + // but we can not judge whether this is leader from hs_ctx directly at this time. + // since repl_req_ctx::value is only applicable for leader, so we take it as a criteria to determine + // it is a leader or not. 
+ if (hs_ctx && hs_ctx->value.size > 0) { ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(hs_ctx).get(); } @@ -149,7 +152,6 @@ void HSHomeObject::do_create_pg_message_commit(int64_t lsn, ReplicationMessageHe auto pg_info = deserialize_pg_info(r_cast< const char* >(value.cbytes()), value.size()); auto pg_id = pg_info.id; - if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) { LOGW("PG already exists, lsn:{}, pg_id {}", lsn, pg_id); if (ctx) { ctx->promise_.setValue(folly::Unit()); } From 876adf42ed0c09d7826a27b2d1c0c2c59f6514c4 Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Sat, 27 Jan 2024 08:25:04 -0700 Subject: [PATCH 05/10] fix shard message error --- src/lib/homestore_backend/hs_shard_manager.cpp | 13 ++++++++----- src/lib/homestore_backend/tests/hs_shard_tests.cpp | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index c1d3c4fd..79940239 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -95,7 +95,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow value.size = msg_size; value.iovs.push_back(iovec(buf_ptr, msg_size)); // replicate this create shard message to PG members; - repl_dev->async_alloc_write(header, sisl::blob{}, value, req); + repl_dev->async_alloc_write(header, sisl::blob{buf_ptr, (uint32_t)msg_size}, value, req); return req->result().deferValue([this](auto const& e) -> ShardManager::Result< ShardInfo > { if (!e) return folly::makeUnexpected(e.error()); return e.value(); @@ -138,7 +138,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const value.iovs.push_back(iovec(buf_ptr, msg_size)); // replicate this seal shard message to PG members; - repl_dev->async_alloc_write(header, sisl::blob{}, value, req); + repl_dev->async_alloc_write(header, sisl::blob{buf_ptr, (uint32_t)msg_size}, value, req); return req->result(); } @@ -147,9 +147,8 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header cintrusive< homestore::repl_req_ctx >& hs_ctx) { if (hs_ctx != nullptr) { - auto ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx); do_shard_message_commit(lsn, *r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())), - blkids, ctx->hdr_buf_, hs_ctx); + blkids, hs_ctx->key, hs_ctx); return; } @@ -180,7 +179,11 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader homestore::MultiBlkId const& blkids, sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr}; - if (hs_ctx != nullptr) { + // if this is a follower, the promise_ of hs_ctx will be nullptr, so need to setValue. + // but we can not judge whether this is leader from hs_ctx directly at this time. + // since repl_req_ctx::value is only applicable for leader, so we take it as a criteria to determine + // it is a leader or not. 
+ if (hs_ctx && hs_ctx->value.size > 0) { ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get(); } diff --git a/src/lib/homestore_backend/tests/hs_shard_tests.cpp b/src/lib/homestore_backend/tests/hs_shard_tests.cpp index caece480..c8a9a6c8 100644 --- a/src/lib/homestore_backend/tests/hs_shard_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_shard_tests.cpp @@ -108,7 +108,7 @@ TEST_F(TestFixture, MockSealShard) { sisl::sg_list value; value.size = msg_size; value.iovs.push_back(iovec(buf_ptr, msg_size)); - repl_dev->async_alloc_write(header, sisl::blob{}, value, req); + repl_dev->async_alloc_write(header, sisl::blob{buf_ptr, (uint32_t)msg_size}, value, req); auto info = req->result().get(); EXPECT_TRUE(info); EXPECT_TRUE(info.value().id == shard_info.id); From 66c0d0f0deb3be421bfbf6501b4389875c04aa44 Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Sun, 28 Jan 2024 22:43:10 -0700 Subject: [PATCH 06/10] fix shard and blol error at follower side --- src/lib/homestore_backend/hs_blob_manager.cpp | 10 ++++------ src/lib/homestore_backend/hs_pg_manager.cpp | 6 +----- src/lib/homestore_backend/hs_shard_manager.cpp | 6 +----- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index 0b37f32f..9c8d98b7 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -145,7 +145,7 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr}; - if (hs_ctx != nullptr) { + if (hs_ctx && hs_ctx->is_proposer) { ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get(); } @@ -290,7 +290,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, homestore::blk_alloc_hints HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr}; - if (hs_ctx != nullptr) { + if (hs_ctx && hs_ctx->is_proposer) { ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get(); } @@ -351,7 +351,7 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr}; - if (hs_ctx != nullptr) { + if (hs_ctx && hs_ctx->is_proposer) { ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get(); } @@ -392,9 +392,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis } auto& multiBlks = r.value(); - if (multiBlks != tombstone_pbas) { - repl_dev->async_free_blks(lsn, multiBlks); - } + if (multiBlks != tombstone_pbas) { repl_dev->async_free_blks(lsn, multiBlks); } if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); } } diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 2191230e..b428b0b0 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -129,11 +129,7 @@ void 
HSHomeObject::do_create_pg_message_commit(int64_t lsn, ReplicationMessageHe homestore::MultiBlkId const& blkids, sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< PGManager::NullResult >* ctx{nullptr}; - // if this is a follower, the promise_ of hs_ctx will be nullptr, so need to setValue. - // but we can not judge whether this is leader from hs_ctx directly at this time. - // since repl_req_ctx::value is only applicable for leader, so we take it as a criteria to determine - // it is a leader or not. - if (hs_ctx && hs_ctx->value.size > 0) { + if (hs_ctx && hs_ctx->is_proposer) { ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(hs_ctx).get(); } diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 79940239..4c46ba63 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -179,11 +179,7 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader homestore::MultiBlkId const& blkids, sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr}; - // if this is a follower, the promise_ of hs_ctx will be nullptr, so need to setValue. - // but we can not judge whether this is leader from hs_ctx directly at this time. - // since repl_req_ctx::value is only applicable for leader, so we take it as a criteria to determine - // it is a leader or not. - if (hs_ctx && hs_ctx->value.size > 0) { + if (hs_ctx && hs_ctx->is_proposer) { ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get(); } From 59f6b8f0feb44872972250424e4d3fe70cb0474f Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Mon, 29 Jan 2024 19:36:47 -0700 Subject: [PATCH 07/10] add FIX_ME --- src/lib/homestore_backend/hs_pg_manager.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index b428b0b0..41dd3abd 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -64,6 +64,8 @@ PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); } // we will write a PGHeader across the raft group and when it is committed // all raft members will create PGinfo and index table for this PG. 
+ + // FIXME:https://github.com/eBay/HomeObject/pull/136#discussion_r1470504271 return do_create_pg(v.value(), std::move(pg_info)); }); } From 4f532234d77bcf40071622f0dae821eb7e25dc30 Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Tue, 30 Jan 2024 06:58:51 -0700 Subject: [PATCH 08/10] put pginfo in header --- src/lib/homestore_backend/hs_homeobject.hpp | 11 +-- src/lib/homestore_backend/hs_pg_manager.cpp | 90 +++++++------------ .../homestore_backend/hs_shard_manager.cpp | 2 +- .../homestore_backend/replication_message.hpp | 4 +- .../replication_state_machine.cpp | 2 +- 5 files changed, 39 insertions(+), 70 deletions(-) diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 63b42bba..55ce60ee 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -222,10 +222,8 @@ class HSHomeObject : public HomeObjectImpl { // create pg related PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info); static std::string serialize_pg_info(const PGInfo& info); - static PGInfo deserialize_pg_info(const char* pg_info_str, size_t size); + static PGInfo deserialize_pg_info(const unsigned char* pg_info_str, size_t size); void add_pg_to_map(unique< HS_PG > hs_pg); - void do_create_pg_message_commit(int64_t lsn, ReplicationMessageHeader& header, homestore::MultiBlkId const& blkids, - sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx); // create shard related shard_id_t generate_new_shard_id(pg_id_t pg); @@ -273,12 +271,11 @@ class HSHomeObject : public HomeObjectImpl { * * @param lsn The logical sequence number of the message. * @param header The header of the message. - * @param blkids The IDs of the blocks associated with the message. * @param repl_dev The replication device. * @param hs_ctx The replication request context. */ - void on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids, - homestore::ReplDev* repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx); + void on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, shared< homestore::ReplDev > repl_dev, + cintrusive< homestore::repl_req_ctx >& hs_ctx); /** * @brief Callback function invoked when a message is committed on a shard. @@ -290,7 +287,7 @@ class HSHomeObject : public HomeObjectImpl { * @param hs_ctx The replication request context. */ void on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids, - homestore::ReplDev* repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx); + shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx); /** * @brief Retrieves the chunk number associated with the given shard ID. 
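[Editor's note on the "put pginfo in header" change in this patch: the createPG flow stops carrying the serialized PGInfo as a separate data payload and instead embeds it directly behind the fixed-size ReplicationMessageHeader in the header blob, guarded by a payload CRC that the commit path verifies before deserializing. The standalone C++ sketch below illustrates that pack-and-verify pattern under stated assumptions: PackedHeader, the toy crc32() routine (standing in for sisl's crc32_ieee), pack(), and unpack() are illustrative stand-ins, not the actual HomeObject or HomeStore types or APIs.]

#include <cstdint>
#include <cstring>
#include <string>
#include <vector>
#include <optional>
#include <iostream>

// Illustrative stand-in for ReplicationMessageHeader: a fixed-size header
// followed immediately by the serialized payload in one contiguous buffer.
struct PackedHeader {
    uint32_t msg_type{0};      // e.g. CREATE_PG_MSG
    uint32_t payload_size{0};  // bytes of serialized PGInfo that follow
    uint32_t payload_crc{0};   // CRC over those payload bytes
};

// Toy reflected CRC-32 (polynomial 0xEDB88320), standing in for crc32_ieee().
static uint32_t crc32(const uint8_t* data, size_t len) {
    uint32_t crc = 0xFFFFFFFFu;
    for (size_t i = 0; i < len; ++i) {
        crc ^= data[i];
        for (int b = 0; b < 8; ++b)
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
    }
    return crc ^ 0xFFFFFFFFu;
}

// Proposer side: lay out [PackedHeader | serialized info] in one buffer.
static std::vector<uint8_t> pack(uint32_t msg_type, const std::string& serialized_info) {
    std::vector<uint8_t> buf(sizeof(PackedHeader) + serialized_info.size());
    PackedHeader hdr;
    hdr.msg_type = msg_type;
    hdr.payload_size = static_cast<uint32_t>(serialized_info.size());
    hdr.payload_crc = crc32(reinterpret_cast<const uint8_t*>(serialized_info.data()),
                            serialized_info.size());
    std::memcpy(buf.data(), &hdr, sizeof(hdr));
    std::memcpy(buf.data() + sizeof(hdr), serialized_info.data(), serialized_info.size());
    return buf;
}

// Commit side: verify the CRC before trusting the payload; on mismatch the
// caller would surface something like PGError::CRC_MISMATCH instead of deserializing.
static std::optional<std::string> unpack(const std::vector<uint8_t>& buf) {
    if (buf.size() < sizeof(PackedHeader)) return std::nullopt;
    PackedHeader hdr;
    std::memcpy(&hdr, buf.data(), sizeof(hdr));
    if (buf.size() - sizeof(hdr) < hdr.payload_size) return std::nullopt;
    const uint8_t* payload = buf.data() + sizeof(hdr);
    if (crc32(payload, hdr.payload_size) != hdr.payload_crc) return std::nullopt;
    return std::string(reinterpret_cast<const char*>(payload), hdr.payload_size);
}

int main() {
    auto wire = pack(/*CREATE_PG_MSG*/ 0, R"({"pg_info":{"pg_id_t":1}})");
    auto info = unpack(wire);
    std::cout << (info ? *info : std::string("corrupted payload")) << "\n";
    return 0;
}

[Design note: keeping the serialized PGInfo in the header blob is what lets this patch drop the follower-side async_read of data blocks during log replay, at the cost of a larger header per createPG message.]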
diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 41dd3abd..84a816ec 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -7,6 +7,7 @@ using namespace homestore; namespace homeobject { +static constexpr uint64_t io_align{512}; PGError toPgError(ReplServiceError const& e) { switch (e) { @@ -72,83 +73,58 @@ PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info) { auto serailized_pg_info = serialize_pg_info(pg_info); - auto msg_size = sisl::round_up(serailized_pg_info.size(), repl_dev->get_blk_size()); - auto req = repl_result_ctx< PGManager::NullResult >::make(msg_size, 512 /*alignment*/); - - auto buf_ptr = req->hdr_buf_.bytes(); - std::memset(buf_ptr, 0, msg_size); - std::memcpy(buf_ptr, serailized_pg_info.c_str(), serailized_pg_info.size()); - - req->header_.msg_type = ReplicationMessageType::CREATE_PG_MSG; - req->header_.pg_id = pg_info.id; - req->header_.shard_id = 0; - req->header_.payload_size = msg_size; - req->header_.payload_crc = crc32_ieee(init_crc32, buf_ptr, msg_size); - req->header_.seal(); - sisl::blob header; - header.set_bytes(r_cast< uint8_t* >(&req->header_)); - header.set_size(sizeof(req->header_)); - - sisl::sg_list value; - value.size = msg_size; - value.iovs.push_back(iovec(buf_ptr, msg_size)); + auto info_size = serailized_pg_info.size(); + + auto total_size = sizeof(ReplicationMessageHeader) + info_size; + auto header = sisl::blob(iomanager.iobuf_alloc(total_size, io_align), total_size); + ReplicationMessageHeader repl_msg_header; + repl_msg_header.msg_type = ReplicationMessageType::CREATE_PG_MSG; + repl_msg_header.payload_size = info_size; + repl_msg_header.payload_crc = + crc32_ieee(init_crc32, r_cast< const unsigned char* >(serailized_pg_info.c_str()), info_size); + repl_msg_header.seal(); + std::memcpy(header.bytes(), &repl_msg_header, sizeof(ReplicationMessageHeader)); + std::memcpy(header.bytes() + sizeof(ReplicationMessageHeader), serailized_pg_info.c_str(), info_size); + + // we do not need any hdr_buf_ , since we put everything in header blob + auto req = repl_result_ctx< PGManager::NullResult >::make(0, io_align); + // replicate this create pg message to all raft members of this group - repl_dev->async_alloc_write(header, sisl::blob{buf_ptr, (uint32_t)msg_size}, value, req); - return req->result().deferValue([this](auto const& e) -> PGManager::NullAsyncResult { + repl_dev->async_alloc_write(header, sisl::blob{}, sisl::sg_list{}, req); + return req->result().deferValue([header = std::move(header)](auto const& e) -> PGManager::NullAsyncResult { + iomanager.iobuf_free(const_cast< uint8_t* >(header.cbytes())); if (!e) { return folly::makeUnexpected(e.error()); } return folly::Unit(); }); } void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, - homestore::MultiBlkId const& blkids, homestore::ReplDev* repl_dev, - cintrusive< homestore::repl_req_ctx >& hs_ctx) { - if (hs_ctx != nullptr) { - do_create_pg_message_commit(lsn, *r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())), - blkids, hs_ctx->key, hs_ctx); - return; - } - - // when homeobject restarts and replay the committed log, hs_ctx will be nullptr - sisl::sg_list value; - value.size = blkids.blk_count() * repl_dev->get_blk_size(); - auto value_buf = iomanager.iobuf_alloc(512, value.size); - 
value.iovs.push_back(iovec{.iov_base = value_buf, .iov_len = value.size}); - auto header_ptr = r_cast< const ReplicationMessageHeader* >(header.cbytes()); - repl_dev->async_read(blkids, value, value.size) - .thenValue([this, lsn, msg_header = *header_ptr, blkids, value](auto&& err) mutable { - if (err) { - LOGW("failed to read data from homestore pba, lsn:{}", lsn); - } else { - sisl::blob value_blob(r_cast< uint8_t* >(value.iovs[0].iov_base), value.size); - do_create_pg_message_commit(lsn, msg_header, blkids, value_blob, nullptr); - } - iomanager.iobuf_free(r_cast< uint8_t* >(value.iovs[0].iov_base)); - }); -} - -void HSHomeObject::do_create_pg_message_commit(int64_t lsn, ReplicationMessageHeader& header, - homestore::MultiBlkId const& blkids, sisl::blob value, + shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< PGManager::NullResult >* ctx{nullptr}; if (hs_ctx && hs_ctx->is_proposer) { ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(hs_ctx).get(); } - if (header.corrupted()) { + ReplicationMessageHeader* msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())); + + if (msg_header->corrupted()) { LOGE("create PG message header is corrupted , lsn:{}", lsn); if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); } return; } - if (crc32_ieee(init_crc32, value.cbytes(), value.size()) != header.payload_crc) { + auto serailized_pg_info_buf = header.cbytes() + sizeof(ReplicationMessageHeader); + const auto serailized_pg_info_size = header.size() - sizeof(ReplicationMessageHeader); + + if (crc32_ieee(init_crc32, serailized_pg_info_buf, serailized_pg_info_size) != msg_header->payload_crc) { // header & value is inconsistent; LOGE("create PG message header is inconsistent with value, lsn:{}", lsn); if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); } return; } - auto pg_info = deserialize_pg_info(r_cast< const char* >(value.cbytes()), value.size()); + auto pg_info = deserialize_pg_info(serailized_pg_info_buf, serailized_pg_info_size); auto pg_id = pg_info.id; if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) { LOGW("PG already exists, lsn:{}, pg_id {}", lsn, pg_id); @@ -161,11 +137,7 @@ void HSHomeObject::do_create_pg_message_commit(int64_t lsn, ReplicationMessageHe auto index_table = create_index_table(); auto uuid_str = boost::uuids::to_string(index_table->uuid()); - auto repl_dev = hs_repl_service().get_repl_dev(pg_info.replica_set_uuid); - RELEASE_ASSERT(repl_dev.hasValue(), "Failed to get ReplDev for group_id={}", - boost::uuids::to_string(pg_info.replica_set_uuid)); - auto repl_dev_ptr = repl_dev.value(); - auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev_ptr), index_table); + auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev), index_table); std::scoped_lock lock_guard(index_lock_); RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found"); index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table}; @@ -210,7 +182,7 @@ std::string HSHomeObject::serialize_pg_info(const PGInfo& pginfo) { return j.dump(); } -PGInfo HSHomeObject::deserialize_pg_info(const char* json_str, size_t size) { +PGInfo HSHomeObject::deserialize_pg_info(const unsigned char* json_str, size_t size) { auto pg_json = nlohmann::json::parse(json_str, json_str + size); PGInfo pg_info(pg_json["pg_info"]["pg_id_t"].get< pg_id_t >()); diff 
--git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 4c46ba63..65772928 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -143,7 +143,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const } void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids, - homestore::ReplDev* repl_dev, + shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx) { if (hs_ctx != nullptr) { diff --git a/src/lib/homestore_backend/replication_message.hpp b/src/lib/homestore_backend/replication_message.hpp index c36e81f0..2a2cc701 100644 --- a/src/lib/homestore_backend/replication_message.hpp +++ b/src/lib/homestore_backend/replication_message.hpp @@ -20,8 +20,8 @@ struct ReplicationMessageHeader { uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC}; uint32_t protocol_version{HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1}; ReplicationMessageType msg_type; // message type - pg_id_t pg_id; - shard_id_t shard_id; + pg_id_t pg_id{0}; + shard_id_t shard_id{0}; uint32_t payload_size; uint32_t payload_crc; uint8_t reserved_pad[4]{}; diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 6dfd7448..7af6d932 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -8,7 +8,7 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); switch (msg_header->msg_type) { case ReplicationMessageType::CREATE_PG_MSG: { - home_object_->on_create_pg_message_commit(lsn, header, pbas, repl_dev(), ctx); + home_object_->on_create_pg_message_commit(lsn, header, repl_dev(), ctx); break; } case ReplicationMessageType::CREATE_SHARD_MSG: From 4c75d29aaacc3f4464c98dc6d685dd511c6749d2 Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Fri, 2 Feb 2024 00:50:11 -0700 Subject: [PATCH 09/10] using latest homestore --- conanfile.py | 2 +- src/include/homeobject/pg_manager.hpp | 3 ++- src/lib/homestore_backend/hs_homeobject.cpp | 5 +++-- src/lib/homestore_backend/hs_pg_manager.cpp | 4 ++++ .../replication_state_machine.cpp | 6 ++++++ .../replication_state_machine.hpp | 15 +++++++++++++++ 6 files changed, 31 insertions(+), 4 deletions(-) diff --git a/conanfile.py b/conanfile.py index 51938371..8d9f3968 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "1.0.4" + version = "1.0.5" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" topics = ("ebay") diff --git a/src/include/homeobject/pg_manager.hpp b/src/include/homeobject/pg_manager.hpp index 35fa39b5..400f7ee7 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -9,7 +9,8 @@ namespace homeobject { -ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP, CRC_MISMATCH); +ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP, CRC_MISMATCH, + NO_SPACE_LEFT, DRIVE_WRITE_ERROR); struct PGMember { explicit PGMember(peer_id_t _id) : id(_id) {} diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp index 
24162700..3842148b 100644 --- a/src/lib/homestore_backend/hs_homeobject.cpp +++ b/src/lib/homestore_backend/hs_homeobject.cpp @@ -126,8 +126,9 @@ void HSHomeObject::init_homestore() { HomeStore::instance()->format_and_start({ {HS_SERVICE::META, hs_format_params{.size_pct = 5.0}}, - {HS_SERVICE::LOG_REPLICATED, hs_format_params{.size_pct = 10.0}}, - {HS_SERVICE::LOG_LOCAL, hs_format_params{.size_pct = 0.1}}, // TODO: Remove this after HS disables LOG_LOCAL + {HS_SERVICE::LOG_REPLICATED, hs_format_params{.size_pct = 10.0, .chunk_size = 32 * Mi}}, + {HS_SERVICE::LOG_LOCAL, + hs_format_params{.size_pct = 0.1, .chunk_size = 32 * Mi}}, // TODO: Remove this after HS disables LOG_LOCAL {HS_SERVICE::REPLICATION, hs_format_params{.size_pct = 79.0, .num_chunks = 65000, diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 84a816ec..e47a193f 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -36,6 +36,10 @@ PGError toPgError(ReplServiceError const& e) { return PGError::TIMEOUT; case ReplServiceError::SERVER_NOT_FOUND: return PGError::UNKNOWN_PG; + case ReplServiceError::NO_SPACE_LEFT: + return PGError::NO_SPACE_LEFT; + case ReplServiceError::DRIVE_WRITE_ERROR: + return PGError::DRIVE_WRITE_ERROR; /* TODO: enable this after add erro type to homestore case ReplServiceError::CRC_MISMATCH: return PGError::CRC_MISMATCH; diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 7af6d932..8331eb6a 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -44,6 +44,12 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const&, sisl:: LOGI("on_rollback with lsn:{}", lsn); } +void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) { + LOGE("on_error with :{}, lsn {}", error, ctx->lsn); + // TODO:: block is already freeed at homestore side, handle error if necessay. +} + homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) { const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); switch (msg_header->msg_type) { diff --git a/src/lib/homestore_backend/replication_state_machine.hpp b/src/lib/homestore_backend/replication_state_machine.hpp index 456270ba..65bcd29a 100644 --- a/src/lib/homestore_backend/replication_state_machine.hpp +++ b/src/lib/homestore_backend/replication_state_machine.hpp @@ -8,6 +8,8 @@ namespace homeobject { class HSHomeObject; +using homestore::repl_req_ctx; +using homestore::ReplServiceError; struct ho_repl_ctx : public homestore::repl_req_ctx { ReplicationMessageHeader header_; @@ -92,6 +94,19 @@ class ReplicationStateMachine : public homestore::ReplDevListener { void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, cintrusive< homestore::repl_req_ctx >& ctx) override; + /// @brief Called when the async_alloc_write call failed to initiate replication + /// + /// Called only on the node which called async_alloc_write + /// + /// + /// NOTE: Listener should do the free any resources created as part of pre-commit. 
+ /// + /// @param header - Header originally passed with ReplDev::async_alloc_write() api + /// @param key - Key originally passed with ReplDev::async_alloc_write() api + /// @param ctx - Context passed as part of the ReplDev::async_alloc_write() api + void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx); + /// @brief Called when replication module is trying to allocate a block to write the value /// /// This function can be called both on leader and follower when it is trying to allocate a block to write the From 4439fec288e9c59758a47acc642c8fa29d62c7ac Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Mon, 5 Feb 2024 18:02:01 -0700 Subject: [PATCH 10/10] update --- src/lib/homestore_backend/hs_homeobject.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp index 3842148b..75f244a1 100644 --- a/src/lib/homestore_backend/hs_homeobject.cpp +++ b/src/lib/homestore_backend/hs_homeobject.cpp @@ -126,9 +126,7 @@ void HSHomeObject::init_homestore() { HomeStore::instance()->format_and_start({ {HS_SERVICE::META, hs_format_params{.size_pct = 5.0}}, - {HS_SERVICE::LOG_REPLICATED, hs_format_params{.size_pct = 10.0, .chunk_size = 32 * Mi}}, - {HS_SERVICE::LOG_LOCAL, - hs_format_params{.size_pct = 0.1, .chunk_size = 32 * Mi}}, // TODO: Remove this after HS disables LOG_LOCAL + {HS_SERVICE::LOG, hs_format_params{.size_pct = 10.0, .chunk_size = 32 * Mi}}, {HS_SERVICE::REPLICATION, hs_format_params{.size_pct = 79.0, .num_chunks = 65000,