diff --git a/conanfile.py b/conanfile.py index 00edf3d..4d79d90 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "2.1.13" + version = "2.1.17" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" @@ -49,7 +49,7 @@ def build_requirements(self): def requirements(self): self.requires("sisl/[^12.2]@oss/master", transitive_headers=True) - self.requires("homestore/[^6.4]@oss/master") + self.requires("homestore/[^6.5.19]@oss/master") self.requires("iomgr/[^11.3]@oss/master") self.requires("lz4/1.9.4", override=True) self.requires("openssl/3.3.1", override=True) diff --git a/src/lib/homestore_backend/heap_chunk_selector.h b/src/lib/homestore_backend/heap_chunk_selector.h index cbcbbd2..c0a84b0 100644 --- a/src/lib/homestore_backend/heap_chunk_selector.h +++ b/src/lib/homestore_backend/heap_chunk_selector.h @@ -151,6 +151,18 @@ class HeapChunkSelector : public homestore::ChunkSelector { uint32_t get_chunk_size() const; + /** + * @brief Returns the number of disks we seen + * + * Warning : calling this before HS fully start might getting wrong result. + * + * This function returns the number of disks the chunk selector seen. + * It should be the accurate source that how many disks in the system for data. + * If a disk is down in degraded mode, it wont be load and no chunk will be + * added into selector. + */ + uint32_t total_disks() const { return m_per_dev_heap.size(); } + private: void add_chunk_internal(const chunk_num_t, bool add_to_heap = true); diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index 6ffadc9..e5da7ef 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -101,7 +101,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s if (!repl_dev->is_leader()) { LOGW("failed to put blob for pg [{}], shard [{}], not leader", pg_id, shard.id); - return folly::makeUnexpected(BlobErrorCode::NOT_LEADER); + return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id())); } // Create a put_blob request which allocates for header, key and blob_header, user_key. Data sgs are added later @@ -385,7 +385,7 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo if (!repl_dev->is_leader()) { LOGW("failed to del blob for pg [{}], shard [{}], blob_id [{}], not leader", pg_id, shard.id, blob_id); - return folly::makeUnexpected(BlobErrorCode::NOT_LEADER); + return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id())); } // Create an unaligned header request unaligned diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp index 305f5d7..039a4b9 100644 --- a/src/lib/homestore_backend/hs_homeobject.cpp +++ b/src/lib/homestore_backend/hs_homeobject.cpp @@ -60,6 +60,10 @@ class HSReplApplication : public homestore::ReplApplication { return it->second; } + void on_repl_devs_init_completed() override { + _home_object->on_replica_restart(); + } + std::pair< std::string, uint16_t > lookup_peer(homestore::replica_id_t uuid) const override { std::string endpoint; // for folly::uri to parse correctly, we need to add "http://" prefix @@ -195,13 +199,14 @@ void HSHomeObject::init_homestore() { .chunk_sel_type = chunk_selector_type_t::CUSTOM}}, }); } - + // We dont have any repl dev now, explicitly call init_completed_cb() where we register PG/Shard meta types. + repl_app->on_repl_devs_init_completed(); // Create a superblock that contains our SvcId auto svc_sb = homestore::superblk< svc_info_superblk_t >(_svc_meta_name); svc_sb.create(sizeof(svc_info_superblk_t)); svc_sb->svc_id_ = _our_id; svc_sb.write(); - on_replica_restart(); + } else { RELEASE_ASSERT(!_our_id.is_nil(), "No SvcId read after HomeStore recovery!"); auto const new_id = app->discover_svcid(_our_id); @@ -313,6 +318,7 @@ HomeObjectStats HSHomeObject::_get_stats() const { stats.num_open_shards = num_open_shards; stats.avail_open_shards = chunk_selector()->total_chunks() - num_open_shards; + stats.num_disks = chunk_selector()->total_disks(); return stats; } diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 4aabff1..4f3be36 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -328,8 +328,8 @@ class HSHomeObject : public HomeObjectImpl { std::vector< ShardEntry > shard_list_{0}; - objId cur_obj_id_{1, 0}; - uint64_t cur_shard_idx_{0}; + objId cur_obj_id_{0, 0}; + int64_t cur_shard_idx_{-1}; std::vector cur_blob_list_{0}; uint64_t cur_start_blob_idx_{0}; uint64_t cur_batch_blob_count_{0}; diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp index 8ffd0dd..f0afa8e 100644 --- a/src/lib/homestore_backend/pg_blob_iterator.cpp +++ b/src/lib/homestore_backend/pg_blob_iterator.cpp @@ -64,7 +64,7 @@ objId HSHomeObject::PGBlobIterator::expected_next_obj_id() { return objId(cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id + 1); } //next shard - if (cur_shard_idx_ < shard_list_.size() - 1) { + if (cur_shard_idx_ < static_cast< int64_t >(shard_list_.size() - 1)) { auto next_shard_seq_num = shard_list_[cur_shard_idx_ + 1].info.id & 0xFFFFFFFFFFFF; return objId(next_shard_seq_num, 0); } @@ -103,7 +103,7 @@ bool HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& m shard_ids.push_back(shard.info.id); } - auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg->durable_entities().blob_sequence_num, + auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg_info.size, pg_info.chunk_size, pg->durable_entities().blob_sequence_num, pg->shard_sequence_num_, &members, &shard_ids); builder_.FinishSizePrefixed(pg_entry); @@ -212,7 +212,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe } sisl::io_blob_safe blob; - auto retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry); + uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry); for (int i = 0; i < retries; i++) { auto result = load_blob_data(info, state).get(); if (result.hasError() && result.error().code == BlobErrorCode::READ_FAILED) { @@ -241,7 +241,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe builder_.FinishSizePrefixed( CreateResyncBlobDataBatchDirect(builder_, &blob_entries, end_of_shard)); - LOGD("create blobs snapshot data batch: shard_id={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}", + LOGD("create blobs snapshot data batch: shard_seq_num={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}", cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id, total_bytes, blob_entries.size(), end_of_shard); pack_resync_message(data_blob, SyncMessageType::SHARD_BATCH); diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index be1672f..9a12f05 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -92,7 +92,7 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, } } -void ReplicationStateMachine::on_restart() { home_object_->on_replica_restart(); } +void ReplicationStateMachine::on_restart() { LOGD("ReplicationStateMachine::on_restart");} void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key, cintrusive< repl_req_ctx >& ctx) { @@ -231,16 +231,18 @@ std::shared_ptr< homestore::snapshot_context > ReplicationStateMachine::last_sna return snp; } -int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context, - std::shared_ptr< homestore::snapshot_data > snp_data) { +int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context, + std::shared_ptr< homestore::snapshot_obj > snp_obj) { HSHomeObject::PGBlobIterator* pg_iter = nullptr; auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot(); - if (snp_data->user_ctx == nullptr) { + if (snp_obj->user_ctx == nullptr) { // Create the pg blob iterator for the first time. pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id(), context->get_lsn()); - snp_data->user_ctx = (void*)pg_iter; - } else { pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx); } + snp_obj->user_ctx = (void*)pg_iter; + } else { + pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_obj->user_ctx); + } // Nuraft uses obj_id as a way to track the state of the snapshot read and write. // Nuraft starts with obj_id == 0 as first message always, leader send all the shards and @@ -258,16 +260,17 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(), s->get_last_log_idx()); //TODO snp_data->offset is int64, need to change to uint64 in homestore - if (snp_data->offset == int64_t(LAST_OBJ_ID)) { + if (snp_obj->offset == LAST_OBJ_ID) { // No more shards to read, baseline resync is finished after this. - snp_data->is_last_obj = true; + snp_obj->is_last_obj = true; LOGD("Read snapshot end, {}", log_str); return 0; } - auto obj_id = objId(snp_data->offset); + auto obj_id = objId(snp_obj->offset); log_str = fmt::format("{} shard_seq_num={} batch_num={}", log_str, obj_id.shard_seq_num, obj_id.batch_id); + LOGD("read current snp obj {}", log_str) //invalid Id if (!pg_iter->update_cursor(obj_id)) { LOGW("Invalid objId in snapshot read, {}, current shard_seq_num={}, current batch_num={}", @@ -278,7 +281,7 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap //pg metadata message //shardId starts from 1 if (obj_id.shard_seq_num == 0) { - if (!pg_iter->create_pg_snapshot_data(snp_data->blob)) { + if (!pg_iter->create_pg_snapshot_data(snp_obj->blob)) { LOGE("Failed to create pg snapshot data for snapshot read, {}", log_str); return -1; } @@ -290,54 +293,54 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap LOGE("Failed to generate shard blob list for snapshot read, {}", log_str); return -1; } - if (!pg_iter->create_shard_snapshot_data(snp_data->blob)) { + if (!pg_iter->create_shard_snapshot_data(snp_obj->blob)) { LOGE("Failed to create shard meta data for snapshot read, {}", log_str); return -1; } return 0; } //general blob message - if (!pg_iter->create_blobs_snapshot_data(snp_data->blob)) { + if (!pg_iter->create_blobs_snapshot_data(snp_obj->blob)) { LOGE("Failed to create blob batch data for snapshot read, {}", log_str); return -1; } return 0; } -void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context, - std::shared_ptr< homestore::snapshot_data > snp_data) { +void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context, + std::shared_ptr< homestore::snapshot_obj > snp_obj) { RELEASE_ASSERT(context != nullptr, "Context null"); - RELEASE_ASSERT(snp_data != nullptr, "Snapshot data null"); + RELEASE_ASSERT(snp_obj != nullptr, "Snapshot data null"); auto r_dev = repl_dev(); if (!m_snp_rcv_handler) { m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev); } auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot(); - auto obj_id = objId(static_cast< snp_obj_id_t >(snp_data->offset)); + auto obj_id = objId(static_cast< snp_obj_id_t >(snp_obj->offset)); auto log_suffix = fmt::format("group={} term={} lsn={} shard={} batch_num={} size={}", uuids::to_string(r_dev->group_id()), s->get_last_log_term(), s->get_last_log_idx(), - obj_id.shard_seq_num, obj_id.batch_id, snp_data->blob.size()); + obj_id.shard_seq_num, obj_id.batch_id, snp_obj->blob.size()); - if (snp_data->is_last_obj) { + if (snp_obj->is_last_obj) { LOGD("Write snapshot reached is_last_obj true {}", log_suffix); return; } // Check message integrity // TODO: add a flip here to simulate corrupted message - auto header = r_cast< const SyncMessageHeader* >(snp_data->blob.cbytes()); + auto header = r_cast< const SyncMessageHeader* >(snp_obj->blob.cbytes()); if (header->corrupted()) { LOGE("corrupted message in write_snapshot_data, lsn:{}, obj_id {} shard {} batch {}", s->get_last_log_idx(), obj_id.value, obj_id.shard_seq_num, obj_id.batch_id); return; } - if (auto payload_size = snp_data->blob.size() - sizeof(SyncMessageHeader); payload_size != header->payload_size) { + if (auto payload_size = snp_obj->blob.size() - sizeof(SyncMessageHeader); payload_size != header->payload_size) { LOGE("payload size mismatch in write_snapshot_data {} != {}, lsn:{}, obj_id {} shard {} batch {}", payload_size, header->payload_size, s->get_last_log_idx(), obj_id.value, obj_id.shard_seq_num, obj_id.batch_id); return; } - auto data_buf = snp_data->blob.cbytes() + sizeof(SyncMessageHeader); + auto data_buf = snp_obj->blob.cbytes() + sizeof(SyncMessageHeader); if (obj_id.shard_seq_num == 0) { // PG metadata & shard list message @@ -348,7 +351,8 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn // Check if the snapshot context is same as the current snapshot context. // If not, drop the previous context and re-init a new one if (m_snp_rcv_handler->get_context_lsn() != context->get_lsn()) { - m_snp_rcv_handler->reset_context(pg_data->pg_id(), context->get_lsn()); + LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn()); + m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id()); // TODO: Reset all data of current PG - let's resync on a pristine base } @@ -359,7 +363,7 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn obj_id.value, obj_id.shard_seq_num, obj_id.batch_id, ret); return; } - snp_data->offset = + snp_obj->offset = objId(HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_next_shard()), 0).value; LOGD("Write snapshot, processed PG data pg_id:{} {}", pg_data->pg_id(), log_suffix); return; @@ -380,7 +384,7 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn return; } // Request for the next batch - snp_data->offset = objId(obj_id.shard_seq_num, 1).value; + snp_obj->offset = objId(obj_id.shard_seq_num, 1).value; LOGD("Write snapshot, processed shard data shard_seq_num:{} {}", obj_id.shard_seq_num, log_suffix); return; } @@ -403,12 +407,12 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn if (blob_batch->is_last_batch()) { auto next_shard = m_snp_rcv_handler->get_next_shard(); if (next_shard == HSHomeObject::SnapshotReceiveHandler::shard_list_end_marker) { - snp_data->offset = LAST_OBJ_ID; + snp_obj->offset = LAST_OBJ_ID; } else { - snp_data->offset = objId(HSHomeObject::get_sequence_num_from_shard_id(next_shard), 0).value; + snp_obj->offset = objId(HSHomeObject::get_sequence_num_from_shard_id(next_shard), 0).value; } } else { - snp_data->offset = objId(obj_id.shard_seq_num, obj_id.batch_id + 1).value; + snp_obj->offset = objId(obj_id.shard_seq_num, obj_id.batch_id + 1).value; } LOGD("Write snapshot, processed blob data shard_seq_num:{} batch_num:{} {}", obj_id.shard_seq_num, obj_id.batch_id, diff --git a/src/lib/homestore_backend/replication_state_machine.hpp b/src/lib/homestore_backend/replication_state_machine.hpp index 6fa861d..ca5c953 100644 --- a/src/lib/homestore_backend/replication_state_machine.hpp +++ b/src/lib/homestore_backend/replication_state_machine.hpp @@ -180,10 +180,10 @@ class ReplicationStateMachine : public homestore::ReplDevListener { homestore::AsyncReplResult<> create_snapshot(std::shared_ptr< homestore::snapshot_context > context) override; virtual bool apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) override; virtual std::shared_ptr< homestore::snapshot_context > last_snapshot() override; - virtual int read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context, - std::shared_ptr< homestore::snapshot_data > snp_data) override; - virtual void write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context, - std::shared_ptr< homestore::snapshot_data > snp_data) override; + virtual int read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context, + std::shared_ptr< homestore::snapshot_obj > snp_obj) override; + virtual void write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context, + std::shared_ptr< homestore::snapshot_obj > snp_obj) override; virtual void free_user_snp_ctx(void*& user_snp_ctx) override; private: diff --git a/src/lib/homestore_backend/resync_pg_data.fbs b/src/lib/homestore_backend/resync_pg_data.fbs index 3d28097..bf7d059 100644 --- a/src/lib/homestore_backend/resync_pg_data.fbs +++ b/src/lib/homestore_backend/resync_pg_data.fbs @@ -11,6 +11,8 @@ table Member { table ResyncPGMetaData { pg_id : uint16; // only low 16 bit is used for pg_id; replica_set_uuid : [ubyte]; // uuid of replica set + pg_size : uint64; // pg size; + chunk_size : uint64; // chunk size; blob_seq_num : uint64; // blob sequence number, used to assign next blob id; shard_seq_num: uint64; // shard sequence number, used to assign next shard id; members : [Member]; // peers; diff --git a/src/lib/homestore_backend/snapshot_receive_handler.cpp b/src/lib/homestore_backend/snapshot_receive_handler.cpp index 0544cc9..195f7d0 100644 --- a/src/lib/homestore_backend/snapshot_receive_handler.cpp +++ b/src/lib/homestore_backend/snapshot_receive_handler.cpp @@ -21,6 +21,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaD // Create local PG PGInfo pg_info(pg_meta.pg_id()); + pg_info.size = pg_meta.pg_size(); + pg_info.chunk_size = pg_meta.chunk_size(); std::copy_n(pg_meta.replica_set_uuid()->data(), 16, pg_info.replica_set_uuid.begin()); for (unsigned int i = 0; i < pg_meta.members()->size(); i++) { const auto member = pg_meta.members()->Get(i); diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp index 958aa3a..4dd417b 100644 --- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp @@ -196,6 +196,8 @@ TEST_F(HomeObjectFixture, PGBlobIterator) { auto u1 = pg_msg->replica_set_uuid(); auto u2 = pg->pg_info_.replica_set_uuid; ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end())); + ASSERT_EQ(pg_msg->pg_size(), pg->pg_info_.size); + ASSERT_EQ(pg_msg->chunk_size(), pg->pg_info_.chunk_size); ASSERT_EQ(pg_msg->blob_seq_num(), pg->durable_entities().blob_sequence_num.load()); ASSERT_EQ(pg_msg->shard_seq_num(), pg->shard_sequence_num_); @@ -300,6 +302,9 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) { // We have to create a PG first to init repl_dev constexpr pg_id_t pg_id = 1; create_pg(pg_id); // to create repl dev + auto iter = _obj_inst->_pg_map.find(pg_id); + ASSERT_TRUE(iter != _obj_inst->_pg_map.end()); + auto pg = iter->second.get(); PGStats stats; ASSERT_TRUE(_obj_inst->pg_manager()->get_stats(pg_id, stats)); auto r_dev = homestore::HomeStore::instance()->repl_service().get_repl_dev(stats.replica_set_uuid); @@ -324,7 +329,7 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) { shard_ids.push_back(i); } auto pg_entry = - CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, blob_seq_num, num_shards_per_pg, &members, &shard_ids); + CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, pg->pg_info_.size, pg->pg_info_.chunk_size, blob_seq_num, num_shards_per_pg, &members, &shard_ids); builder.Finish(pg_entry); auto pg_meta = GetResyncPGMetaData(builder.GetBufferPointer()); auto ret = handler->process_pg_snapshot_data(*pg_meta); diff --git a/src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp b/src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp index 3a7a39d..0251e4f 100644 --- a/src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp +++ b/src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp @@ -152,6 +152,8 @@ TEST_F(HeapChunkSelectorTest, test_for_each_chunk) { ASSERT_EQ(size.load(), 18); } +TEST_F(HeapChunkSelectorTest, test_total_disks) { ASSERT_EQ(HCS.total_disks(), 3); } + TEST_F(HeapChunkSelectorTest, test_identical_layout) { const homestore::blk_count_t count = 1; homestore::blk_alloc_hints hints;