Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add db-upgrade V3 #3417

Merged
merged 16 commits into from
Dec 28, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,8 @@ std::string NebulaKeyUtils::adminTaskKey(int32_t seqId, JobID jobId, TaskID task
return key;
}

// Returns the fixed key under which the on-disk data version is stored:
// four bytes, each 0xFF.
std::string NebulaKeyUtils::dataVersionKey() {
  constexpr size_t kKeyLen = 4;
  return std::string(kKeyLen, '\xFF');
}

} // namespace nebula
2 changes: 2 additions & 0 deletions src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ class NebulaKeyUtils final {

static std::string adminTaskKey(int32_t seqId, JobID jobId, TaskID taskId);

static std::string dataVersionKey();

static_assert(sizeof(NebulaKeyType) == sizeof(PartitionID));

private:
Expand Down
1 change: 1 addition & 0 deletions src/tools/db-upgrade/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ nebula_add_executable(
DbUpgraderTool.cpp
NebulaKeyUtilsV1.cpp
NebulaKeyUtilsV2.cpp
NebulaKeyUtilsV3.cpp
DbUpgrader.cpp
OBJECTS
$<TARGET_OBJECTS:meta_service_handler>
Expand Down
129 changes: 122 additions & 7 deletions src/tools/db-upgrade/DbUpgrader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
#include "common/fs/FileUtils.h"
#include "common/utils/IndexKeyUtils.h"
#include "common/utils/NebulaKeyUtils.h"
#include "rocksdb/sst_file_writer.h"
#include "tools/db-upgrade/NebulaKeyUtilsV1.h"
#include "tools/db-upgrade/NebulaKeyUtilsV2.h"
#include "tools/db-upgrade/NebulaKeyUtilsV3.h"

DEFINE_string(src_db_path,
"",
Expand All @@ -22,10 +24,11 @@ DEFINE_string(dst_db_path,
"multi paths should be split by comma");
DEFINE_string(upgrade_meta_server, "127.0.0.1:45500", "Meta servers' address.");
DEFINE_uint32(write_batch_num, 100, "The size of the batch written to rocksdb");
DEFINE_uint32(upgrade_version,
0,
"When the value is 1, upgrade the data from 1.x to 2.0 GA. "
"When the value is 2, upgrade the data from 2.0 RC to 2.0 GA.");
// Selects the upgrade path, written as "<from>:<to>". The default is the empty string.
// Each help-text literal ends with a space because adjacent literals are concatenated verbatim.
DEFINE_string(upgrade_version,
              "",
              "When the value is 1:2, upgrade the data from 1.x to 2.0 GA. "
              "When the value is 2RC:2, upgrade the data from 2.0 RC to 2.0 GA. "
              "When the value is 2:3, upgrade the data from 2.0 GA to 3.0.");
DEFINE_bool(compactions,
true,
"When the upgrade of the space is completed, "
Expand Down Expand Up @@ -83,7 +86,7 @@ Status UpgraderSpace::initSpace(const std::string& sId) {

// Use readonly rocksdb
readEngine_.reset(new nebula::kvstore::RocksEngine(
spaceId_, spaceVidLen_, srcPath_, "", nullptr, nullptr, true));
spaceId_, spaceVidLen_, srcPath_, "", nullptr, nullptr, false));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This RocksDB instance was originally opened read-only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because upgrading to 3.0 only needs to append new data by ingesting SST files and to write a dataVersionKey that identifies the data encoding version, there is no need to write a new RocksDB instance; the upgrade is done in place.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused that readEngine_ is reset to a writable engine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here "read" means the source and "write" means the destination.

writeEngine_.reset(new nebula::kvstore::RocksEngine(spaceId_, spaceVidLen_, dstPath_));

parts_.clear();
Expand Down Expand Up @@ -882,6 +885,114 @@ std::string UpgraderSpace::encodeRowVal(const RowReader* reader,
return std::move(rowWrite).moveEncodedStr();
}

void UpgraderSpace::runPartV3() {
std::chrono::milliseconds take_dura{10};
if (auto pId = partQueue_.try_take_for(take_dura)) {
PartitionID partId = *pId;
// Handle vertex and edge, if there is an index, generate index data
LOG(INFO) << "Start to handle vertex/edge/index data in space id " << spaceId_ << " part id "
<< partId;
auto prefix = NebulaKeyUtilsV3::partTagPrefix(partId);
std::unique_ptr<kvstore::KVIterator> iter;
auto retCode = readEngine_->prefix(prefix, &iter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Space id " << spaceId_ << " part " << partId << " no found!";
LOG(ERROR) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id "
<< partId << " failed";

auto unFinishedPart = --unFinishedPart_;
if (unFinishedPart == 0) {
// all parts has finished
LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id "
<< spaceId_ << " finished";
} else {
pool_->add(std::bind(&UpgraderSpace::runPartV3, this));
}
return;
}
auto write_sst = [&, this](const std::vector<kvstore::KV>& data) {
::rocksdb::Options option;
option.create_if_missing = true;
option.compression = ::rocksdb::CompressionType::kNoCompression;
::rocksdb::SstFileWriter sst_file_writer(::rocksdb::EnvOptions(), option);
std::string file = ::fmt::format(
".nebula_upgrade.space-{}.part-{}.{}.sst", spaceId_, partId, std::time(nullptr));
::rocksdb::Status s = sst_file_writer.Open(file);
if (!s.ok()) {
LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":"
<< s.code();
}
for (auto item : data) {
s = sst_file_writer.Put(item.first, item.second);
if (!s.ok()) {
LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":"
<< s.code();
}
}
s = sst_file_writer.Finish();
if (!s.ok()) {
LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":"
<< s.code();
}
std::lock_guard<std::mutex> lck(this->ingest_sst_file_mut_);
ingest_sst_file_.push_back(file);
};
std::vector<kvstore::KV> data;
std::string lastVertexKey = "";
while (iter && iter->valid()) {
auto vertex = NebulaKeyUtilsV3::getVertexKey(iter->key());
if (vertex == lastVertexKey) {
iter->next();
continue;
SuperYoko marked this conversation as resolved.
Show resolved Hide resolved
}
data.emplace_back(vertex, "");
lastVertexKey = vertex;
if (data.size() >= 100000) {
write_sst(data);
data.clear();
}
iter->next();
}
if (!data.empty()) {
write_sst(data);
data.clear();
}
LOG(INFO) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " << partId
<< " succeed";

auto unFinishedPart = --unFinishedPart_;
if (unFinishedPart == 0) {
// all parts has finished
LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id "
<< spaceId_ << " finished.";
} else {
pool_->add(std::bind(&UpgraderSpace::runPartV3, this));
}
} else {
LOG(INFO) << "Handle vertex/edge/index of parts data in space id " << spaceId_ << " finished";
}
}
// Upgrades this space from the 2.0 GA layout to 3.0.
//
// Schedules up to FLAGS_max_concurrent_parts runPartV3() workers on pool_, waits until
// unFinishedPart_ drops to zero, ingests the SST files collected in ingest_sst_file_ through
// readEngine_, and finally stores the data version marker
// (NebulaKeyUtils::dataVersionKey() -> NebulaKeyUtilsV3::dataVersionValue()) in the same engine.
//
// A failed ingest or a failed write of the version marker aborts the process via LOG(FATAL).
void UpgraderSpace::doProcessV3() {
  LOG(INFO) << "Start to handle data in space id " << spaceId_;
  // Parallel process part
  auto partConcurrency = std::min(static_cast<size_t>(FLAGS_max_concurrent_parts), parts_.size());
  LOG(INFO) << "Max concurrent parts: " << partConcurrency;
  unFinishedPart_ = parts_.size();

  LOG(INFO) << "Start to handle vertex/edge/index of parts data in space id " << spaceId_;
  for (size_t i = 0; i < partConcurrency; ++i) {
    pool_->add(std::bind(&UpgraderSpace::runPartV3, this));
  }

  // Poll until every part has been accounted for by a worker.
  while (unFinishedPart_ != 0) {
    sleep(10);
  }

  // A space without any tag data produces no SST file. Skip the ingest in that case:
  // RocksDB's external-file ingestion is expected to reject an empty file list, which would
  // turn an otherwise successful upgrade into a crash below.
  if (!ingest_sst_file_.empty()) {
    auto code = readEngine_->ingest(ingest_sst_file_, true);
    if (code != ::nebula::cpp2::ErrorCode::SUCCEEDED) {
      // If the ingest fails the upgrader cannot recover, so crashing is intentional.
      LOG(FATAL) << "Faild upgrade 2:3 when ingest sst file:" << static_cast<int>(code);
    }
  }

  // Without the version marker the data would not be recognizable as upgraded, so a failed
  // write must not pass silently.
  auto putCode =
      readEngine_->put(NebulaKeyUtils::dataVersionKey(), NebulaKeyUtilsV3::dataVersionValue());
  if (putCode != ::nebula::cpp2::ErrorCode::SUCCEEDED) {
    LOG(FATAL) << "Faild upgrade 2:3 when write data version key:" << static_cast<int>(putCode);
  }
}
std::vector<std::string> UpgraderSpace::indexVertexKeys(
PartitionID partId,
VertexID& vId,
Expand Down Expand Up @@ -1094,10 +1205,14 @@ void DbUpgrader::doSpace() {
LOG(INFO) << "Upgrade from path " << upgraderSpaceIter->srcPath_ << " space id "
<< upgraderSpaceIter->entry_ << " to path " << upgraderSpaceIter->dstPath_
<< " begin";
if (FLAGS_upgrade_version == 1) {
if (FLAGS_upgrade_version == "1:2") {
upgraderSpaceIter->doProcessV1();
} else {
} else if (FLAGS_upgrade_version == "2RC:2") {
upgraderSpaceIter->doProcessV2();
} else if (FLAGS_upgrade_version == "2:3") {
upgraderSpaceIter->doProcessV3();
} else {
LOG(FATAL) << "error upgrade version " << FLAGS_upgrade_version;
}

auto ret = upgraderSpaceIter->copyWal();
Expand Down
10 changes: 9 additions & 1 deletion src/tools/db-upgrade/DbUpgrader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ DECLARE_string(src_db_path);
DECLARE_string(dst_db_path);
DECLARE_string(upgrade_meta_server);
DECLARE_uint32(write_batch_num);
DECLARE_uint32(upgrade_version);
DECLARE_string(upgrade_version);
DECLARE_bool(compactions);
DECLARE_uint32(max_concurrent_parts);
DECLARE_uint32(max_concurrent_spaces);
Expand Down Expand Up @@ -55,6 +55,9 @@ class UpgraderSpace {
// Processing v2 Rc data upgrade to v2 Ga
void doProcessV2();

// Processing v2 Ga data upgrade to v3
void doProcessV3();

// Perform manual compact
void doCompaction();

Expand Down Expand Up @@ -111,6 +114,8 @@ class UpgraderSpace {

void runPartV2();

void runPartV3();

public:
// Source data path
std::string srcPath_;
Expand Down Expand Up @@ -159,6 +164,9 @@ class UpgraderSpace {
folly::UnboundedBlockingQueue<PartitionID> partQueue_;

std::atomic<size_t> unFinishedPart_;

std::mutex ingest_sst_file_mut_;
std::vector<std::string> ingest_sst_file_;
};

// Upgrade one data path in storage conf
Expand Down
17 changes: 9 additions & 8 deletions src/tools/db-upgrade/DbUpgraderTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ void printHelp() {
A list of meta severs' ip:port separated by comma.
Default: 127.0.0.1:45500
--upgrade_version=<1|2>
This tool can only upgrade 1.x data or 2.0 RC data.
When the value is 1, upgrade the data from 1.x to 2.0 GA.
When the value is 2, upgrade the data from 2.0 RC to 2.0 GA.
Default: 0
--upgrade_version=<1:2|2RC:2|2:3>
This tool can only upgrade 1.x data, 2.0 RC, or 2.0 GA data.
1:2 upgrade the data from 1.x to 2.0GA
2RC:2 upgrade the data from 2.0RC to 2.0GA
2:3 upgrade the data from 2.0GA to 3.0
Default: ""
optional:
--write_batch_num=<N>
Expand Down Expand Up @@ -164,9 +165,9 @@ int main(int argc, char* argv[]) {
CHECK_NOTNULL(schemaMan);
CHECK_NOTNULL(indexMan);

if (FLAGS_upgrade_version != 1 && FLAGS_upgrade_version != 2) {
LOG(ERROR) << "Flag upgrade_version : " << FLAGS_upgrade_version
<< " illegal, upgrade_version can only be 1 or 2";
std::vector<std::string> versions = {"1:2", "2RC:2", "2:3"};
if (std::find(versions.begin(), versions.end(), FLAGS_upgrade_version) == versions.end()) {
LOG(ERROR) << "Flag upgrade_version : " << FLAGS_upgrade_version;
return EXIT_FAILURE;
}
LOG(INFO) << "Prepare phase end";
Expand Down
26 changes: 26 additions & 0 deletions src/tools/db-upgrade/NebulaKeyUtilsV3.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "tools/db-upgrade/NebulaKeyUtilsV3.h"

namespace nebula {
// Builds the prefix used to scan the tag keys of `partId`: a single PartitionID-sized word
// holding the partition id shifted left by kPartitionOffset with kTag_ OR-ed into the low bits,
// copied out byte-for-byte in host memory order.
std::string NebulaKeyUtilsV3::partTagPrefix(PartitionID partId) {
  const PartitionID header = (partId << kPartitionOffset) | static_cast<uint32_t>(kTag_);
  const auto* raw = reinterpret_cast<const char*>(&header);
  return std::string(raw, sizeof(PartitionID));
}
// Derives a vertex key from a tag key: the key-type marker is replaced by kVertex and the
// trailing sizeof(TagID) bytes are removed.
//
// `tagKey` must be at least sizeof(TagID) bytes long and non-empty; no length check is done.
std::string NebulaKeyUtilsV3::getVertexKey(folly::StringPiece tagKey) {
  std::string key = tagKey.toString();
  // partTagPrefix() puts the type marker into the low bits of the leading PartitionID word and
  // copies that word out in host byte order. On a little-endian host the marker is therefore
  // the FIRST byte of the key; byte 3 is the most significant byte of the partition id and must
  // not be touched.
  key[0] = static_cast<char>(kVertex);
  key.resize(key.size() - sizeof(TagID));
  return key;
}
// Returns the version string written as the data version value by the 3.0 upgrade.
std::string NebulaKeyUtilsV3::dataVersionValue() {
  std::string version{"3.0"};
  return version;
}

} // namespace nebula
18 changes: 18 additions & 0 deletions src/tools/db-upgrade/NebulaKeyUtilsV3.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once
#include "common/utils/Types.h"
namespace nebula {
// Key helpers used by the db-upgrade tool for the 2.0 GA -> 3.0 upgrade.
// All members are static; the class carries no state.
class NebulaKeyUtilsV3 {
public:
// Returns the key prefix for the tag data of `partId`: one PartitionID-sized word made of
// the shifted partition id with kTag_ OR-ed in.
static std::string partTagPrefix(PartitionID partId);
// Returns a copy of `tagKey` with kVertex written into its leading header bytes and the
// trailing sizeof(TagID) bytes removed.
static std::string getVertexKey(folly::StringPiece tagKey);
// Returns the data version string ("3.0").
static std::string dataVersionValue();

private:
// Key-type markers: kTag_ is combined into the prefix built by partTagPrefix(), kVertex is
// the marker written by getVertexKey().
enum NebulaKeyTypeV3 : uint32_t { kTag_ = 0x00000001, kVertex = 0x00000007 };
};

} // namespace nebula