diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index 68186af5afb4..478e3d855fa6 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -297,13 +297,14 @@ void SinkReplyBuilder2::Send() { uint64_t before_ns = util::fb2::ProactorBase::GetMonotonicTimeNs(); reply_stats.io_write_cnt++; reply_stats.io_write_bytes += total_size_; - + DVLOG(2) << "Writing " << total_size_ << " bytes"; if (auto ec = sink_->Write(vecs_.data(), vecs_.size()); ec) ec_ = ec; uint64_t after_ns = util::fb2::ProactorBase::GetMonotonicTimeNs(); reply_stats.send_stats.count++; reply_stats.send_stats.total_duration += (after_ns - before_ns) / 1'000; + DVLOG(2) << "Finished writing " << total_size_ << " bytes"; send_active_ = false; } diff --git a/src/server/detail/save_stages_controller.cc b/src/server/detail/save_stages_controller.cc index 924aa427d97b..8b4c21d73b48 100644 --- a/src/server/detail/save_stages_controller.cc +++ b/src/server/detail/save_stages_controller.cc @@ -36,7 +36,7 @@ namespace fs = std::filesystem; namespace { bool IsCloudPath(string_view path) { - return absl::StartsWith(path, kS3Prefix); + return absl::StartsWith(path, kS3Prefix) || absl::StartsWith(path, kGCSPrefix); } // Create a directory and all its parents if they don't exist. @@ -240,7 +240,7 @@ RdbSaver::SnapshotStats SaveStagesController::GetCurrentSnapshotProgress() const // Summary file is always last in snapshots array. void SaveStagesController::SaveDfs() { // Extend all filenames with -{sid} or -summary and append .dfs.tmp - const string_view ext = is_cloud_ ? ".dfs" : ".dfs.tmp"; + const string_view ext = snapshot_storage_->IsCloud() ? ".dfs" : ".dfs.tmp"; ShardId sid = 0; for (auto& [_, filename] : snapshots_) { filename = full_path_; @@ -286,7 +286,7 @@ void SaveStagesController::SaveRdb() { filename = full_path_; if (!filename.has_extension()) filename += ".rdb"; - if (!is_cloud_) + if (!snapshot_storage_->IsCloud()) filename += ".tmp"; if (auto err = snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_)); err) { @@ -342,7 +342,7 @@ void SaveStagesController::InitResources() { // Remove .tmp extension or delete files in case of error GenericError SaveStagesController::FinalizeFileMovement() { - if (is_cloud_) + if (snapshot_storage_->IsCloud()) return {}; DVLOG(1) << "FinalizeFileMovement start"; @@ -391,7 +391,7 @@ GenericError SaveStagesController::BuildFullPath() { dest_buf.resize(len); full_path_ = dir_path / dest_buf; - is_cloud_ = IsCloudPath(full_path_.string()); + return {}; } diff --git a/src/server/detail/save_stages_controller.h b/src/server/detail/save_stages_controller.h index 646fe25093dc..0b9db29c46dc 100644 --- a/src/server/detail/save_stages_controller.h +++ b/src/server/detail/save_stages_controller.h @@ -124,7 +124,6 @@ struct SaveStagesController : public SaveStagesInputs { private: time_t start_time_; std::filesystem::path full_path_; - bool is_cloud_; AggregateGenericError shared_err_; std::vector, std::filesystem::path>> snapshots_; diff --git a/src/server/detail/snapshot_storage.cc b/src/server/detail/snapshot_storage.cc index 454998eb56ab..dd5bb5445da7 100644 --- a/src/server/detail/snapshot_storage.cc +++ b/src/server/detail/snapshot_storage.cc @@ -41,11 +41,8 @@ constexpr string_view kSummarySuffix = "summary.dfs"sv; pair GetBucketPath(string_view path) { string_view clean = path; - if (absl::StartsWith(clean, kS3Prefix)) { - clean = absl::StripPrefix(clean, kS3Prefix); - } else { - clean = absl::StripPrefix(clean, kGCSPrefix); - } + auto prefix = absl::StartsWith(clean, kS3Prefix) ? kS3Prefix : kGCSPrefix; + clean = absl::StripPrefix(clean, prefix); size_t pos = clean.find('/'); if (pos == string_view::npos) { diff --git a/src/server/detail/snapshot_storage.h b/src/server/detail/snapshot_storage.h index e3aa4127ff6e..4fbb64626494 100644 --- a/src/server/detail/snapshot_storage.h +++ b/src/server/detail/snapshot_storage.h @@ -54,6 +54,10 @@ class SnapshotStorage { // Searches for all the relevant snapshot files given the RDB file or DFS summary file path. io::Result, GenericError> ExpandSnapshot(const std::string& load_path); + virtual bool IsCloud() const { + return false; + } + protected: virtual io::Result, GenericError> ExpandFromPath( const std::string& path) = 0; @@ -94,6 +98,10 @@ class GcsSnapshotStorage : public SnapshotStorage { io::Result LoadPath(std::string_view dir, std::string_view dbfilename) override; + bool IsCloud() const final { + return true; + } + private: io::Result, GenericError> ExpandFromPath(const std::string& path) final; @@ -117,6 +125,10 @@ class AwsS3SnapshotStorage : public SnapshotStorage { io::Result LoadPath(std::string_view dir, std::string_view dbfilename) override; + bool IsCloud() const final { + return true; + } + private: io::Result, GenericError> ExpandFromPath(const std::string& path) final; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index f0476f3d2959..dba0c92fccfe 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -228,7 +228,6 @@ using strings::HumanReadableNumBytes; namespace { const auto kRedisVersion = "6.2.11"; -constexpr string_view kS3Prefix = "s3://"sv; using EngineFunc = void (ServerFamily::*)(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); @@ -252,8 +251,12 @@ string UnknownCmd(string cmd, CmdArgList args) { absl::StrJoin(args.begin(), args.end(), ", ", CmdArgListFormatter())); } -bool IsCloudPath(string_view path) { - return absl::StartsWith(path, kS3Prefix); +bool IsS3Path(string_view path) { + return absl::StartsWith(path, detail::kS3Prefix); +} + +bool IsGCSPath(string_view path) { + return absl::StartsWith(path, detail::kGCSPrefix); } // Check that if TLS is used at least one form of client authentication is @@ -866,7 +869,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vectorpool()->GetNextProactor()->Await([&] { util::aws::Init(); }); snapshot_storage_ = std::make_shared( @@ -875,6 +878,14 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector(); + auto ec = shard_set->pool()->GetNextProactor()->Await([&] { return gcs->Init(3000); }); + if (ec) { + LOG(ERROR) << "Failed to initialize GCS snapshot storage: " << ec.message(); + exit(1); + } + snapshot_storage_ = std::move(gcs); } else if (fq_threadpool_) { snapshot_storage_ = std::make_shared(fq_threadpool_.get()); } else { @@ -1174,6 +1185,8 @@ void ServerFamily::SnapshotScheduling() { io::Result ServerFamily::LoadRdb(const std::string& rdb_file, LoadExistingKeys existing_keys) { + VLOG(1) << "Loading data from " << rdb_file; + error_code ec; io::ReadonlyFileOrError res = snapshot_storage_->OpenReadFile(rdb_file); if (res) { @@ -1635,6 +1648,8 @@ GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view bas "SAVING - can not save database"}; } + VLOG(1) << "Saving snapshot to " << basename; + save_controller_ = make_unique(detail::SaveStagesInputs{ new_version, basename, trans, &service_, fq_threadpool_.get(), snapshot_storage_});