Skip to content

Commit

Permalink
replicate createpg message across raft group (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 authored Feb 13, 2024
1 parent 021b4c9 commit 135d76e
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 51 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "1.0.4"
version = "1.0.5"
homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
topics = ("ebay")
Expand Down
3 changes: 2 additions & 1 deletion src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

namespace homeobject {

ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP);
ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP, CRC_MISMATCH,
NO_SPACE_LEFT, DRIVE_WRITE_ERROR);

struct PGMember {
explicit PGMember(peer_id_t _id) : id(_id) {}
Expand Down
10 changes: 4 additions & 6 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
const homestore::MultiBlkId& pbas,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
if (hs_ctx && hs_ctx->is_proposer) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

Expand Down Expand Up @@ -290,7 +290,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,
homestore::blk_alloc_hints HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
if (hs_ctx && hs_ctx->is_proposer) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

Expand Down Expand Up @@ -351,7 +351,7 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo
void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
if (hs_ctx && hs_ctx->is_proposer) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

Expand Down Expand Up @@ -392,9 +392,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
}

auto& multiBlks = r.value();
if (multiBlks != tombstone_pbas) {
repl_dev->async_free_blks(lsn, multiBlks);
}
if (multiBlks != tombstone_pbas) { repl_dev->async_free_blks(lsn, multiBlks); }

if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); }
}
Expand Down
3 changes: 1 addition & 2 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ void HSHomeObject::init_homestore() {

HomeStore::instance()->format_and_start({
{HS_SERVICE::META, hs_format_params{.size_pct = 5.0}},
{HS_SERVICE::LOG_REPLICATED, hs_format_params{.size_pct = 10.0}},
{HS_SERVICE::LOG_LOCAL, hs_format_params{.size_pct = 0.1}}, // TODO: Remove this after HS disables LOG_LOCAL
{HS_SERVICE::LOG, hs_format_params{.size_pct = 10.0, .chunk_size = 32 * Mi}},
{HS_SERVICE::REPLICATION,
hs_format_params{.size_pct = 79.0,
.num_chunks = 65000,
Expand Down
19 changes: 18 additions & 1 deletion src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,13 @@ class HSHomeObject : public HomeObjectImpl {
private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

// create pg related
PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info);
static std::string serialize_pg_info(const PGInfo& info);
static PGInfo deserialize_pg_info(const unsigned char* pg_info_str, size_t size);
void add_pg_to_map(unique< HS_PG > hs_pg);

// create shard related
shard_id_t generate_new_shard_id(pg_id_t pg);
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t);

Expand Down Expand Up @@ -260,6 +266,17 @@ class HSHomeObject : public HomeObjectImpl {
*/
void init_cp();

/**
* @brief Callback function invoked when createPG message is committed on a shard.
*
* @param lsn The logical sequence number of the message.
* @param header The header of the message.
* @param repl_dev The replication device.
* @param hs_ctx The replication request context.
*/
void on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

/**
* @brief Callback function invoked when a message is committed on a shard.
*
Expand All @@ -270,7 +287,7 @@ class HSHomeObject : public HomeObjectImpl {
* @param hs_ctx The replication request context.
*/
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
homestore::ReplDev* repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx);
shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx);

/**
* @brief Retrieves the chunk number associated with the given shard ID.
Expand Down
135 changes: 106 additions & 29 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/string_generator.hpp>
#include <homestore/replication_service.hpp>
#include "hs_homeobject.hpp"
#include "replication_state_machine.hpp"
#include "hs_hmobj_cp.hpp"

using namespace homestore;
namespace homeobject {
// Alignment value handed to iomanager.iobuf_alloc() and repl_result_ctx<>::make() when building
// replication messages in this file (presumably in bytes -- TODO confirm against iomgr).
static constexpr uint64_t io_align{512};

PGError toPgError(ReplServiceError const& e) {
switch (e) {
Expand Down Expand Up @@ -34,6 +36,14 @@ PGError toPgError(ReplServiceError const& e) {
return PGError::TIMEOUT;
case ReplServiceError::SERVER_NOT_FOUND:
return PGError::UNKNOWN_PG;
case ReplServiceError::NO_SPACE_LEFT:
return PGError::NO_SPACE_LEFT;
case ReplServiceError::DRIVE_WRITE_ERROR:
return PGError::DRIVE_WRITE_ERROR;
/* TODO: enable this after adding the error type to homestore
case ReplServiceError::CRC_MISMATCH:
return PGError::CRC_MISMATCH;
*/
case ReplServiceError::OK:
DEBUG_ASSERT(false, "Should not process OK!");
[[fallthrough]];
Expand All @@ -48,32 +58,102 @@ PGError toPgError(ReplServiceError const& e) {
}

PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) {
auto pg_id = pg_info.id;
if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) return folly::Unit();

pg_info.replica_set_uuid = boost::uuids::random_generator()();
return hs_repl_service()
.create_repl_dev(pg_info.replica_set_uuid, peers)
.via(executor_)
.thenValue([this, pg_info = std::move(pg_info)](auto&& v) mutable -> PGManager::NullResult {
.thenValue([this, pg_info = std::move(pg_info)](auto&& v) mutable -> PGManager::NullAsyncResult {
if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); }
// we will write a PGHeader across the raft group and when it is committed
// all raft members will create PGinfo and index table for this PG.

// TODO create index table during create shard.
auto index_table = create_index_table();
auto uuid_str = boost::uuids::to_string(index_table->uuid());

auto pg_id = pg_info.id;
auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(v.value()), index_table);
std::scoped_lock lock_guard(index_lock_);
RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found");
index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table};

LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str);
hs_pg->index_table_ = index_table;
// Add to index service, so that it gets cleaned up when index service is shutdown.
homestore::hs()->index_service().add_index_table(index_table);
add_pg_to_map(std::move(hs_pg));
return folly::Unit();
// FIXME:https://github.com/eBay/HomeObject/pull/136#discussion_r1470504271
return do_create_pg(v.value(), std::move(pg_info));
});
}

