Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: support load/save from GCS #4006

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,14 @@ void SinkReplyBuilder2::Send() {
uint64_t before_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
reply_stats.io_write_cnt++;
reply_stats.io_write_bytes += total_size_;

DVLOG(2) << "Writing " << total_size_ << " bytes";
if (auto ec = sink_->Write(vecs_.data(), vecs_.size()); ec)
ec_ = ec;

uint64_t after_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
reply_stats.send_stats.count++;
reply_stats.send_stats.total_duration += (after_ns - before_ns) / 1'000;
DVLOG(2) << "Finished writing " << total_size_ << " bytes";
send_active_ = false;
}

Expand Down
10 changes: 5 additions & 5 deletions src/server/detail/save_stages_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace fs = std::filesystem;
namespace {

bool IsCloudPath(string_view path) {
return absl::StartsWith(path, kS3Prefix);
return absl::StartsWith(path, kS3Prefix) || absl::StartsWith(path, kGCSPrefix);
}

// Create a directory and all its parents if they don't exist.
Expand Down Expand Up @@ -240,7 +240,7 @@ RdbSaver::SnapshotStats SaveStagesController::GetCurrentSnapshotProgress() const
// Summary file is always last in snapshots array.
void SaveStagesController::SaveDfs() {
// Extend all filenames with -{sid} or -summary and append .dfs.tmp
const string_view ext = is_cloud_ ? ".dfs" : ".dfs.tmp";
const string_view ext = snapshot_storage_->IsCloud() ? ".dfs" : ".dfs.tmp";
ShardId sid = 0;
for (auto& [_, filename] : snapshots_) {
filename = full_path_;
Expand Down Expand Up @@ -286,7 +286,7 @@ void SaveStagesController::SaveRdb() {
filename = full_path_;
if (!filename.has_extension())
filename += ".rdb";
if (!is_cloud_)
if (!snapshot_storage_->IsCloud())
filename += ".tmp";

if (auto err = snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_)); err) {
Expand Down Expand Up @@ -342,7 +342,7 @@ void SaveStagesController::InitResources() {

// Remove .tmp extension or delete files in case of error
GenericError SaveStagesController::FinalizeFileMovement() {
if (is_cloud_)
if (snapshot_storage_->IsCloud())
return {};
DVLOG(1) << "FinalizeFileMovement start";

Expand Down Expand Up @@ -391,7 +391,7 @@ GenericError SaveStagesController::BuildFullPath() {
dest_buf.resize(len);

full_path_ = dir_path / dest_buf;
is_cloud_ = IsCloudPath(full_path_.string());

return {};
}

Expand Down
1 change: 0 additions & 1 deletion src/server/detail/save_stages_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ struct SaveStagesController : public SaveStagesInputs {
private:
time_t start_time_;
std::filesystem::path full_path_;
bool is_cloud_;

AggregateGenericError shared_err_;
std::vector<std::pair<std::unique_ptr<RdbSnapshot>, std::filesystem::path>> snapshots_;
Expand Down
7 changes: 2 additions & 5 deletions src/server/detail/snapshot_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,8 @@ constexpr string_view kSummarySuffix = "summary.dfs"sv;

pair<string, string> GetBucketPath(string_view path) {
string_view clean = path;
if (absl::StartsWith(clean, kS3Prefix)) {
clean = absl::StripPrefix(clean, kS3Prefix);
} else {
clean = absl::StripPrefix(clean, kGCSPrefix);
}
auto prefix = absl::StartsWith(clean, kS3Prefix) ? kS3Prefix : kGCSPrefix;
clean = absl::StripPrefix(clean, prefix);

size_t pos = clean.find('/');
if (pos == string_view::npos) {
Expand Down
12 changes: 12 additions & 0 deletions src/server/detail/snapshot_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class SnapshotStorage {
// Searches for all the relevant snapshot files given the RDB file or DFS summary file path.
io::Result<std::vector<std::string>, GenericError> ExpandSnapshot(const std::string& load_path);

virtual bool IsCloud() const {
return false;
}

protected:
virtual io::Result<std::vector<std::string>, GenericError> ExpandFromPath(
const std::string& path) = 0;
Expand Down Expand Up @@ -94,6 +98,10 @@ class GcsSnapshotStorage : public SnapshotStorage {
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
std::string_view dbfilename) override;

bool IsCloud() const final {
return true;
}

private:
io::Result<std::vector<std::string>, GenericError> ExpandFromPath(const std::string& path) final;

Expand All @@ -117,6 +125,10 @@ class AwsS3SnapshotStorage : public SnapshotStorage {
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
std::string_view dbfilename) override;

bool IsCloud() const final {
return true;
}

private:
io::Result<std::vector<std::string>, GenericError> ExpandFromPath(const std::string& path) final;

Expand Down
23 changes: 19 additions & 4 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ using strings::HumanReadableNumBytes;
namespace {

const auto kRedisVersion = "6.2.11";
constexpr string_view kS3Prefix = "s3://"sv;

using EngineFunc = void (ServerFamily::*)(CmdArgList args, Transaction* tx,
SinkReplyBuilder* builder, ConnectionContext* cntx);
Expand All @@ -252,8 +251,12 @@ string UnknownCmd(string cmd, CmdArgList args) {
absl::StrJoin(args.begin(), args.end(), ", ", CmdArgListFormatter()));
}

bool IsCloudPath(string_view path) {
return absl::StartsWith(path, kS3Prefix);
bool IsS3Path(string_view path) {
return absl::StartsWith(path, detail::kS3Prefix);
}

bool IsGCSPath(string_view path) {
return absl::StartsWith(path, detail::kGCSPrefix);
}

// Check that if TLS is used at least one form of client authentication is
Expand Down Expand Up @@ -866,7 +869,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
}

string flag_dir = GetFlag(FLAGS_dir);
if (IsCloudPath(flag_dir)) {
if (IsS3Path(flag_dir)) {
#ifdef WITH_AWS
shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); });
snapshot_storage_ = std::make_shared<detail::AwsS3SnapshotStorage>(
Expand All @@ -875,6 +878,14 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
#else
LOG(ERROR) << "Compiled without AWS support";
#endif
} else if (IsGCSPath(flag_dir)) {
auto gcs = std::make_shared<detail::GcsSnapshotStorage>();
auto ec = shard_set->pool()->GetNextProactor()->Await([&] { return gcs->Init(3000); });
if (ec) {
LOG(ERROR) << "Failed to initialize GCS snapshot storage: " << ec.message();
exit(1);
}
snapshot_storage_ = std::move(gcs);
} else if (fq_threadpool_) {
snapshot_storage_ = std::make_shared<detail::FileSnapshotStorage>(fq_threadpool_.get());
} else {
Expand Down Expand Up @@ -1174,6 +1185,8 @@ void ServerFamily::SnapshotScheduling() {

io::Result<size_t> ServerFamily::LoadRdb(const std::string& rdb_file,
LoadExistingKeys existing_keys) {
VLOG(1) << "Loading data from " << rdb_file;

error_code ec;
io::ReadonlyFileOrError res = snapshot_storage_->OpenReadFile(rdb_file);
if (res) {
Expand Down Expand Up @@ -1635,6 +1648,8 @@ GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view bas
"SAVING - can not save database"};
}

VLOG(1) << "Saving snapshot to " << basename;

save_controller_ = make_unique<SaveStagesController>(detail::SaveStagesInputs{
new_version, basename, trans, &service_, fq_threadpool_.get(), snapshot_storage_});

Expand Down
Loading