Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix meta upgrade for multi instance #3734

Merged
merged 1 commit into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions src/daemons/MetaDaemonInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
return nullptr;
}

auto engineRet = kvstore->part(nebula::kDefaultSpaceId, nebula::kDefaultPartId);
if (!nebula::ok(engineRet)) {
LOG(ERROR) << "Get nebula store engine failed";
return nullptr;
}

auto engine = nebula::value(engineRet)->engine();
LOG(INFO) << "Waiting for the leader elected...";
nebula::HostAddr leader;
while (true) {
Expand Down Expand Up @@ -138,22 +145,21 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
LOG(ERROR) << "Meta version is invalid";
return nullptr;
} else if (version == nebula::meta::MetaVersion::V1) {
auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(kvstore.get());
auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(engine);
if (!ret.ok()) {
LOG(ERROR) << ret;
LOG(ERROR) << "Update meta from V1 to V2 failed " << ret;
return nullptr;
}

nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get(), nebula::meta::MetaVersion::V2);
nebula::meta::MetaVersionMan::setMetaVersionToKV(engine, nebula::meta::MetaVersion::V2);
} else if (version == nebula::meta::MetaVersion::V2) {
LOG(INFO) << "version 3";
auto ret = nebula::meta::MetaVersionMan::updateMetaV2ToV3(kvstore.get());
auto ret = nebula::meta::MetaVersionMan::updateMetaV2ToV3(engine);
if (!ret.ok()) {
LOG(ERROR) << ret;
LOG(ERROR) << "Update meta from V2 to V3 failed " << ret;
return nullptr;
}

nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get(), nebula::meta::MetaVersion::V3);
nebula::meta::MetaVersionMan::setMetaVersionToKV(engine, nebula::meta::MetaVersion::V3);
}

LOG(INFO) << "Nebula store init succeeded, clusterId " << gClusterId;
Expand Down
114 changes: 61 additions & 53 deletions src/meta/MetaVersionMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "meta/MetaVersionMan.h"

#include "common/fs/FileUtils.h"
#include "meta/ActiveHostsMan.h"
#include "meta/processors/job/JobDescription.h"
#include "meta/processors/job/JobUtils.h"
Expand Down Expand Up @@ -51,78 +52,86 @@ MetaVersion MetaVersionMan::getVersionByHost(kvstore::KVStore* kv) {
}

// static
bool MetaVersionMan::setMetaVersionToKV(kvstore::KVStore* kv, MetaVersion version) {
CHECK_NOTNULL(kv);
std::vector<kvstore::KV> data;
data.emplace_back(kMetaVersionKey,
std::string(reinterpret_cast<const char*>(&version), sizeof(MetaVersion)));
bool ret = true;
folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(
kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) {
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Put failed, error: " << static_cast<int32_t>(code);
ret = false;
} else {
LOG(INFO) << "Write meta version 3 succeeds";
}
baton.post();
});
baton.wait();
return ret;
bool MetaVersionMan::setMetaVersionToKV(kvstore::KVEngine* engine, MetaVersion version) {
CHECK_NOTNULL(engine);
std::string versionValue =
std::string(reinterpret_cast<const char*>(&version), sizeof(MetaVersion));
auto code = engine->put(kMetaVersionKey, std::move(versionValue));
return code == nebula::cpp2::ErrorCode::SUCCEEDED;
}

// static
Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVEngine* engine) {
CHECK_NOTNULL(engine);
auto snapshot = folly::sformat("META_UPGRADE_SNAPSHOT_{}", MetaKeyUtils::genTimestampStr());
auto meteRet = kv->createCheckpoint(kDefaultSpaceId, snapshot);
if (meteRet.isLeftType()) {

std::string path = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot);
if (!fs::FileUtils::exist(path) && !fs::FileUtils::makeDir(path)) {
LOG(ERROR) << "Make checkpoint dir: " << path << " failed";
return Status::Error("Create snapshot file failed");
}

std::string dataPath = folly::sformat("{}/data", path);
auto code = engine->createCheckpoint(dataPath);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create snapshot failed: " << snapshot;
return Status::Error("Create snapshot failed");
}
auto status = doUpgradeV1ToV2(kv);

auto status = doUpgradeV1ToV2(engine);
if (!status.ok()) {
// rollback by snapshot
return status;
}

// delete snapshot file
auto dmRet = kv->dropCheckpoint(kDefaultSpaceId, snapshot);
if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
auto checkpointPath = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot);

if (fs::FileUtils::exist(checkpointPath) && !fs::FileUtils::remove(checkpointPath.data(), true)) {
LOG(ERROR) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually";
}
return Status::OK();
}

Status MetaVersionMan::updateMetaV2ToV3(kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
Status MetaVersionMan::updateMetaV2ToV3(kvstore::KVEngine* engine) {
CHECK_NOTNULL(engine);
auto snapshot = folly::sformat("META_UPGRADE_SNAPSHOT_{}", MetaKeyUtils::genTimestampStr());
auto meteRet = kv->createCheckpoint(kDefaultSpaceId, snapshot);
if (meteRet.isLeftType()) {

std::string path = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot);
if (!fs::FileUtils::exist(path) && !fs::FileUtils::makeDir(path)) {
LOG(ERROR) << "Make checkpoint dir: " << path << " failed";
return Status::Error("Create snapshot file failed");
}

std::string dataPath = folly::sformat("{}/data", path);
auto code = engine->createCheckpoint(dataPath);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create snapshot failed: " << snapshot;
return Status::Error("Create snapshot failed");
}
auto status = doUpgradeV2ToV3(kv);

auto status = doUpgradeV2ToV3(engine);
if (!status.ok()) {
// rollback by snapshot
return status;
}

// delete snapshot file
auto dmRet = kv->dropCheckpoint(kDefaultSpaceId, snapshot);
if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
auto checkpointPath = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot);
if (fs::FileUtils::exist(checkpointPath) && !fs::FileUtils::remove(checkpointPath.data(), true)) {
LOG(ERROR) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually";
}
return Status::OK();
}