/**
 * @brief Builds a CREATE_PG replication message for the given PG and writes it through the replication device.
 *
 * The whole message travels in the header blob: a sealed ReplicationMessageHeader immediately followed by
 * the serialized PGInfo. An empty key blob and an empty sg_list are passed alongside it.
 *
 * @param repl_dev Replication device the message is written through.
 * @param pg_info  PG description that is serialized into the message.
 * @return Deferred result that yields folly::Unit on success, or the error carried by the request result.
 */
PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info) {
    const auto pg_info_str = serialize_pg_info(pg_info);
    const auto payload_len = pg_info_str.size();
    const auto msg_len = sizeof(ReplicationMessageHeader) + payload_len;

    // Describe the payload (type, size, crc) and seal the fixed-size message header.
    ReplicationMessageHeader msg_hdr;
    msg_hdr.msg_type = ReplicationMessageType::CREATE_PG_MSG;
    msg_hdr.payload_size = payload_len;
    msg_hdr.payload_crc = crc32_ieee(init_crc32, r_cast< const unsigned char* >(pg_info_str.c_str()), payload_len);
    msg_hdr.seal();

    // Lay out [message header | serialized PGInfo] in a single buffer obtained from iomanager.
    auto header = sisl::blob(iomanager.iobuf_alloc(msg_len, io_align), msg_len);
    auto* const dst = header.bytes();
    std::memcpy(dst, &msg_hdr, sizeof(ReplicationMessageHeader));
    std::memcpy(dst + sizeof(ReplicationMessageHeader), pg_info_str.c_str(), payload_len);

    // No hdr_buf_ is required in the request context because everything lives in the header blob.
    auto req = repl_result_ctx< PGManager::NullResult >::make(0, io_align);

    // Replicate this create pg message to all raft members of this group.
    repl_dev->async_alloc_write(header, sisl::blob{}, sisl::sg_list{}, req);

    // The continuation owns the blob so the buffer is released once the request result is available.
    return req->result().deferValue([header = std::move(header)](auto const& res) -> PGManager::NullAsyncResult {
        iomanager.iobuf_free(const_cast< uint8_t* >(header.cbytes()));
        if (res) { return folly::Unit(); }
        return folly::makeUnexpected(res.error());
    });
}

/**
 * @brief Commit handler for a replicated CREATE_PG message.
 *
 * The header blob is expected to hold a ReplicationMessageHeader immediately followed by the serialized
 * PGInfo. The message is validated (minimum size, header integrity, payload crc); if the PG is not yet in
 * _pg_map, an index table and an HS_PG are created and registered.
 *
 * @param lsn      Log sequence number of the committed entry; only used for logging here.
 * @param header   Blob carrying the message header followed by the serialized PGInfo.
 * @param repl_dev Replication device; moved into the newly created HS_PG.
 * @param hs_ctx   Replication request context. The result promise is fulfilled only when the context is
 *                 non-null and flagged is_proposer.
 */
