replicate createpg message across raft group #136

Merged · 10 commits · Feb 13, 2024
Changes from all commits
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "1.0.4"
version = "1.0.5"
homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
topics = ("ebay")
3 changes: 2 additions & 1 deletion src/include/homeobject/pg_manager.hpp
@@ -9,7 +9,8 @@

namespace homeobject {

- ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP);
+ ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP, CRC_MISMATCH,
+     NO_SPACE_LEFT, DRIVE_WRITE_ERROR);

struct PGMember {
explicit PGMember(peer_id_t _id) : id(_id) {}
10 changes: 4 additions & 6 deletions src/lib/homestore_backend/hs_blob_manager.cpp
@@ -145,7 +145,7 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
const homestore::MultiBlkId& pbas,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
- if (hs_ctx != nullptr) {
+ if (hs_ctx && hs_ctx->is_proposer) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

@@ -290,7 +290,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,
homestore::blk_alloc_hints HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
- if (hs_ctx != nullptr) {
+ if (hs_ctx && hs_ctx->is_proposer) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

@@ -351,7 +351,7 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo
void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
- if (hs_ctx != nullptr) {
+ if (hs_ctx && hs_ctx->is_proposer) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

@@ -392,9 +392,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
}

auto& multiBlks = r.value();
- if (multiBlks != tombstone_pbas) {
-     repl_dev->async_free_blks(lsn, multiBlks);
- }
+ if (multiBlks != tombstone_pbas) { repl_dev->async_free_blks(lsn, multiBlks); }

if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); }
}
3 changes: 1 addition & 2 deletions src/lib/homestore_backend/hs_homeobject.cpp
@@ -126,8 +126,7 @@ void HSHomeObject::init_homestore() {

HomeStore::instance()->format_and_start({
{HS_SERVICE::META, hs_format_params{.size_pct = 5.0}},
- {HS_SERVICE::LOG_REPLICATED, hs_format_params{.size_pct = 10.0}},
- {HS_SERVICE::LOG_LOCAL, hs_format_params{.size_pct = 0.1}}, // TODO: Remove this after HS disables LOG_LOCAL
+ {HS_SERVICE::LOG, hs_format_params{.size_pct = 10.0, .chunk_size = 32 * Mi}},
{HS_SERVICE::REPLICATION,
hs_format_params{.size_pct = 79.0,
.num_chunks = 65000,
19 changes: 18 additions & 1 deletion src/lib/homestore_backend/hs_homeobject.hpp
@@ -219,7 +219,13 @@ class HSHomeObject : public HomeObjectImpl {
private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

// create pg related
PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info);
static std::string serialize_pg_info(const PGInfo& info);
static PGInfo deserialize_pg_info(const unsigned char* pg_info_str, size_t size);
void add_pg_to_map(unique< HS_PG > hs_pg);

// create shard related
shard_id_t generate_new_shard_id(pg_id_t pg);
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t);

@@ -260,6 +266,17 @@ class HSHomeObject : public HomeObjectImpl {
*/
void init_cp();

/**
* @brief Callback function invoked when createPG message is committed on a shard.
*
* @param lsn The logical sequence number of the message.
* @param header The header of the message.
* @param repl_dev The replication device.
* @param hs_ctx The replication request context.
*/
void on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

/**
* @brief Callback function invoked when a message is committed on a shard.
*
@@ -270,7 +287,7 @@
* @param hs_ctx The replication request context.
*/
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
- homestore::ReplDev* repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx);
+ shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx);

/**
* @brief Retrieves the chunk number associated with the given shard ID.
135 changes: 106 additions & 29 deletions src/lib/homestore_backend/hs_pg_manager.cpp
@@ -1,11 +1,13 @@
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/string_generator.hpp>
#include <homestore/replication_service.hpp>
#include "hs_homeobject.hpp"
#include "replication_state_machine.hpp"
#include "hs_hmobj_cp.hpp"

using namespace homestore;
namespace homeobject {
static constexpr uint64_t io_align{512};

PGError toPgError(ReplServiceError const& e) {
switch (e) {
@@ -34,6 +36,14 @@ PGError toPgError(ReplServiceError const& e) {
return PGError::TIMEOUT;
case ReplServiceError::SERVER_NOT_FOUND:
return PGError::UNKNOWN_PG;
case ReplServiceError::NO_SPACE_LEFT:
return PGError::NO_SPACE_LEFT;
case ReplServiceError::DRIVE_WRITE_ERROR:
return PGError::DRIVE_WRITE_ERROR;
/* TODO: enable this after adding the error type to homestore
case ReplServiceError::CRC_MISMATCH:
return PGError::CRC_MISMATCH;
*/
case ReplServiceError::OK:
DEBUG_ASSERT(false, "Should not process OK!");
[[fallthrough]];
@@ -48,32 +58,102 @@ PGError toPgError(ReplServiceError const& e) {
}

PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) {
auto pg_id = pg_info.id;
if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) return folly::Unit();

pg_info.replica_set_uuid = boost::uuids::random_generator()();
return hs_repl_service()
.create_repl_dev(pg_info.replica_set_uuid, peers)
.via(executor_)
- .thenValue([this, pg_info = std::move(pg_info)](auto&& v) mutable -> PGManager::NullResult {
+ .thenValue([this, pg_info = std::move(pg_info)](auto&& v) mutable -> PGManager::NullAsyncResult {
if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); }
// we will write a PGHeader across the raft group and when it is committed
// all raft members will create PGinfo and index table for this PG.

// TODO create index table during create shard.
auto index_table = create_index_table();
auto uuid_str = boost::uuids::to_string(index_table->uuid());

auto pg_id = pg_info.id;
auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(v.value()), index_table);
std::scoped_lock lock_guard(index_lock_);
RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found");
index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table};

LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str);
hs_pg->index_table_ = index_table;
// Add to index service, so that it gets cleaned up when index service is shutdown.
homestore::hs()->index_service().add_index_table(index_table);
add_pg_to_map(std::move(hs_pg));
- return folly::Unit();
+ // FIXME: https://github.com/eBay/HomeObject/pull/136#discussion_r1470504271
+ return do_create_pg(v.value(), std::move(pg_info));
Collaborator:

What do we do if this fails? We need to handle that and either offload it somewhere to retry until it works, or unwind this group optimistically so we don't leak a repl_dev.

JacksonYao287 (Collaborator, Author), Jan 26, 2024:

Good question! I do not think we should retry: if the process crashes after create_repl_dev but before sending the createPG message, then on restart the pg_info is lost and the repl_dev does not know which PG it belongs to.

My solution:

1. If do_create_pg fails, just delete the repl_dev (unwind the group). create_repl_dev is infrequent and not on the IO path, so even if deletion is time-consuming, it is acceptable.
2. On restart, after applying all the logs, get the repl_dev list from HomeStore and iterate over all PGs to find any repl_dev that belongs to no PG, then delete that repl_dev.

If we do this, we need to add two APIs to HomeStore:

1. get_repl_dev_list(repl_dev_type t)
2. delete_repl_dev(uuid)

I think this can make sure we do not leak any repl_dev.

cc @hkadayam
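A minimal sketch of the restart-time scan described in (2), assuming the two proposed HomeStore APIs (get_repl_dev_list, delete_repl_dev) and illustrative names for everything else; this is not part of the PR:

    // Sketch only: runs after all logs are applied on restart. Any repl_dev whose
    // group uuid maps to no PG was created but never saw a committed createPG
    // message, so it is unwound here. get_repl_dev_list()/delete_repl_dev() are
    // the proposed HomeStore APIs; pg_info_ and group_id() are assumed names.
    void HSHomeObject::cleanup_orphan_repl_devs() {
        std::set< peer_id_t > known_groups;
        {
            auto lg = std::shared_lock(_pg_lock);
            for (auto const& [pg_id, pg] : _pg_map) {
                known_groups.insert(pg->pg_info_.replica_set_uuid);
            }
        }
        for (auto const& rdev : hs_repl_service().get_repl_dev_list()) {
            if (known_groups.count(rdev->group_id()) == 0) {
                hs_repl_service().delete_repl_dev(rdev->group_id());
            }
        }
    }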

Collaborator:

Seems like we put the group_id in the repl_dev superblock; can that be used to retrieve the pg_info from HomeObject?

JacksonYao287 (Collaborator, Author):

@xiaoxichen the group_id is a randomly generated uuid, and the pg_id is a uint16_t. But yes, we can generate a uuid from a uint16_t and recover the uint16_t from the previously generated uuid, giving a natural map between the two.

The key point is that the pg_info originally exists only in the create_pg message, which needs to be replicated across the repl_dev (raft group). If that message is not replicated to a majority of the group, it is not committed; on restart the uncommitted message will probably be dropped, and the repl_dev cannot recover the pg_info from a dropped message.

xiaoxichen (Collaborator), Jan 27, 2024:

I am trying to say that we could avoid replicating the pg_info at all, and so avoid the 2PC this PR proposes. Once the repl_dev is created we are all set: followers can construct the pg_info from the repl_dev, and during recovery the pg_info can likewise be recovered from the superblock of the repl_dev/raft config.

Of the three required fields we already have two: members can be taken from the raft cluster config, and the replica_set_uuid is the group_id. The only thing missing is pg_id. As you said, we could:

1. encode pg_id into replica_set_uuid, so pg_id can be extracted from replica_set_uuid;
2. put the pg_id into the _user_ctx field of libnuraft::cluster_config (cf. the link);
3. put the entire serailized_pg_info into _user_ctx.

Options 2 and 3 can be implemented where the cluster_config is loaded, i.e. in RaftReplDev::load_config().

struct PGInfo {
    explicit PGInfo(pg_id_t _id) : id(_id) {}
    pg_id_t id;
    mutable MemberSet members;
    peer_id_t replica_set_uuid;

    auto operator<=>(PGInfo const& rhs) const { return id <=> rhs.id; }
    auto operator==(PGInfo const& rhs) const { return id == rhs.id; }
};
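If option 1 were chosen, the mapping could be as simple as stamping the 16-bit pg_id into two fixed bytes of an otherwise random uuid. A minimal sketch, illustrative only and not part of this PR:

    #include <boost/uuid/random_generator.hpp>
    #include <boost/uuid/uuid.hpp>

    // Embed the pg_id in the last two bytes of the replica_set_uuid so it can
    // be recovered from the repl_dev superblock alone. Sketch only.
    inline boost::uuids::uuid make_replica_set_uuid(uint16_t pg_id) {
        auto uuid = boost::uuids::random_generator()();
        uuid.data[14] = static_cast< uint8_t >(pg_id >> 8);
        uuid.data[15] = static_cast< uint8_t >(pg_id & 0xFF);
        return uuid;
    }

    inline uint16_t pg_id_from_replica_set_uuid(boost::uuids::uuid const& uuid) {
        return static_cast< uint16_t >((uuid.data[14] << 8) | uuid.data[15]);
    }

The cost is two fewer random bytes in the uuid; options 2 and 3 avoid that by carrying the data in _user_ctx instead.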

JacksonYao287 (Collaborator, Author), Jan 30, 2024:

As a summary, there are two problems:

1. HS side: how to handle create_repl_dev failure (for example, failing to add any member to the raft group)? This is the responsibility of HomeStore; we should not consider it on the HomeObject side.

2. HO side: if we get a good repl_dev from HS but then fail to create the PG (for example, the createPG message fails to replicate to a majority of the raft group and is never committed, or a crash happens at some point), how do we clean up so we do not leak the repl_dev?

According to the meeting today, this will be done in phase 3, and by then new features in HomeStore and HomeObject will likely help solve this problem. So what about leaving the problem here for now (happy path only) and revisiting it in phase 3? @xiaoxichen @szmyd @sanebay

Collaborator:

Yes, let's create an issue and put a FIXME in the code.

JacksonYao287 (Collaborator, Author):

Sure, done.

szmyd (Collaborator), Jan 30, 2024:

@xiaoxichen yes, I know; I brought this up initially. Actually, there is one problem that is the generalized form of all of these:

  • I (SM) belong to a raft group (repl_dev) that, according to truth (CM), I am not managing a PG for.

All the issues around repl_dev failure, pg-creation failure, and migration during an offline member (orphaned PG) generalize to this; the existence of a PG under the raft group is inconsequential. The solution is what I suggested; anything else is an optimization for a specialization of this issue, and we need not risk complicating the machinery to correct something that does not need immediate correction.

This comes from years of fighting raft membership issues with AM: we do try to clean up in the cases we can, but we don't worry about the rest, since the global cleanup routine corrects things. Many attempts to optimize resulted in out-of-quorum groups and deadlocks in the code.

We should just do what's easy here to start, so things are correct, and then assess whether there are timing issues that warrant an optimization beyond fire-and-forget cleanup.
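A sketch of that fire-and-forget cleanup, with every name (fetch_pg_map_group_ids, local_group_ids, destroy_group) hypothetical:

    #include <set>

    // Hypothetical hooks: what CM (the source of truth) says we own, what we
    // actually host, and the teardown primitive.
    std::set< peer_id_t > fetch_pg_map_group_ids(); // from CM's PGMap
    std::set< peer_id_t > local_group_ids();        // raft groups hosted by this SM
    void destroy_group(peer_id_t const& gid);       // unwind one repl_dev

    // One reconcile pass: any hosted group that truth does not know about is
    // torn down. Failures are ignored; the next pass simply retries.
    void reconcile_groups_once() {
        auto const truth = fetch_pg_map_group_ids();
        for (auto const& gid : local_group_ids()) {
            if (truth.count(gid) == 0) { destroy_group(gid); }
        }
    }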

Collaborator:

@szmyd what you said makes perfect sense. Raft membership changes in SM will be rarer than in AM, but reconciling against CM's PGMap is the right path. There are some timing issues we need to be careful about, though.

});
}

PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info) {
auto serailized_pg_info = serialize_pg_info(pg_info);
auto info_size = serailized_pg_info.size();

auto total_size = sizeof(ReplicationMessageHeader) + info_size;
auto header = sisl::blob(iomanager.iobuf_alloc(total_size, io_align), total_size);
ReplicationMessageHeader repl_msg_header;
repl_msg_header.msg_type = ReplicationMessageType::CREATE_PG_MSG;
repl_msg_header.payload_size = info_size;
repl_msg_header.payload_crc =
crc32_ieee(init_crc32, r_cast< const unsigned char* >(serailized_pg_info.c_str()), info_size);
repl_msg_header.seal();
std::memcpy(header.bytes(), &repl_msg_header, sizeof(ReplicationMessageHeader));
std::memcpy(header.bytes() + sizeof(ReplicationMessageHeader), serailized_pg_info.c_str(), info_size);

// we do not need any hdr_buf_ , since we put everything in header blob
auto req = repl_result_ctx< PGManager::NullResult >::make(0, io_align);

// replicate this create pg message to all raft members of this group
repl_dev->async_alloc_write(header, sisl::blob{}, sisl::sg_list{}, req);
return req->result().deferValue([header = std::move(header)](auto const& e) -> PGManager::NullAsyncResult {
iomanager.iobuf_free(const_cast< uint8_t* >(header.cbytes()));
if (!e) { return folly::makeUnexpected(e.error()); }
return folly::Unit();
});
}

void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& header,
shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< PGManager::NullResult >* ctx{nullptr};
if (hs_ctx && hs_ctx->is_proposer) {
ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(hs_ctx).get();
}

ReplicationMessageHeader* msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));

if (msg_header->corrupted()) {
LOGE("create PG message header is corrupted , lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); }
return;
}

auto serailized_pg_info_buf = header.cbytes() + sizeof(ReplicationMessageHeader);
const auto serailized_pg_info_size = header.size() - sizeof(ReplicationMessageHeader);

if (crc32_ieee(init_crc32, serailized_pg_info_buf, serailized_pg_info_size) != msg_header->payload_crc) {
// header & value is inconsistent;
LOGE("create PG message header is inconsistent with value, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); }
return;
}

auto pg_info = deserialize_pg_info(serailized_pg_info_buf, serailized_pg_info_size);
auto pg_id = pg_info.id;
if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) {
LOGW("PG already exists, lsn:{}, pg_id {}", lsn, pg_id);
if (ctx) { ctx->promise_.setValue(folly::Unit()); }
return;
}

// create index table and pg
// TODO create index table during create shard.
auto index_table = create_index_table();
auto uuid_str = boost::uuids::to_string(index_table->uuid());

auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev), index_table);
std::scoped_lock lock_guard(index_lock_);
RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found");
index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table};

LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str);
hs_pg->index_table_ = index_table;
// Add to index service, so that it gets cleaned up when index service is shutdown.
homestore::hs()->index_service().add_index_table(index_table);
add_pg_to_map(std::move(hs_pg));
if (ctx) ctx->promise_.setValue(folly::Unit());
}

PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) {
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP));
@@ -89,16 +169,15 @@ void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) {
RELEASE_ASSERT(_pg_map.end() != it1, "Unknown map insert error!");
}

- #if 0
- std::string HSHomeObject::serialize_pg_info(PGInfo const& pginfo) {
+ std::string HSHomeObject::serialize_pg_info(const PGInfo& pginfo) {
nlohmann::json j;
j["pg_info"]["pg_id_t"] = pginfo.id;
j["pg_info"]["repl_uuid"] = boost::uuids::to_string(pginfo.replica_set_uuid);

- nlohmann::json members_j = {};
+ nlohmann::json members_j{};
for (auto const& member : pginfo.members) {
nlohmann::json member_j;
member_j["member_id"] = member.id;
member_j["member_id"] = boost::uuids::to_string(member.id);
member_j["name"] = member.name;
member_j["priority"] = member.priority;
members_j.push_back(member_j);
@@ -107,23 +186,21 @@ std::string HSHomeObject::serialize_pg_info(PGInfo const& pginfo) {
return j.dump();
}

- PGInfo HSHomeObject::deserialize_pg_info(std::string const& json_str) {
-     auto pg_json = nlohmann::json::parse(json_str);
+ PGInfo HSHomeObject::deserialize_pg_info(const unsigned char* json_str, size_t size) {
+     auto pg_json = nlohmann::json::parse(json_str, json_str + size);

- PGInfo pg_info;
- pg_info.id = pg_json["pg_info"]["pg_id_t"].get< pg_id_t >();
+ PGInfo pg_info(pg_json["pg_info"]["pg_id_t"].get< pg_id_t >());
pg_info.replica_set_uuid = boost::uuids::string_generator()(pg_json["pg_info"]["repl_uuid"].get< std::string >());

- for (auto const& m : pg_info["pg_info"]["members"]) {
-     PGMember member;
-     member.id = m["member_id"].get< pg_id_t >();
+ for (auto const& m : pg_json["pg_info"]["members"]) {
+     auto uuid_str = m["member_id"].get< std::string >();
+     PGMember member(boost::uuids::string_generator()(uuid_str));
member.name = m["name"].get< std::string >();
member.priority = m["priority"].get< int32_t >();
pg_info.members.emplace(std::move(member));
}
return pg_info;
}
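For orientation, serialize_pg_info/deserialize_pg_info above exchange a JSON document shaped roughly like this (field names taken from the code; ids and uuids invented for illustration):

    {
      "pg_info": {
        "pg_id_t": 1,
        "repl_uuid": "d3b1f96e-6f26-4f0e-9f5a-2c8b6f1e4a77",
        "members": [
          { "member_id": "a1b2c3d4-0000-4e5f-8a9b-1c2d3e4f5a6b", "name": "member-1", "priority": 1 }
        ]
      }
    }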
- #endif

void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
homestore::superblk< pg_info_superblk > pg_sb(_pg_meta_name);
11 changes: 5 additions & 6 deletions src/lib/homestore_backend/hs_shard_manager.cpp
@@ -95,7 +95,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow
value.size = msg_size;
value.iovs.push_back(iovec(buf_ptr, msg_size));
// replicate this create shard message to PG members;
- repl_dev->async_alloc_write(header, sisl::blob{}, value, req);
+ repl_dev->async_alloc_write(header, sisl::blob{buf_ptr, (uint32_t)msg_size}, value, req);
return req->result().deferValue([this](auto const& e) -> ShardManager::Result< ShardInfo > {
if (!e) return folly::makeUnexpected(e.error());
return e.value();
@@ -138,18 +138,17 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const
value.iovs.push_back(iovec(buf_ptr, msg_size));

// replicate this seal shard message to PG members;
- repl_dev->async_alloc_write(header, sisl::blob{}, value, req);
+ repl_dev->async_alloc_write(header, sisl::blob{buf_ptr, (uint32_t)msg_size}, value, req);
return req->result();
}

void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
- homestore::ReplDev* repl_dev,
+ shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {

if (hs_ctx != nullptr) {
auto ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx);
do_shard_message_commit(lsn, *r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())),
- blkids, ctx->hdr_buf_, hs_ctx);
+ blkids, hs_ctx->key, hs_ctx);
return;
}

@@ -180,7 +179,7 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader
homestore::MultiBlkId const& blkids, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
- if (hs_ctx != nullptr) {
+ if (hs_ctx && hs_ctx->is_proposer) {
ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
}

8 changes: 4 additions & 4 deletions src/lib/homestore_backend/replication_message.hpp
@@ -7,8 +7,8 @@

namespace homeobject {

- VENUM(ReplicationMessageType, uint16_t, CREATE_SHARD_MSG = 0, SEAL_SHARD_MSG = 1, PUT_BLOB_MSG = 2, DEL_BLOB_MSG = 3,
-     UNKNOWN_MSG = 4);
+ VENUM(ReplicationMessageType, uint16_t, CREATE_PG_MSG = 0, CREATE_SHARD_MSG = 1, SEAL_SHARD_MSG = 2, PUT_BLOB_MSG = 3,
+     DEL_BLOB_MSG = 4, UNKNOWN_MSG = 5);

// magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum'
static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34;
@@ -20,8 +20,8 @@ struct ReplicationMessageHeader {
uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC};
uint32_t protocol_version{HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1};
ReplicationMessageType msg_type; // message type
- pg_id_t pg_id;
- shard_id_t shard_id;
+ pg_id_t pg_id{0};
+ shard_id_t shard_id{0};
uint32_t payload_size;
uint32_t payload_crc;
uint8_t reserved_pad[4]{};
10 changes: 10 additions & 0 deletions src/lib/homestore_backend/replication_state_machine.cpp
@@ -7,6 +7,10 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c
LOGI("applying raft log commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_PG_MSG: {
Contributor:

Could you please add tests?

JacksonYao287 (Collaborator, Author), Jan 26, 2024:

I think the existing UT already covers this case:

EXPECT_TRUE(homeobj_->pg_manager()->create_pg(std::move(info)).get());

Do we need to add a more specific test case?
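A more specific case could assert that the PG materializes on every member, not just the proposer, since that is the new behavior this PR adds. A rough sketch against the existing fixture: homeobj_ comes from the UT above, while peers_, members_, and pg_exists() are assumed names:

    TEST_F(HomeObjectFixture, CreatePgReplicatesToAllMembers) {
        pg_id_t const pg_id{1};
        PGInfo info(pg_id);
        for (auto const& peer : peers_) { // peers_: assumed fixture member
            PGMember member(peer);
            member.priority = 1;
            info.members.emplace(std::move(member));
        }
        EXPECT_TRUE(homeobj_->pg_manager()->create_pg(std::move(info)).get());

        // After commit, each replica (not only the proposer) should have built
        // its local PG state; pg_exists() is a hypothetical lookup helper.
        for (auto const& member_ho : members_) {
            EXPECT_TRUE(member_ho->pg_exists(pg_id));
        }
    }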

home_object_->on_create_pg_message_commit(lsn, header, repl_dev(), ctx);
break;
}
case ReplicationMessageType::CREATE_SHARD_MSG:
case ReplicationMessageType::SEAL_SHARD_MSG: {
home_object_->on_shard_message_commit(lsn, header, pbas, repl_dev(), ctx);
@@ -40,6 +44,12 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const&, sisl::
LOGI("on_rollback with lsn:{}", lsn);
}

void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) {
LOGE("on_error with :{}, lsn {}", error, ctx->lsn);
// TODO: block is already freed on the homestore side; handle the error if necessary.
}

homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) {
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
switch (msg_header->msg_type) {