// static
Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
MetaDataUpgrade upgrader(kv);
Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVEngine* engine) {
MetaDataUpgrade upgrader(engine);
{
// kSpacesTable
auto prefix = nebula::meta::v1::kSpacesTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -143,7 +152,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kPartsTable
auto prefix = nebula::meta::v1::kPartsTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -164,7 +173,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kHostsTable
auto prefix = nebula::meta::v1::kHostsTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -185,7 +194,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kLeadersTable
auto prefix = nebula::meta::v1::kLeadersTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -206,7 +215,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kTagsTable
auto prefix = nebula::meta::v1::kTagsTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -227,7 +236,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kEdgesTable
auto prefix = nebula::meta::v1::kEdgesTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -248,7 +257,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kIndexesTable
auto prefix = nebula::meta::v1::kIndexesTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -269,7 +278,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kConfigsTable
auto prefix = nebula::meta::v1::kConfigsTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -290,7 +299,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kJob
auto prefix = JobUtil::jobPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand Down Expand Up @@ -324,7 +333,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
nebula::meta::v1::kJobArchive});
std::unique_ptr<kvstore::KVIterator> iter;
for (auto& prefix : prefixes) {
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -351,22 +360,22 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
}
}
}
if (!setMetaVersionToKV(kv, MetaVersion::V2)) {
if (!setMetaVersionToKV(engine, MetaVersion::V2)) {
return Status::Error("Persist meta version failed");
} else {
return Status::OK();
}
}

Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {
MetaDataUpgrade upgrader(kv);
Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVEngine* engine) {
MetaDataUpgrade upgrader(engine);
// Step 1: Upgrade HeartBeat into machine list
{
// collect all hosts association with zone
std::vector<HostAddr> zoneHosts;
const auto& zonePrefix = MetaKeyUtils::zonePrefix();
std::unique_ptr<kvstore::KVIterator> zoneIter;
auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, zonePrefix, &zoneIter);
auto code = engine->prefix(zonePrefix, &zoneIter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get active hosts failed";
return Status::Error("Get hosts failed");
Expand All @@ -382,7 +391,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {

const auto& prefix = MetaKeyUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
code = engine->prefix(prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get active hosts failed";
return Status::Error("Get hosts failed");
Expand Down Expand Up @@ -420,7 +429,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {
{
const auto& prefix = MetaKeyUtils::spacePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto code = engine->prefix(prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get spaces failed";
return Status::Error("Get spaces failed");
Expand All @@ -430,7 +439,6 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {
if (FLAGS_print_info) {
upgrader.printSpacesV2(iter->val());
}
auto spaceProperties = meta::v2::MetaServiceUtilsV2::parseSpace(iter->val());
auto status = upgrader.rewriteSpacesV2ToV3(iter->key(), iter->val());
if (!status.ok()) {
LOG(ERROR) << status;
Expand All @@ -439,7 +447,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {
iter->next();
}
}
if (!setMetaVersionToKV(kv, MetaVersion::V3)) {
if (!setMetaVersionToKV(engine, MetaVersion::V3)) {
return Status::Error("Persist meta version failed");
} else {
return Status::OK();
Expand Down
11 changes: 6 additions & 5 deletions src/meta/MetaVersionMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "common/base/Base.h"
#include "common/utils/MetaKeyUtils.h"
#include "kvstore/KVEngine.h"
#include "kvstore/KVStore.h"

namespace nebula {
Expand All @@ -29,18 +30,18 @@ class MetaVersionMan final {

static MetaVersion getMetaVersionFromKV(kvstore::KVStore* kv);

static bool setMetaVersionToKV(kvstore::KVStore* kv, MetaVersion version);
static bool setMetaVersionToKV(kvstore::KVEngine* engine, MetaVersion version);

static Status updateMetaV1ToV2(kvstore::KVStore* kv);
static Status updateMetaV1ToV2(kvstore::KVEngine* engine);

static Status updateMetaV2ToV3(kvstore::KVStore* kv);
static Status updateMetaV2ToV3(kvstore::KVEngine* engine);

private:
static MetaVersion getVersionByHost(kvstore::KVStore* kv);

static Status doUpgradeV1ToV2(kvstore::KVStore* kv);
static Status doUpgradeV1ToV2(kvstore::KVEngine* engine);

static Status doUpgradeV2ToV3(kvstore::KVStore* kv);
static Status doUpgradeV2ToV3(kvstore::KVEngine* engine);
};

} // namespace meta
Expand Down
4 changes: 2 additions & 2 deletions src/meta/upgrade/MetaDataUpgrade.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Status MetaDataUpgrade::rewriteSpacesV2ToV3(const folly::StringPiece &key,
auto groupName = *oldProps.group_name_ref();
auto groupKey = meta::v2::MetaServiceUtilsV2::groupKey(groupName);
std::string zoneValue;
auto code = kv_->get(kDefaultSpaceId, kDefaultPartId, std::move(groupKey), &zoneValue);
auto code = engine_->get(std::move(groupKey), &zoneValue);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("Get Group Failed");
}
Expand All @@ -107,7 +107,7 @@ Status MetaDataUpgrade::rewriteSpacesV2ToV3(const folly::StringPiece &key,
} else {
const auto &zonePrefix = MetaKeyUtils::zonePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kv_->prefix(kDefaultSpaceId, kDefaultPartId, zonePrefix, &iter);
auto code = engine_->prefix(zonePrefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("Get Zones Failed");
}
Expand Down
Loading