void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& header,
                                               shared< homestore::ReplDev > repl_dev,
                                               cintrusive< homestore::repl_req_ctx >& hs_ctx) {
    repl_result_ctx< PGManager::NullResult >* ctx{nullptr};
    if (hs_ctx && hs_ctx->is_proposer) {
        ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(hs_ctx).get();
    }

    // A blob shorter than the fixed-size header cannot be interpreted at all. Without this guard the
    // payload size computed below would underflow and the crc would run far past the end of the buffer.
    if (header.size() < sizeof(ReplicationMessageHeader)) {
        LOGE("create PG message header is truncated, lsn:{}, size:{}", lsn, header.size());
        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); }
        return;
    }

    ReplicationMessageHeader* msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));

    if (msg_header->corrupted()) {
        LOGE("create PG message header is corrupted , lsn:{}", lsn);
        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); }
        return;
    }

    // The serialized PGInfo occupies everything after the fixed-size header.
    auto serialized_pg_info_buf = header.cbytes() + sizeof(ReplicationMessageHeader);
    const auto serialized_pg_info_size = header.size() - sizeof(ReplicationMessageHeader);

    if (crc32_ieee(init_crc32, serialized_pg_info_buf, serialized_pg_info_size) != msg_header->payload_crc) {
        // header & value is inconsistent;
        LOGE("create PG message header is inconsistent with value, lsn:{}", lsn);
        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); }
        return;
    }

    auto pg_info = deserialize_pg_info(serialized_pg_info_buf, serialized_pg_info_size);
    auto pg_id = pg_info.id;
    // An already-known PG is reported as success rather than as an error.
    if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) {
        LOGW("PG already exists, lsn:{}, pg_id {}", lsn, pg_id);
        if (ctx) { ctx->promise_.setValue(folly::Unit()); }
        return;
    }

    // create index table and pg
    // TODO create index table during create shard.
    auto index_table = create_index_table();
    auto uuid_str = boost::uuids::to_string(index_table->uuid());

    auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev), index_table);
    std::scoped_lock lock_guard(index_lock_);
    RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found");
    index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table};

    LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str);
    hs_pg->index_table_ = index_table;
    // Add to index service, so that it gets cleaned up when index service is shutdown.
    homestore::hs()->index_service().add_index_table(index_table);
    add_pg_to_map(std::move(hs_pg));
    if (ctx) ctx->promise_.setValue(folly::Unit());
}

PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) {
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP));
Expand All @@ -89,16 +169,15 @@ void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) {
RELEASE_ASSERT(_pg_map.end() != it1, "Unknown map insert error!");
}

