Fix some bugs in baseline resync #236

Merged: 9 commits, Dec 9, 2024
4 changes: 2 additions & 2 deletions conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.13"
version = "2.1.17"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
@@ -49,7 +49,7 @@ def build_requirements(self):

def requirements(self):
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
self.requires("homestore/[^6.4]@oss/master")
self.requires("homestore/[^6.5.19]@oss/master")
self.requires("iomgr/[^11.3]@oss/master")
self.requires("lz4/1.9.4", override=True)
self.requires("openssl/3.3.1", override=True)
12 changes: 12 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.h
@@ -151,6 +151,18 @@ class HeapChunkSelector : public homestore::ChunkSelector {

uint32_t get_chunk_size() const;

/**
 * @brief Returns the number of disks the chunk selector has seen.
 *
 * Warning: calling this before HomeStore has fully started may return a wrong result.
 *
 * This is intended to be the authoritative count of data disks in the system.
 * If a disk is down in degraded mode, it is not loaded and none of its chunks
 * are added to the selector, so it is not counted here.
 */
uint32_t total_disks() const { return m_per_dev_heap.size(); }

private:
void add_chunk_internal(const chunk_num_t, bool add_to_heap = true);

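The counter above feeds HomeObjectStats::num_disks (see hs_homeobject.cpp below). As a hedged illustration only, a monitoring caller could flag a degraded node like this; the helper, the expected_data_disks parameter and the get_stats() accessor name are assumptions, not part of this PR:

// Hypothetical health probe; only total_disks()/num_disks come from this PR.
bool is_degraded(HSHomeObject const& ho, uint32_t expected_data_disks) {
    auto const stats = ho.get_stats();            // accessor name assumed; fills num_disks from chunk_selector()->total_disks()
    return stats.num_disks < expected_data_disks; // a down disk is never loaded, so its chunks and the disk itself are absent
}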
4 changes: 2 additions & 2 deletions src/lib/homestore_backend/hs_blob_manager.cpp
@@ -101,7 +101,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s

if (!repl_dev->is_leader()) {
LOGW("failed to put blob for pg [{}], shard [{}], not leader", pg_id, shard.id);
return folly::makeUnexpected(BlobErrorCode::NOT_LEADER);
return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id()));
}

// Create a put_blob request which allocates for header, key and blob_header, user_key. Data sgs are added later
@@ -385,7 +385,7 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo

if (!repl_dev->is_leader()) {
LOGW("failed to del blob for pg [{}], shard [{}], blob_id [{}], not leader", pg_id, shard.id, blob_id);
return folly::makeUnexpected(BlobErrorCode::NOT_LEADER);
return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id()));
}

// Create an unaligned header request unaligned
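With this change the NOT_LEADER error carries the current leader id (repl_dev->get_leader_id()) instead of a bare error code, so a caller can redirect the request rather than blindly retry. A minimal caller-side sketch follows; only BlobError and BlobErrorCode::NOT_LEADER are taken from the PR, and the current_leader member name is an assumption:

// Sketch of redirect handling on the BlobManager client (member name assumed).
if (result.hasError() && result.error().code == BlobErrorCode::NOT_LEADER) {
    auto leader = result.error().current_leader;  // populated from repl_dev->get_leader_id() on the server side
    // re-issue the put/del against `leader`, or surface the hint to the caller
}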
10 changes: 8 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.cpp
@@ -60,6 +60,10 @@ class HSReplApplication : public homestore::ReplApplication {
return it->second;
}

void on_repl_devs_init_completed() override {
_home_object->on_replica_restart();
}

std::pair< std::string, uint16_t > lookup_peer(homestore::replica_id_t uuid) const override {
std::string endpoint;
// for folly::uri to parse correctly, we need to add "http://" prefix
@@ -195,13 +199,14 @@ void HSHomeObject::init_homestore() {
.chunk_sel_type = chunk_selector_type_t::CUSTOM}},
});
}

// We don't have any repl dev yet, so explicitly call init_completed_cb(), where we register PG/Shard meta types.
repl_app->on_repl_devs_init_completed();
// Create a superblock that contains our SvcId
auto svc_sb = homestore::superblk< svc_info_superblk_t >(_svc_meta_name);
svc_sb.create(sizeof(svc_info_superblk_t));
svc_sb->svc_id_ = _our_id;
svc_sb.write();
on_replica_restart();

} else {
RELEASE_ASSERT(!_our_id.is_nil(), "No SvcId read after HomeStore recovery!");
auto const new_id = app->discover_svcid(_our_id);
@@ -313,6 +318,7 @@ HomeObjectStats HSHomeObject::_get_stats() const {

stats.num_open_shards = num_open_shards;
stats.avail_open_shards = chunk_selector()->total_chunks() - num_open_shards;
stats.num_disks = chunk_selector()->total_disks();
return stats;
}

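The ordering change above matters on a fresh start: no repl dev exists yet, so the new ReplApplication::on_repl_devs_init_completed() hook would not fire on its own, yet PG/Shard meta types still have to be registered before the SvcId superblock is written. A condensed sketch of the two start-up paths after this PR, with the control flow partly assumed:

// First boot: no repl devs, so invoke the hook explicitly, then persist the service id.
repl_app->on_repl_devs_init_completed();  // -> HSHomeObject::on_replica_restart() registers PG/Shard meta types
write_svc_info_superblk();                // hypothetical name for the superblk< svc_info_superblk_t > steps above
// Recovery: HomeStore is assumed to invoke on_repl_devs_init_completed() once repl devs are loaded,
// which is why ReplicationStateMachine::on_restart() is reduced to a log line (see replication_state_machine.cpp).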
4 changes: 2 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.hpp
@@ -328,8 +328,8 @@ class HSHomeObject : public HomeObjectImpl {

std::vector< ShardEntry > shard_list_{0};

objId cur_obj_id_{1, 0};
uint64_t cur_shard_idx_{0};
objId cur_obj_id_{0, 0};
int64_t cur_shard_idx_{-1};
std::vector<BlobInfo> cur_blob_list_{0};
uint64_t cur_start_blob_idx_{0};
uint64_t cur_batch_blob_count_{0};
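The resync cursor now starts at objId{0, 0}, i.e. the PG-metadata message, with cur_shard_idx_ at -1 meaning no shard has been consumed yet; that is also why the index becomes a signed int64_t. A condensed sketch of how the cursor advances, mirroring expected_next_obj_id() in pg_blob_iterator.cpp below:

// start:               cur_obj_id_ = {shard_seq_num = 0, batch_id = 0}  -> PG metadata + shard list
// after PG metadata:   cur_shard_idx_ == -1, so the next expected id is {seq_num of shard_list_[0], 0}
// within a shard:      {shard_seq_num, batch_id + 1}                    -> next blob batch
// past the last shard: LAST_OBJ_ID                                      -> leader marks is_last_obj, resync done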
8 changes: 4 additions & 4 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
@@ -64,7 +64,7 @@ objId HSHomeObject::PGBlobIterator::expected_next_obj_id() {
return objId(cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id + 1);
}
//next shard
if (cur_shard_idx_ < shard_list_.size() - 1) {
if (cur_shard_idx_ < static_cast< int64_t >(shard_list_.size() - 1)) {
auto next_shard_seq_num = shard_list_[cur_shard_idx_ + 1].info.id & 0xFFFFFFFFFFFF;
return objId(next_shard_seq_num, 0);
}
@@ -103,7 +103,7 @@ bool HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& m
shard_ids.push_back(shard.info.id);
}

auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg->durable_entities().blob_sequence_num,
auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg_info.size, pg_info.chunk_size, pg->durable_entities().blob_sequence_num,
pg->shard_sequence_num_, &members, &shard_ids);
builder_.FinishSizePrefixed(pg_entry);

@@ -212,7 +212,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
}

sisl::io_blob_safe blob;
auto retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
for (int i = 0; i < retries; i++) {
auto result = load_blob_data(info, state).get();
if (result.hasError() && result.error().code == BlobErrorCode::READ_FAILED) {
@@ -241,7 +241,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
builder_.FinishSizePrefixed(
CreateResyncBlobDataBatchDirect(builder_, &blob_entries, end_of_shard));

LOGD("create blobs snapshot data batch: shard_id={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}",
LOGD("create blobs snapshot data batch: shard_seq_num={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}",
cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id, total_bytes, blob_entries.size(), end_of_shard);

pack_resync_message(data_blob, SyncMessageType::SHARD_BATCH);
58 changes: 31 additions & 27 deletions src/lib/homestore_backend/replication_state_machine.cpp
@@ -92,7 +92,7 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header,
}
}

void ReplicationStateMachine::on_restart() { home_object_->on_replica_restart(); }
void ReplicationStateMachine::on_restart() { LOGD("ReplicationStateMachine::on_restart");}

void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) {
@@ -231,16 +231,18 @@ std::shared_ptr< homestore::snapshot_context > ReplicationStateMachine::last_sna
return snp;
}

int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) {
int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) {
HSHomeObject::PGBlobIterator* pg_iter = nullptr;
auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();

if (snp_data->user_ctx == nullptr) {
if (snp_obj->user_ctx == nullptr) {
// Create the pg blob iterator for the first time.
pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id(), context->get_lsn());
snp_data->user_ctx = (void*)pg_iter;
} else { pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx); }
snp_obj->user_ctx = (void*)pg_iter;
} else {
pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_obj->user_ctx);
}

// Nuraft uses obj_id as a way to track the state of the snapshot read and write.
// Nuraft starts with obj_id == 0 as first message always, leader send all the shards and
@@ -258,16 +260,17 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap
boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(),
s->get_last_log_idx());
//TODO snp_data->offset is int64, need to change to uint64 in homestore
if (snp_data->offset == int64_t(LAST_OBJ_ID)) {
if (snp_obj->offset == LAST_OBJ_ID) {
// No more shards to read, baseline resync is finished after this.
snp_data->is_last_obj = true;
snp_obj->is_last_obj = true;
LOGD("Read snapshot end, {}", log_str);
return 0;
}

auto obj_id = objId(snp_data->offset);
auto obj_id = objId(snp_obj->offset);
log_str = fmt::format("{} shard_seq_num={} batch_num={}", log_str, obj_id.shard_seq_num, obj_id.batch_id);

LOGD("read current snp obj {}", log_str)
//invalid Id
if (!pg_iter->update_cursor(obj_id)) {
LOGW("Invalid objId in snapshot read, {}, current shard_seq_num={}, current batch_num={}",
@@ -278,7 +281,7 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap
//pg metadata message
//shardId starts from 1
if (obj_id.shard_seq_num == 0) {
if (!pg_iter->create_pg_snapshot_data(snp_data->blob)) {
if (!pg_iter->create_pg_snapshot_data(snp_obj->blob)) {
LOGE("Failed to create pg snapshot data for snapshot read, {}", log_str);
return -1;
}
@@ -290,54 +293,54 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap
LOGE("Failed to generate shard blob list for snapshot read, {}", log_str);
return -1;
}
if (!pg_iter->create_shard_snapshot_data(snp_data->blob)) {
if (!pg_iter->create_shard_snapshot_data(snp_obj->blob)) {
LOGE("Failed to create shard meta data for snapshot read, {}", log_str);
return -1;
}
return 0;
}
//general blob message
if (!pg_iter->create_blobs_snapshot_data(snp_data->blob)) {
if (!pg_iter->create_blobs_snapshot_data(snp_obj->blob)) {
LOGE("Failed to create blob batch data for snapshot read, {}", log_str);
return -1;
}
return 0;
}

void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) {
void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) {
RELEASE_ASSERT(context != nullptr, "Context null");
RELEASE_ASSERT(snp_data != nullptr, "Snapshot data null");
RELEASE_ASSERT(snp_obj != nullptr, "Snapshot data null");
auto r_dev = repl_dev();
if (!m_snp_rcv_handler) {
m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev);
}

auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();
auto obj_id = objId(static_cast< snp_obj_id_t >(snp_data->offset));
auto obj_id = objId(static_cast< snp_obj_id_t >(snp_obj->offset));
auto log_suffix = fmt::format("group={} term={} lsn={} shard={} batch_num={} size={}",
uuids::to_string(r_dev->group_id()), s->get_last_log_term(), s->get_last_log_idx(),
obj_id.shard_seq_num, obj_id.batch_id, snp_data->blob.size());
obj_id.shard_seq_num, obj_id.batch_id, snp_obj->blob.size());

if (snp_data->is_last_obj) {
if (snp_obj->is_last_obj) {
LOGD("Write snapshot reached is_last_obj true {}", log_suffix);
return;
}

// Check message integrity
// TODO: add a flip here to simulate corrupted message
auto header = r_cast< const SyncMessageHeader* >(snp_data->blob.cbytes());
auto header = r_cast< const SyncMessageHeader* >(snp_obj->blob.cbytes());
if (header->corrupted()) {
LOGE("corrupted message in write_snapshot_data, lsn:{}, obj_id {} shard {} batch {}", s->get_last_log_idx(),
obj_id.value, obj_id.shard_seq_num, obj_id.batch_id);
return;
}
if (auto payload_size = snp_data->blob.size() - sizeof(SyncMessageHeader); payload_size != header->payload_size) {
if (auto payload_size = snp_obj->blob.size() - sizeof(SyncMessageHeader); payload_size != header->payload_size) {
LOGE("payload size mismatch in write_snapshot_data {} != {}, lsn:{}, obj_id {} shard {} batch {}", payload_size,
header->payload_size, s->get_last_log_idx(), obj_id.value, obj_id.shard_seq_num, obj_id.batch_id);
return;
}
auto data_buf = snp_data->blob.cbytes() + sizeof(SyncMessageHeader);
auto data_buf = snp_obj->blob.cbytes() + sizeof(SyncMessageHeader);

if (obj_id.shard_seq_num == 0) {
// PG metadata & shard list message
@@ -348,7 +351,8 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
// Check if the snapshot context is same as the current snapshot context.
// If not, drop the previous context and re-init a new one
if (m_snp_rcv_handler->get_context_lsn() != context->get_lsn()) {
m_snp_rcv_handler->reset_context(pg_data->pg_id(), context->get_lsn());
LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn());
m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
// TODO: Reset all data of current PG - let's resync on a pristine base
}

@@ -359,7 +363,7 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
obj_id.value, obj_id.shard_seq_num, obj_id.batch_id, ret);
return;
}
snp_data->offset =
snp_obj->offset =
objId(HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_next_shard()), 0).value;
LOGD("Write snapshot, processed PG data pg_id:{} {}", pg_data->pg_id(), log_suffix);
return;
@@ -380,7 +384,7 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
return;
}
// Request for the next batch
snp_data->offset = objId(obj_id.shard_seq_num, 1).value;
snp_obj->offset = objId(obj_id.shard_seq_num, 1).value;
LOGD("Write snapshot, processed shard data shard_seq_num:{} {}", obj_id.shard_seq_num, log_suffix);
return;
}
@@ -403,12 +407,12 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
if (blob_batch->is_last_batch()) {
auto next_shard = m_snp_rcv_handler->get_next_shard();
if (next_shard == HSHomeObject::SnapshotReceiveHandler::shard_list_end_marker) {
snp_data->offset = LAST_OBJ_ID;
snp_obj->offset = LAST_OBJ_ID;
} else {
snp_data->offset = objId(HSHomeObject::get_sequence_num_from_shard_id(next_shard), 0).value;
snp_obj->offset = objId(HSHomeObject::get_sequence_num_from_shard_id(next_shard), 0).value;
}
} else {
snp_data->offset = objId(obj_id.shard_seq_num, obj_id.batch_id + 1).value;
snp_obj->offset = objId(obj_id.shard_seq_num, obj_id.batch_id + 1).value;
}

LOGD("Write snapshot, processed blob data shard_seq_num:{} batch_num:{} {}", obj_id.shard_seq_num, obj_id.batch_id,
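Taken together, the renamed read_snapshot_obj/write_snapshot_obj pair drives baseline resync with the follower requesting objects by id: write_snapshot_obj sets snp_obj->offset to the next id it wants, and read_snapshot_obj repositions the leader's PGBlobIterator with update_cursor() and serializes that object into snp_obj->blob. A condensed sketch of the exchange, a simplification of the code above rather than new behavior:

// obj_id {shard_seq_num = 0, batch_id = 0} : PG metadata + shard list -> follower next asks for {first shard, 0}
// obj_id {shard_seq_num = N, batch_id = 0} : shard N metadata         -> follower next asks for {N, 1}
// obj_id {shard_seq_num = N, batch_id = k} : k-th blob batch of N     -> {N, k+1}, or {next shard, 0} on the
//                                            last batch, or LAST_OBJ_ID after the last shard
// offset == LAST_OBJ_ID                    : leader sets snp_obj->is_last_obj = true and resync completes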
8 changes: 4 additions & 4 deletions src/lib/homestore_backend/replication_state_machine.hpp
@@ -180,10 +180,10 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
homestore::AsyncReplResult<> create_snapshot(std::shared_ptr< homestore::snapshot_context > context) override;
virtual bool apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) override;
virtual std::shared_ptr< homestore::snapshot_context > last_snapshot() override;
virtual int read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) override;
virtual void write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) override;
virtual int read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) override;
virtual void write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) override;
virtual void free_user_snp_ctx(void*& user_snp_ctx) override;

private:
2 changes: 2 additions & 0 deletions src/lib/homestore_backend/resync_pg_data.fbs
@@ -11,6 +11,8 @@ table Member {
table ResyncPGMetaData {
pg_id : uint16; // only low 16 bit is used for pg_id;
replica_set_uuid : [ubyte]; // uuid of replica set
pg_size : uint64; // pg size;
chunk_size : uint64; // chunk size;
blob_seq_num : uint64; // blob sequence number, used to assign next blob id;
shard_seq_num: uint64; // shard sequence number, used to assign next shard id;
members : [Member]; // peers;
2 changes: 2 additions & 0 deletions src/lib/homestore_backend/snapshot_receive_handler.cpp
@@ -21,6 +21,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaD

// Create local PG
PGInfo pg_info(pg_meta.pg_id());
pg_info.size = pg_meta.pg_size();
pg_info.chunk_size = pg_meta.chunk_size();
std::copy_n(pg_meta.replica_set_uuid()->data(), 16, pg_info.replica_set_uuid.begin());
for (unsigned int i = 0; i < pg_meta.members()->size(); i++) {
const auto member = pg_meta.members()->Get(i);
7 changes: 6 additions & 1 deletion src/lib/homestore_backend/tests/hs_blob_tests.cpp
@@ -196,6 +196,8 @@ TEST_F(HomeObjectFixture, PGBlobIterator) {
auto u1 = pg_msg->replica_set_uuid();
auto u2 = pg->pg_info_.replica_set_uuid;
ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));
ASSERT_EQ(pg_msg->pg_size(), pg->pg_info_.size);
ASSERT_EQ(pg_msg->chunk_size(), pg->pg_info_.chunk_size);
ASSERT_EQ(pg_msg->blob_seq_num(), pg->durable_entities().blob_sequence_num.load());
ASSERT_EQ(pg_msg->shard_seq_num(), pg->shard_sequence_num_);

@@ -300,6 +302,9 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
// We have to create a PG first to init repl_dev
constexpr pg_id_t pg_id = 1;
create_pg(pg_id); // to create repl dev
auto iter = _obj_inst->_pg_map.find(pg_id);
ASSERT_TRUE(iter != _obj_inst->_pg_map.end());
auto pg = iter->second.get();
PGStats stats;
ASSERT_TRUE(_obj_inst->pg_manager()->get_stats(pg_id, stats));
auto r_dev = homestore::HomeStore::instance()->repl_service().get_repl_dev(stats.replica_set_uuid);
@@ -324,7 +329,7 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
shard_ids.push_back(i);
}
auto pg_entry =
CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, blob_seq_num, num_shards_per_pg, &members, &shard_ids);
CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, pg->pg_info_.size, pg->pg_info_.chunk_size, blob_seq_num, num_shards_per_pg, &members, &shard_ids);
builder.Finish(pg_entry);
auto pg_meta = GetResyncPGMetaData(builder.GetBufferPointer());
auto ret = handler->process_pg_snapshot_data(*pg_meta);
2 changes: 2 additions & 0 deletions src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp
@@ -152,6 +152,8 @@ TEST_F(HeapChunkSelectorTest, test_for_each_chunk) {
ASSERT_EQ(size.load(), 18);
}

TEST_F(HeapChunkSelectorTest, test_total_disks) { ASSERT_EQ(HCS.total_disks(), 3); }

TEST_F(HeapChunkSelectorTest, test_identical_layout) {
const homestore::blk_count_t count = 1;
homestore::blk_alloc_hints hints;