#if 0
std::string HSHomeObject::serialize_pg_info(PGInfo const& pginfo) {
std::string HSHomeObject::serialize_pg_info(const PGInfo& pginfo) {
nlohmann::json j;
j["pg_info"]["pg_id_t"] = pginfo.id;
j["pg_info"]["repl_uuid"] = boost::uuids::to_string(pginfo.replica_set_uuid);

nlohmann::json members_j = {};
nlohmann::json members_j{};
for (auto const& member : pginfo.members) {
nlohmann::json member_j;
member_j["member_id"] = member.id;
member_j["member_id"] = boost::uuids::to_string(member.id);
member_j["name"] = member.name;
member_j["priority"] = member.priority;
members_j.push_back(member_j);
Expand All @@ -107,23 +186,21 @@ std::string HSHomeObject::serialize_pg_info(PGInfo const& pginfo) {
return j.dump();
}

PGInfo HSHomeObject::deserialize_pg_info(std::string const& json_str) {
auto pg_json = nlohmann::json::parse(json_str);
PGInfo HSHomeObject::deserialize_pg_info(const unsigned char* json_str, size_t size) {
auto pg_json = nlohmann::json::parse(json_str, json_str + size);

PGInfo pg_info;
pg_info.id = pg_json["pg_info"]["pg_id_t"].get< pg_id_t >();
PGInfo pg_info(pg_json["pg_info"]["pg_id_t"].get< pg_id_t >());
pg_info.replica_set_uuid = boost::uuids::string_generator()(pg_json["pg_info"]["repl_uuid"].get< std::string >());

for (auto const& m : pg_info["pg_info"]["members"]) {
PGMember member;
member.id = m["member_id"].get< pg_id_t >();
for (auto const& m : pg_json["pg_info"]["members"]) {
auto uuid_str = m["member_id"].get< std::string >();
PGMember member(boost::uuids::string_generator()(uuid_str));
member.name = m["name"].get< std::string >();
member.priority = m["priority"].get< int32_t >();
pg_info.members.emplace(std::move(member));
}
return pg_info;
}
#endif

void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
homestore::superblk< pg_info_superblk > pg_sb(_pg_meta_name);
Expand Down
11 changes: 5 additions & 6 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow
value.size = msg_size;
value.iovs.push_back(iovec(buf_ptr, msg_size));
// replicate this create shard message to PG members;
repl_dev->async_alloc_write(header, sisl::blob{}, value, req);
repl_dev->async_alloc_write(header, sisl::blob{buf_ptr, (uint32_t)msg_size}, value, req);
return req->result().deferValue([this](auto const& e) -> ShardManager::Result< ShardInfo > {
if (!e) return folly::makeUnexpected(e.error());
return e.value();
Expand Down Expand Up @@ -138,18 +138,17 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const
value.iovs.push_back(iovec(buf_ptr, msg_size));

// replicate this seal shard message to PG members;
repl_dev->async_alloc_write(header, sisl::blob{}, value, req);
repl_dev->async_alloc_write(header, sisl::blob{buf_ptr, (uint32_t)msg_size}, value, req);
return req->result();
}

void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
homestore::ReplDev* repl_dev,
shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {

if (hs_ctx != nullptr) {
auto ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx);
do_shard_message_commit(lsn, *r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())),
blkids, ctx->hdr_buf_, hs_ctx);
blkids, hs_ctx->key, hs_ctx);
return;
}

Expand Down Expand Up @@ -180,7 +179,7 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader
homestore::MultiBlkId const& blkids, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
if (hs_ctx && hs_ctx->is_proposer) {
ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
}

Expand Down
8 changes: 4 additions & 4 deletions src/lib/homestore_backend/replication_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

namespace homeobject {

VENUM(ReplicationMessageType, uint16_t, CREATE_SHARD_MSG = 0, SEAL_SHARD_MSG = 1, PUT_BLOB_MSG = 2, DEL_BLOB_MSG = 3,
UNKNOWN_MSG = 4);
VENUM(ReplicationMessageType, uint16_t, CREATE_PG_MSG = 0, CREATE_SHARD_MSG = 1, SEAL_SHARD_MSG = 2, PUT_BLOB_MSG = 3,
DEL_BLOB_MSG = 4, UNKNOWN_MSG = 5);

// magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum'
static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34;
Expand All @@ -20,8 +20,8 @@ struct ReplicationMessageHeader {
uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC};
uint32_t protocol_version{HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1};
ReplicationMessageType msg_type; // message type
pg_id_t pg_id;
shard_id_t shard_id;
pg_id_t pg_id{0};
shard_id_t shard_id{0};
uint32_t payload_size;
uint32_t payload_crc;
uint8_t reserved_pad[4]{};
Expand Down
10 changes: 10 additions & 0 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c
LOGI("applying raft log commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_PG_MSG: {
home_object_->on_create_pg_message_commit(lsn, header, repl_dev(), ctx);
break;
}
case ReplicationMessageType::CREATE_SHARD_MSG:
case ReplicationMessageType::SEAL_SHARD_MSG: {
home_object_->on_shard_message_commit(lsn, header, pbas, repl_dev(), ctx);
Expand Down Expand Up @@ -40,6 +44,12 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const&, sisl::
LOGI("on_rollback with lsn:{}", lsn);
}

/**
 * @brief Logs a replication error reported for a request.
 *
 * @param error  The replication service error being reported.
 * @param header Header blob of the failed request (currently unused).
 * @param key    Key blob of the failed request (currently unused).
 * @param ctx    Replication request context; its lsn is logged when the context is present.
 */
void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
                                       cintrusive< repl_req_ctx >& ctx) {
    // NOTE(review): the commit handlers treat the request context as nullable, so it is guarded here
    // as well instead of being dereferenced unconditionally -- confirm whether it can be null on this path.
    if (ctx) {
        LOGE("on_error with :{}, lsn {}", error, ctx->lsn);
    } else {
        LOGE("on_error with :{}, no request context", error);
    }
    // TODO:: block is already freed at homestore side, handle error if necessary.
}

homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) {
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
switch (msg_header->msg_type) {
Expand Down
Loading

0 comments on commit 135d76e

Please sign in to comment.