From f6b9fa4f96005d19290fbba27b2f63248e6c58d0 Mon Sep 17 00:00:00 2001 From: wangyuan21 Date: Thu, 18 Mar 2021 18:54:37 +0800 Subject: [PATCH 01/11] Use checkpoint to implement data snapshot when full replication --- src/config.cc | 2 + src/config.h | 2 + src/redis_cmd.cc | 32 +++---- src/replication.cc | 96 ++++++++++++++++----- src/replication.h | 13 +-- src/server.cc | 17 ++++ src/storage.cc | 207 +++++++++++++++++++++++++++++++++++---------- src/storage.h | 33 ++++++-- 8 files changed, 305 insertions(+), 97 deletions(-) diff --git a/src/config.cc b/src/config.cc index b3befd317c8..050585ac59f 100644 --- a/src/config.cc +++ b/src/config.cc @@ -194,6 +194,8 @@ void Config::initFieldCallback() { db_dir = dir + "/db"; if (backup_dir.empty()) backup_dir = dir + "/backup"; if (log_dir.empty()) log_dir = dir; + checkpoint_dir = dir + "/checkpoint"; + sync_checkpoint_dir = dir + "/sync_checkpoint"; return Status::OK(); }}, {"bind", [this](Server* srv, const std::string &k, const std::string& v)->Status { diff --git a/src/config.h b/src/config.h index 79d509765b6..6357cccbe0e 100644 --- a/src/config.h +++ b/src/config.h @@ -68,6 +68,8 @@ struct Config{ std::string dir; std::string db_dir; std::string backup_dir; + std::string checkpoint_dir; + std::string sync_checkpoint_dir; std::string log_dir; std::string pidfile; std::string db_name; diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc index c7c118aaaa9..2edd8600b05 100644 --- a/src/redis_cmd.cc +++ b/src/redis_cmd.cc @@ -4068,31 +4068,26 @@ class CommandFetchMeta : public Commander { // Feed-replica-meta thread std::thread t = std::thread([svr, repl_fd, ip]() { - Util::ThreadSetName("feed-replica-meta"); - int fd; - uint64_t file_size; - rocksdb::BackupID meta_id; - auto s = Engine::Storage::BackupManager::OpenLatestMeta( - svr->storage_, &fd, &meta_id, &file_size); + Util::ThreadSetName("feed-replica-data-info"); + std::string files; + auto s = Engine::Storage::ReplDataManager::GetFullReplDataInfo( + svr->storage_, &files); if (!s.IsOK()) { - const char *message = "-ERR can't create db backup"; + const char *message = "-ERR can't create db checkpoint"; write(repl_fd, message, strlen(message)); - LOG(ERROR) << "[replication] Failed to open latest meta, err: " - << s.Msg(); + LOG(WARNING) << "[replication] Failed to get full data file info," + << " error: " << s.Msg(); close(repl_fd); return; } - // Send the meta ID, meta file size and content - if (Util::SockSend(repl_fd, std::to_string(meta_id)+CRLF).IsOK() && - Util::SockSend(repl_fd, std::to_string(file_size)+CRLF).IsOK() && - Util::SockSendFile(repl_fd, fd, file_size).IsOK()) { - LOG(INFO) << "[replication] Succeed sending backup meta " << meta_id - << " to " << ip; + // Send full data file info + if (Util::SockSend(repl_fd, files+CRLF).IsOK()) { + LOG(INFO) << "[replication] Succeed sending full data file info to " << ip; } else { - LOG(WARNING) << "[replication] Fail to send backup meta" << meta_id + LOG(WARNING) << "[replication] Fail to send full data file info " << ip << ", error: " << strerror(errno); } - close(fd); + svr->storage_->SetCheckpointAccessTime(std::time(nullptr)); close(repl_fd); }); t.detach(); @@ -4132,7 +4127,7 @@ class CommandFetchFile : public Commander { svr->GetFetchFileThreadNum(); } auto start = std::chrono::high_resolution_clock::now(); - auto fd = Engine::Storage::BackupManager::OpenDataFile(svr->storage_, + auto fd = Engine::Storage::ReplDataManager::OpenDataFile(svr->storage_, file, &file_size); if (fd < 0) break; @@ -4160,6 +4155,7 @@ class CommandFetchFile 
: public Commander { usleep(shortest - duration); } } + svr->storage_->SetCheckpointAccessTime(std::time(nullptr)); svr->DecrFetchFileThread(); close(repl_fd); }); diff --git a/src/replication.cc b/src/replication.cc index 7f6ab2285f2..3c55135f180 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -277,6 +277,12 @@ Status ReplicationThread::Start(std::function &&pre_fullsync_cb, pre_fullsync_cb_ = std::move(pre_fullsync_cb); post_fullsync_cb_ = std::move(post_fullsync_cb); + // Clean synced checkpoint fronm old master because replica starts to flow new master + auto s = rocksdb::DestroyDB(srv_->GetConfig()->sync_checkpoint_dir, rocksdb::Options()); + if (!s.ok()) { + LOG(WARNING) << "Can't clean synced checkpoint from master, error: " << s.ToString(); + } + // cleanup the old backups, so we can start replication in a clean state storage_->PurgeOldBackups(0, 0); @@ -532,6 +538,11 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, auto input = bufferevent_get_input(bev); switch (self->fullsync_state_) { case kFetchMetaID: + // New version master only sends meta file content + if (!self->srv_->GetConfig()->master_use_repl_port) { + self->fullsync_state_ = kFetchMetaContent; + return CBState::AGAIN; + } line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT); if (!line) return CBState::AGAIN; if (line[0] == '-') { @@ -566,17 +577,51 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, self->fullsync_state_ = kFetchMetaContent; LOG(INFO) << "[replication] Succeed fetching meta size: " << self->fullsync_filesize_; case kFetchMetaContent: - if (evbuffer_get_length(input) < self->fullsync_filesize_) { - return CBState::AGAIN; + std::string target_dir; + Engine::Storage::ReplDataManager::MetaInfo meta; + // Master using old version + if (self->srv_->GetConfig()->master_use_repl_port) { + if (evbuffer_get_length(input) < self->fullsync_filesize_) { + return CBState::AGAIN; + } + meta = Engine::Storage::ReplDataManager::ParseMetaAndSave( + self->storage_, self->fullsync_meta_id_, input); + target_dir = self->srv_->GetConfig()->backup_dir; + } else { + // Master using new version + line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT); + if (!line) return CBState::AGAIN; + if (line[0] == '-') { + LOG(ERROR) << "[replication] Failed to fetch meta info: " << line; + free(line); + return CBState::RESTART; + } + std::vector need_files; + Util::Split(std::string(line), ",", &need_files); + for (auto f : need_files) { + meta.files.emplace_back(f, 0); + } + target_dir = self->srv_->GetConfig()->sync_checkpoint_dir; + // Clean invaild files of checkpoint + auto s = Engine::Storage::ReplDataManager::CleanInvalidFiles( + self->storage_, target_dir, need_files); + if (!s.IsOK()) { + LOG(WARNING) << "[replication] Fail to clean up invalid files of old checkpoint," + << " error: " << s.Msg(); + LOG(WARNING) << "[replication] Try to clean all checkpoint files"; + auto s = rocksdb::DestroyDB(target_dir, rocksdb::Options()); + if (!s.ok()) { + LOG(WARNING) << "[replication] Fail to clean all checkpoint files, error: " + << s.ToString(); + } + } } - auto meta = Engine::Storage::BackupManager::ParseMetaAndSave( - self->storage_, self->fullsync_meta_id_, input); assert(evbuffer_get_length(input) == 0); self->fullsync_state_ = kFetchMetaID; - LOG(INFO) << "[replication] Succeeded fetching meta file, fetching files in parallel"; + LOG(INFO) << "[replication] Succeeded fetching full data files info, fetching files in parallel"; 
self->repl_state_ = kReplFetchSST; - auto s = self->parallelFetchFile(meta.files); + auto s = self->parallelFetchFile(target_dir, meta.files); if (!s.IsOK()) { LOG(ERROR) << "[replication] Failed to parallel fetch files while " + s.Msg(); return CBState::RESTART; @@ -585,7 +630,12 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, // Restore DB from backup self->pre_fullsync_cb_(); - s = self->storage_->RestoreFromBackup(); + // For old version, master uses rocksdb backup to implement data snapshot + if (self->srv_->GetConfig()->master_use_repl_port) { + s = self->storage_->RestoreFromBackup(); + } else { + s = self->storage_->RestoreFromCheckpoint(); + } if (!s.IsOK()) { LOG(ERROR) << "[replication] Failed to restore backup while " + s.Msg() + ", restart fullsync"; return CBState::RESTART; @@ -603,7 +653,8 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, return CBState::QUIT; } -Status ReplicationThread::parallelFetchFile(const std::vector> &files) { +Status ReplicationThread::parallelFetchFile(const std::string &dir, + std::vector> &files) { size_t concurrency = 1; if (files.size() > 20) { // Use 4 threads to download files in parallel @@ -614,7 +665,7 @@ Status ReplicationThread::parallelFetchFile(const std::vector> results; for (size_t tid = 0; tid < concurrency; ++tid) { results.push_back(std::async( - std::launch::async, [this, &files, tid, concurrency, &fetch_cnt, &skip_cnt]() -> Status { + std::launch::async, [this, dir, &files, tid, concurrency, &fetch_cnt, &skip_cnt]() -> Status { if (this->stop_flag_) { return Status(Status::NotOK, "replication thread was stopped"); } @@ -637,7 +688,7 @@ Status ReplicationThread::parallelFetchFile(const std::vectorstorage_, f_name, f_crc)) { + if (Engine::Storage::ReplDataManager::FileExists(this->storage_, dir, f_name, f_crc)) { skip_cnt.fetch_add(1); uint32_t cur_skip_cnt = skip_cnt.load(); uint32_t cur_fetch_cnt = fetch_cnt.load(); @@ -663,11 +714,13 @@ Status ReplicationThread::parallelFetchFile(const std::vectorGetConfig()->master_use_repl_port) { for (unsigned i = 0; i < fetch_files.size(); i++) { - s = this->fetchFiles(sock_fd, {fetch_files[i]}, {crcs[i]}, fn); + s = this->fetchFiles(sock_fd, dir, {fetch_files[i]}, {crcs[i]}, fn); if (!s.IsOK()) break; } } else { - if (!fetch_files.empty()) s = this->fetchFiles(sock_fd, fetch_files, crcs, fn); + if (!fetch_files.empty()) { + s = this->fetchFiles(sock_fd, dir, fetch_files, crcs, fn); + } } close(sock_fd); return s; @@ -711,8 +764,9 @@ Status ReplicationThread::sendAuth(int sock_fd) { return Status::OK(); } -Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, - std::string file, uint32_t crc, fetch_file_callback fn) { +Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, + const std::string &dir, std::string file, + uint32_t crc, fetch_file_callback fn) { size_t line_len, file_size; // Read file size line @@ -735,7 +789,7 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, } // Write to tmp file - auto tmp_file = Engine::Storage::BackupManager::NewTmpFile(storage_, file); + auto tmp_file = Engine::Storage::ReplDataManager::NewTmpFile(storage_, dir, file); if (!tmp_file) { return Status(Status::NotOK, "unable to create tmp file"); } @@ -759,13 +813,14 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, } } } - if (crc != tmp_crc) { + // Verify file crc checksum if crc is not 0 + if (crc && crc != tmp_crc) { char err_buf[64]; snprintf(err_buf, sizeof(err_buf), "CRC 
mismatched, %u was expected but got %u", crc, tmp_crc); return Status(Status::NotOK, err_buf); } // File is OK, rename to formal name - auto s = Engine::Storage::BackupManager::SwapTmpFile(storage_, file); + auto s = Engine::Storage::ReplDataManager::SwapTmpFile(storage_, dir, file); if (!s.IsOK()) return s; // Call fetch file callback function @@ -773,8 +828,9 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, return Status::OK(); } -Status ReplicationThread::fetchFiles(int sock_fd, const std::vector &files, - const std::vector &crcs, fetch_file_callback fn) { +Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, + const std::vector &files, const std::vector &crcs, + fetch_file_callback fn) { std::string files_str; for (auto file : files) { files_str += file; @@ -789,7 +845,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::vector evbuffer *evbuf = evbuffer_new(); for (unsigned i = 0; i < files.size(); i++) { DLOG(INFO) << "[fetch] Start to fetch file " << files[i]; - s = fetchFile(sock_fd, evbuf, files[i], crcs[i], fn); + s = fetchFile(sock_fd, evbuf, dir, files[i], crcs[i], fn); if (!s.IsOK()) { s = Status(Status::NotOK, "fetch file err: " + s.Msg()); LOG(WARNING) << "[fetch] Fail to fetch file " << files[i] << ", err: " << s.Msg(); diff --git a/src/replication.h b/src/replication.h index 1beaf0b4867..3f5d6a17980 100644 --- a/src/replication.h +++ b/src/replication.h @@ -163,11 +163,14 @@ class ReplicationThread { // Synchronized-Blocking ops Status sendAuth(int sock_fd); - Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string file, - uint32_t crc, fetch_file_callback fn); - Status fetchFiles(int sock_fd, const std::vector &files, - const std::vector &crcs, fetch_file_callback fn); - Status parallelFetchFile(const std::vector> &files); + Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir, + const std::string file, uint32_t crc, fetch_file_callback fn); + Status fetchFiles(int sock_fd, const std::string &dir, + const std::vector &files, + const std::vector &crcs, + fetch_file_callback fn); + Status parallelFetchFile(const std::string &dir, + std::vector> &files); static bool isRestoringError(const char *err); static void EventTimerCB(int, int16_t, void *ctx); diff --git a/src/server.cc b/src/server.cc index 28b7d17228e..aa88840e52d 100644 --- a/src/server.cc +++ b/src/server.cc @@ -501,6 +501,23 @@ void Server::cron() { Status s = dynamicResizeBlockAndSST(); LOG(INFO) << "[server] Schedule to dynamic resize block and sst, result: " << s.Msg(); } + + // No replica uses this checkpoint, we can remove it. 
+ if (counter != 0 && counter % 10 == 0) { + time_t create_time = storage_->GetCheckpointCreateTime(); + time_t access_time = storage_->GetCheckpointAccessTime(); + + if (storage_->ExistCheckpoint()) { + // TODO: support to config the alive time of checkpoint + if ((GetFetchFileThreadNum() == 0 && std::time(nullptr) - access_time > 30) || + (std::time(nullptr) - create_time > 24 * 60 * 60)) { + auto s = rocksdb::DestroyDB(config_->checkpoint_dir, rocksdb::Options()); + if (!s.ok()) { + LOG(WARNING) << "Fail to clean checkpoint, error: " << s.ToString(); + } + } + } + } cleanupExitedSlaves(); counter++; std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/src/storage.cc b/src/storage.cc index 91ebebddaa1..9a14978a765 100644 --- a/src/storage.cc +++ b/src/storage.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include "config.h" #include "redis_db.h" @@ -314,6 +315,62 @@ Status Storage::RestoreFromBackup() { return s.ok() ? Status::OK() : Status(Status::DBBackupErr, s.ToString()); } +Status Storage::Reopen() { + auto s = Open(); + if (!s.IsOK()) { + LOG(ERROR) << "[storage] Fail to reopen db, error: " << s.Msg(); + LOG(ERROR) << "[storage] Exiting..."; + exit(1); + } + LOG(INFO) << "[storage] Succeed reopening db"; + return Status::OK(); +} + +Status Storage::RestoreFromCheckpoint() { + std::string dir = config_->sync_checkpoint_dir; + std::string tmp_dir = config_->db_dir + ".tmp"; + + // Clean old backups and checkpoints because server will work on the new db + PurgeOldBackups(0, 0); + rocksdb::DestroyDB(config_->checkpoint_dir, rocksdb::Options()); + + // Close db + CloseDB(); + + // Rename db dir to tmp, so we can restore if replica fails to load + // the checkpoint from master. + // But only try best effort to make data safe + auto s = backup_env_->RenameFile(config_->db_dir, tmp_dir); + if (!s.ok()) { + Reopen(); + return Status(Status::NotOK, "Fail to rename db dir, error: " + s.ToString()); + } + + // Rename checkpoint dir to db dir + if (!(s = backup_env_->RenameFile(dir, config_->db_dir)).ok()) { + backup_env_->RenameFile(tmp_dir, config_->db_dir); + Reopen(); + return Status(Status::NotOK, "Fail to rename checkpoint dir, error: " + s.ToString()); + } + + // Open the new db, restore if replica fails to open db + auto s2 = Open(); + if (!s2.IsOK()) { + LOG(WARNING) << "[storage] Fail to open master checkpoint, error: " << s2.Msg(); + rocksdb::DestroyDB(config_->db_dir, rocksdb::Options()); + backup_env_->RenameFile(tmp_dir, config_->db_dir); + Reopen(); + return Status(Status::DBOpenErr, + "Fail to open master checkpoint, error: " + s2.Msg()); + } + + // Destory origin db + if (!(s = rocksdb::DestroyDB(tmp_dir, rocksdb::Options())).ok()) { + LOG(WARNING) << "[storage] Fail to destroy " << tmp_dir << ", error:" << s.ToString(); + } + return Status::OK(); +} + void Storage::PurgeOldBackups(uint32_t num_backups_to_keep, uint32_t backup_max_keep_hours) { time_t now = time(nullptr); std::vector backup_infos; @@ -538,36 +595,92 @@ Status Storage::DecrDBRefs() { return Status::OK(); } -Status Storage::BackupManager::OpenLatestMeta(Storage *storage, - int *fd, - rocksdb::BackupID *meta_id, - uint64_t *file_size) { - Status status = storage->CreateBackup(); - if (!status.IsOK()) return status; - std::vector backup_infos; - storage->backup_->GetBackupInfo(&backup_infos); - auto latest_backup = backup_infos.back(); - rocksdb::Status r_status = storage->backup_->VerifyBackup(latest_backup.backup_id); - if (!r_status.ok()) { - return Status(Status::NotOK, 
r_status.ToString()); - } - *meta_id = latest_backup.backup_id; - std::string meta_file = - storage->config_->backup_dir + "/meta/" + std::to_string(*meta_id); - auto s = storage->backup_env_->FileExists(meta_file); - storage->backup_env_->GetFileSize(meta_file, file_size); - // NOTE: here we use the system's open instead of using rocksdb::Env to open - // a sequential file, because we want to use sendfile syscall. - *fd = open(meta_file.c_str(), O_RDONLY); - if (*fd < 0) { - return Status(Status::NotOK, strerror(errno)); +Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::string *files) { + std::string data_files_dir = storage->config_->checkpoint_dir; + + // Create checkpoint if not exist + if (!storage->backup_env_->FileExists(data_files_dir).ok()) { + rocksdb::Checkpoint* checkpoint = NULL; + rocksdb::Status s = rocksdb::Checkpoint::Create(storage->db_, &checkpoint); + if (!s.ok()) { + LOG(WARNING) << "Fail to create checkpoint, error:" << s.ToString(); + return Status(Status::NotOK, s.ToString()); + } + + // Set checkpoint time + storage->SetCheckpointCreateTime(std::time(nullptr)); + storage->SetCheckpointAccessTime(std::time(nullptr)); + + // Create checkpoint of rocksdb + std::unique_ptr checkpoint_guard(checkpoint); + s = checkpoint->CreateCheckpoint(data_files_dir); + if (!s.ok()) { + LOG(WARNING) << "ail to create checkpoint, error:" << s.ToString(); + return Status(Status::NotOK, s.ToString()); + } + } else { + // Replicas can share checkpiont to replication if the checkpoint existing + // time is less half of WAL ttl. + int64_t can_shared_time = storage->config_->RocksDB.WAL_ttl_seconds / 2; + if (can_shared_time < 10 * 60) can_shared_time = 10 * 60; + if (std::time(nullptr) - storage->GetCheckpointCreateTime() > can_shared_time) { + LOG(WARNING) << "Can't use current checkpoint, waiting next checkpoint"; + return Status(Status::NotOK, "Can't use current checkpoint, waiting next checkpoint"); + } } + + // Get checkpoint file list + std::vector result; + storage->backup_env_->GetChildren(data_files_dir, &result); + for (auto f : result) { + if (f == "." || f == "..") continue; + files->append(f); + files->push_back(','); + } + files->pop_back(); return Status::OK(); } -int Storage::BackupManager::OpenDataFile(Storage *storage, const std::string &rel_path, - uint64_t *file_size) { - std::string abs_path = storage->config_->backup_dir + "/" + rel_path; +Status Storage::ReplDataManager::CleanInvalidFiles(Storage *storage, + const std::string &dir, std::vector &valid_files) { + if (!storage->backup_env_->FileExists(dir).ok()) { + return Status::OK(); + } + + std::vector tmp_files, files; + storage->backup_env_->GetChildren(dir, &tmp_files); + for (auto file : tmp_files) { + if (file == "." 
|| file == "..") continue; + files.push_back(file); + } + + // Find invalid files + std::sort(files.begin(), files.end()); + std::sort(valid_files.begin(), valid_files.end()); + std::vector invalid_files(files.size() + valid_files.size()); + auto it = std::set_difference(files.begin(), files.end(), + valid_files.begin(), valid_files.end(), invalid_files.begin()); + + // Delete invalid files + Status ret; + invalid_files.resize(it - invalid_files.begin()); + for (it = invalid_files.begin(); it != invalid_files.end(); ++it) { + auto s = storage->backup_env_->DeleteFile(dir + *it); + if (!s.ok()) { + ret = Status(Status::NotOK, s.ToString()); + LOG(INFO) << "[storage] Fail to delete invalid file " + << *it << " of master checkpoint"; + } else { + LOG(INFO) << "[storage] Succeed deleting invalid file " + << *it << " of master checkpoint"; + } + } + return ret; +} + +int Storage::ReplDataManager::OpenDataFile(Storage *storage, + const std::string &repl_file,uint64_t *file_size) { + std::string abs_path = storage->config_->checkpoint_dir + "/" + repl_file; auto s = storage->backup_env_->FileExists(abs_path); if (!s.ok()) { LOG(ERROR) << "[storage] Data file [" << abs_path << "] not found"; @@ -581,16 +694,16 @@ int Storage::BackupManager::OpenDataFile(Storage *storage, const std::string &re return rv; } -Storage::BackupManager::MetaInfo Storage::BackupManager::ParseMetaAndSave( +Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave( Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf) { char *line; size_t len; - Storage::BackupManager::MetaInfo meta; + Storage::ReplDataManager::MetaInfo meta; auto meta_file = "meta/" + std::to_string(meta_id); DLOG(INFO) << "[meta] id: " << meta_id; // Save the meta to tmp file - auto wf = NewTmpFile(storage, meta_file); + auto wf = NewTmpFile(storage, storage->config_->backup_dir, meta_file); auto data = evbuffer_pullup(evbuf, -1); wf->Append(rocksdb::Slice(reinterpret_cast(data), evbuffer_get_length(evbuf))); @@ -631,7 +744,7 @@ Storage::BackupManager::MetaInfo Storage::BackupManager::ParseMetaAndSave( meta.files.emplace_back(filename, crc32); free(line); } - SwapTmpFile(storage, meta_file); + SwapTmpFile(storage, storage->config_->backup_dir, meta_file); return meta; } @@ -651,21 +764,21 @@ Status MkdirRecursively(rocksdb::Env *env, const std::string &dir) { return Status(Status::NotOK); } -std::unique_ptr Storage::BackupManager::NewTmpFile( - Storage *storage, const std::string &rel_path) { - std::string tmp_path = storage->config_->backup_dir + "/" + rel_path + ".tmp"; - auto s = storage->backup_env_->FileExists(tmp_path); +std::unique_ptr Storage::ReplDataManager::NewTmpFile( + Storage *storage, const std::string &dir, const std::string &repl_file) { + std::string tmp_file = dir + "/" + repl_file + ".tmp"; + auto s = storage->backup_env_->FileExists(tmp_file); if (s.ok()) { LOG(ERROR) << "[storage] Data file exists, override"; - storage->backup_env_->DeleteFile(tmp_path); + storage->backup_env_->DeleteFile(tmp_file); } // Create directory if missing - auto abs_dir = tmp_path.substr(0, tmp_path.rfind('/')); + auto abs_dir = tmp_file.substr(0, tmp_file.rfind('/')); if (!MkdirRecursively(storage->backup_env_, abs_dir).IsOK()) { return nullptr; } std::unique_ptr wf; - s = storage->backup_env_->NewWritableFile(tmp_path, &wf, rocksdb::EnvOptions()); + s = storage->backup_env_->NewWritableFile(tmp_file, &wf, rocksdb::EnvOptions()); if (!s.ok()) { LOG(ERROR) << "[storage] Failed to create data file: " << s.ToString(); return 
nullptr; @@ -673,23 +786,27 @@ std::unique_ptr Storage::BackupManager::NewTmpFile( return wf; } -Status Storage::BackupManager::SwapTmpFile(Storage *storage, - const std::string &rel_path) { - std::string tmp_path = storage->config_->backup_dir + "/" + rel_path + ".tmp"; - std::string orig_path = storage->config_->backup_dir + "/" + rel_path; - if (!storage->backup_env_->RenameFile(tmp_path, orig_path).ok()) { - return Status(Status::NotOK, "unable to rename: "+tmp_path); +Status Storage::ReplDataManager::SwapTmpFile(Storage *storage, + const std::string &dir, const std::string &repl_file) { + std::string tmp_file = dir + "/" + repl_file + ".tmp"; + std::string orig_file = dir + "/" + repl_file; + if (!storage->backup_env_->RenameFile(tmp_file, orig_file).ok()) { + return Status(Status::NotOK, "unable to rename: "+tmp_file); } return Status::OK(); } -bool Storage::BackupManager::FileExists(Storage *storage, const std::string &rel_path, uint32_t crc) { +bool Storage::ReplDataManager::FileExists(Storage *storage, const std::string &dir, + const std::string &repl_file, uint32_t crc) { if (storage->IsClosing()) return false; - auto file_path = storage->config_->backup_dir + "/" + rel_path; + auto file_path = dir + "/" + repl_file; auto s = storage->backup_env_->FileExists(file_path); if (!s.ok()) return false; + // If crc is 0, we needn't verify, return true directly. + if (crc == 0) return true; + std::unique_ptr src_file; const rocksdb::EnvOptions soptions; s = storage->GetDB()->GetEnv()->NewSequentialFile(file_path, &src_file, soptions); diff --git a/src/storage.h b/src/storage.h index d9f0cd95091..aff96ae9c78 100644 --- a/src/storage.h +++ b/src/storage.h @@ -37,6 +37,7 @@ class Storage { Status Open(bool read_only); Status Open(); + Status Reopen(); Status OpenForReadOnly(); void CloseDB(); void InitOptions(rocksdb::Options *options); @@ -47,6 +48,7 @@ class Storage { Status CreateBackup(); Status DestroyBackup(); Status RestoreFromBackup(); + Status RestoreFromCheckpoint(); Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr *iter); Status WriteBatch(std::string &&raw_batch); @@ -82,15 +84,19 @@ class Storage { Storage(const Storage &) = delete; Storage &operator=(const Storage &) = delete; - class BackupManager { + // Full replication data files manager + class ReplDataManager { public: // Master side - static Status OpenLatestMeta(Storage *storage, - int *fd, - rocksdb::BackupID *meta_id, - uint64_t *file_size); - static int OpenDataFile(Storage *storage, const std::string &rel_path, + static Status GetFullReplDataInfo(Storage *storage, std::string *files); + static int OpenDataFile(Storage *storage, const std::string &rel_file, uint64_t *file_size); + static Status CleanInvalidFiles(Storage *storage, + const std::string &dir, std::vector &valid_files); + struct CheckpointInfo { + std::atomic create_time; + std::atomic access_time; + }; // Slave side struct MetaInfo { @@ -104,11 +110,19 @@ class Storage { rocksdb::BackupID meta_id, evbuffer *evbuf); static std::unique_ptr NewTmpFile( - Storage *storage, const std::string &rel_path); - static Status SwapTmpFile(Storage *storage, const std::string &rel_path); - static bool FileExists(Storage *storage, const std::string &rel_path, uint32_t crc); + Storage *storage, const std::string &dir, const std::string &repl_file); + static Status SwapTmpFile(Storage *storage, const std::string &dir, + const std::string &repl_file); + static bool FileExists(Storage *storage, const std::string &dir, + const std::string &repl_file, uint32_t 
crc); }; + bool ExistCheckpoint() { return backup_env_->FileExists(config_->checkpoint_dir).ok(); } + void SetCheckpointCreateTime(time_t t) { checkpoint_info_.create_time = t; } + time_t GetCheckpointCreateTime() { return checkpoint_info_.create_time; } + void SetCheckpointAccessTime(time_t t) { checkpoint_info_.access_time = t; } + time_t GetCheckpointAccessTime() { return checkpoint_info_.access_time; } + private: rocksdb::DB *db_ = nullptr; std::mutex backup_mu_; @@ -116,6 +130,7 @@ class Storage { rocksdb::Env *backup_env_; std::shared_ptr sst_file_manager_; std::shared_ptr rate_limiter_; + ReplDataManager::CheckpointInfo checkpoint_info_; Config *config_ = nullptr; std::vector cf_handles_; LockManager lock_mgr_; From 44dcd1d17a9b2e19cb582160c36eeca0e49c95f0 Mon Sep 17 00:00:00 2001 From: wangyuan21 Date: Thu, 18 Mar 2021 19:26:44 +0800 Subject: [PATCH 02/11] Fix code styles --- src/redis_cmd.cc | 2 +- src/replication.cc | 4 ++-- src/replication.h | 2 +- src/server.cc | 2 +- src/storage.cc | 4 ++-- src/storage.h | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc index 2edd8600b05..d244c16ffb3 100644 --- a/src/redis_cmd.cc +++ b/src/redis_cmd.cc @@ -4076,7 +4076,7 @@ class CommandFetchMeta : public Commander { const char *message = "-ERR can't create db checkpoint"; write(repl_fd, message, strlen(message)); LOG(WARNING) << "[replication] Failed to get full data file info," - << " error: " << s.Msg(); + << " error: " << s.Msg(); close(repl_fd); return; } diff --git a/src/replication.cc b/src/replication.cc index 3c55135f180..1ced8346302 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -277,7 +277,7 @@ Status ReplicationThread::Start(std::function &&pre_fullsync_cb, pre_fullsync_cb_ = std::move(pre_fullsync_cb); post_fullsync_cb_ = std::move(post_fullsync_cb); - // Clean synced checkpoint fronm old master because replica starts to flow new master + // Clean synced checkpoint fronm old master because replica starts to follow new master auto s = rocksdb::DestroyDB(srv_->GetConfig()->sync_checkpoint_dir, rocksdb::Options()); if (!s.ok()) { LOG(WARNING) << "Can't clean synced checkpoint from master, error: " << s.ToString(); @@ -654,7 +654,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, } Status ReplicationThread::parallelFetchFile(const std::string &dir, - std::vector> &files) { + const std::vector> &files) { size_t concurrency = 1; if (files.size() > 20) { // Use 4 threads to download files in parallel diff --git a/src/replication.h b/src/replication.h index 3f5d6a17980..40704547dee 100644 --- a/src/replication.h +++ b/src/replication.h @@ -170,7 +170,7 @@ class ReplicationThread { const std::vector &crcs, fetch_file_callback fn); Status parallelFetchFile(const std::string &dir, - std::vector> &files); + const std::vector> &files); static bool isRestoringError(const char *err); static void EventTimerCB(int, int16_t, void *ctx); diff --git a/src/server.cc b/src/server.cc index aa88840e52d..a311713f8ce 100644 --- a/src/server.cc +++ b/src/server.cc @@ -508,7 +508,7 @@ void Server::cron() { time_t access_time = storage_->GetCheckpointAccessTime(); if (storage_->ExistCheckpoint()) { - // TODO: support to config the alive time of checkpoint + // TODO(shooterit): support to config the alive time of checkpoint if ((GetFetchFileThreadNum() == 0 && std::time(nullptr) - access_time > 30) || (std::time(nullptr) - create_time > 24 * 60 * 60)) { auto s = rocksdb::DestroyDB(config_->checkpoint_dir, 
rocksdb::Options()); diff --git a/src/storage.cc b/src/storage.cc index 9a14978a765..87b51fbd5d6 100644 --- a/src/storage.cc +++ b/src/storage.cc @@ -642,7 +642,7 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri } Status Storage::ReplDataManager::CleanInvalidFiles(Storage *storage, - const std::string &dir, std::vector &valid_files) { + const std::string &dir, std::vector valid_files) { if (!storage->backup_env_->FileExists(dir).ok()) { return Status::OK(); } @@ -679,7 +679,7 @@ Status Storage::ReplDataManager::CleanInvalidFiles(Storage *storage, } int Storage::ReplDataManager::OpenDataFile(Storage *storage, - const std::string &repl_file,uint64_t *file_size) { + const std::string &repl_file, uint64_t *file_size) { std::string abs_path = storage->config_->checkpoint_dir + "/" + repl_file; auto s = storage->backup_env_->FileExists(abs_path); if (!s.ok()) { diff --git a/src/storage.h b/src/storage.h index aff96ae9c78..a0caaf9baca 100644 --- a/src/storage.h +++ b/src/storage.h @@ -92,7 +92,7 @@ class Storage { static int OpenDataFile(Storage *storage, const std::string &rel_file, uint64_t *file_size); static Status CleanInvalidFiles(Storage *storage, - const std::string &dir, std::vector &valid_files); + const std::string &dir, std::vector valid_files); struct CheckpointInfo { std::atomic create_time; std::atomic access_time; From 5abfe12d937cc82b44ff70d700921108358d23a1 Mon Sep 17 00:00:00 2001 From: wangyuan21 Date: Thu, 18 Mar 2021 22:13:28 +0800 Subject: [PATCH 03/11] Reset checkpoint time, fix invalid file path --- src/storage.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/storage.cc b/src/storage.cc index 87b51fbd5d6..5099d7f4c0c 100644 --- a/src/storage.cc +++ b/src/storage.cc @@ -44,6 +44,8 @@ Storage::Storage(Config *config) lock_mgr_(16) { InitCRC32Table(); Metadata::InitVersionCounter(); + SetCheckpointCreateTime(0); + SetCheckpointAccessTime(0); } Storage::~Storage() { @@ -665,7 +667,7 @@ Status Storage::ReplDataManager::CleanInvalidFiles(Storage *storage, Status ret; invalid_files.resize(it - invalid_files.begin()); for (it = invalid_files.begin(); it != invalid_files.end(); ++it) { - auto s = storage->backup_env_->DeleteFile(dir + *it); + auto s = storage->backup_env_->DeleteFile(dir + "/" + *it); if (!s.ok()) { ret = Status(Status::NotOK, s.ToString()); LOG(INFO) << "[storage] Fail to delete invalid file " From a42bb154f3784170701b428ecfae62ea5325dc8e Mon Sep 17 00:00:00 2001 From: wangyuan21 Date: Thu, 18 Mar 2021 22:19:21 +0800 Subject: [PATCH 04/11] Set max share checkpoint time --- src/storage.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage.cc b/src/storage.cc index 5099d7f4c0c..2e759cb1fc5 100644 --- a/src/storage.cc +++ b/src/storage.cc @@ -624,6 +624,7 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri // Replicas can share checkpiont to replication if the checkpoint existing // time is less half of WAL ttl. 
int64_t can_shared_time = storage->config_->RocksDB.WAL_ttl_seconds / 2; + if (can_shared_time > 60 * 60) can_shared_time = 60 * 60; if (can_shared_time < 10 * 60) can_shared_time = 10 * 60; if (std::time(nullptr) - storage->GetCheckpointCreateTime() > can_shared_time) { LOG(WARNING) << "Can't use current checkpoint, waiting next checkpoint"; From a358ddd6efba881e7025aa4a2e0af1778fde1d6e Mon Sep 17 00:00:00 2001 From: wangyuan21 Date: Fri, 19 Mar 2021 13:17:56 +0800 Subject: [PATCH 05/11] Fix typo --- src/storage.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage.cc b/src/storage.cc index 2e759cb1fc5..91bab3737c3 100644 --- a/src/storage.cc +++ b/src/storage.cc @@ -617,7 +617,7 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri std::unique_ptr checkpoint_guard(checkpoint); s = checkpoint->CreateCheckpoint(data_files_dir); if (!s.ok()) { - LOG(WARNING) << "ail to create checkpoint, error:" << s.ToString(); + LOG(WARNING) << "Fail to create checkpoint, error:" << s.ToString(); return Status(Status::NotOK, s.ToString()); } } else { From ff4187b0662274076e2bf0f140242811ff048ca0 Mon Sep 17 00:00:00 2001 From: wangyuan21 Date: Fri, 19 Mar 2021 16:39:15 +0800 Subject: [PATCH 06/11] Guarantee kvrocks doesn't delete checkpoint that is creating --- src/server.cc | 5 ++++- src/storage.cc | 11 ++++++----- src/storage.h | 3 +++ 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/server.cc b/src/server.cc index a311713f8ce..58317b8293e 100644 --- a/src/server.cc +++ b/src/server.cc @@ -507,7 +507,10 @@ void Server::cron() { time_t create_time = storage_->GetCheckpointCreateTime(); time_t access_time = storage_->GetCheckpointAccessTime(); - if (storage_->ExistCheckpoint()) { + // Maybe creating checkpoint costs much time if target dir is on another + // disk partition, so when we want to clean up checkpoint, we should guarantee + // that kvrocks is not creating checkpoint even if there is a checkpoint. 
+ if (storage_->ExistCheckpoint() && storage_->IsCreatingCheckpoint() == false) { // TODO(shooterit): support to config the alive time of checkpoint if ((GetFetchFileThreadNum() == 0 && std::time(nullptr) - access_time > 30) || (std::time(nullptr) - create_time > 24 * 60 * 60)) { diff --git a/src/storage.cc b/src/storage.cc index 91bab3737c3..7a7b5f7cce3 100644 --- a/src/storage.cc +++ b/src/storage.cc @@ -44,6 +44,7 @@ Storage::Storage(Config *config) lock_mgr_(16) { InitCRC32Table(); Metadata::InitVersionCounter(); + SetCreatingCheckpoint(false); SetCheckpointCreateTime(0); SetCheckpointAccessTime(0); } @@ -608,14 +609,14 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri LOG(WARNING) << "Fail to create checkpoint, error:" << s.ToString(); return Status(Status::NotOK, s.ToString()); } - - // Set checkpoint time - storage->SetCheckpointCreateTime(std::time(nullptr)); - storage->SetCheckpointAccessTime(std::time(nullptr)); + std::unique_ptr checkpoint_guard(checkpoint); // Create checkpoint of rocksdb - std::unique_ptr checkpoint_guard(checkpoint); + storage->SetCreatingCheckpoint(true); s = checkpoint->CreateCheckpoint(data_files_dir); + storage->SetCheckpointCreateTime(std::time(nullptr)); + storage->SetCheckpointAccessTime(std::time(nullptr)); + storage->SetCreatingCheckpoint(false); if (!s.ok()) { LOG(WARNING) << "Fail to create checkpoint, error:" << s.ToString(); return Status(Status::NotOK, s.ToString()); diff --git a/src/storage.h b/src/storage.h index a0caaf9baca..0aeb39dcfd0 100644 --- a/src/storage.h +++ b/src/storage.h @@ -94,6 +94,7 @@ class Storage { static Status CleanInvalidFiles(Storage *storage, const std::string &dir, std::vector valid_files); struct CheckpointInfo { + std::atomic is_creating; std::atomic create_time; std::atomic access_time; }; @@ -118,6 +119,8 @@ class Storage { }; bool ExistCheckpoint() { return backup_env_->FileExists(config_->checkpoint_dir).ok(); } + void SetCreatingCheckpoint(bool yes_or_no) { checkpoint_info_.is_creating = yes_or_no; } + bool IsCreatingCheckpoint() { return checkpoint_info_.is_creating; } void SetCheckpointCreateTime(time_t t) { checkpoint_info_.create_time = t; } time_t GetCheckpointCreateTime() { return checkpoint_info_.create_time; } void SetCheckpointAccessTime(time_t t) { checkpoint_info_.access_time = t; } From a84174ad6036efc2936f59ac38bc8087911b057b Mon Sep 17 00:00:00 2001 From: wangyuan21 Date: Fri, 19 Mar 2021 19:17:25 +0800 Subject: [PATCH 07/11] Restart full sync if failing to reopen db --- src/replication.cc | 2 +- src/storage.cc | 17 +++-------------- src/storage.h | 1 - 3 files changed, 4 insertions(+), 16 deletions(-) diff --git a/src/replication.cc b/src/replication.cc index 1ced8346302..716872f8308 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -277,7 +277,7 @@ Status ReplicationThread::Start(std::function &&pre_fullsync_cb, pre_fullsync_cb_ = std::move(pre_fullsync_cb); post_fullsync_cb_ = std::move(post_fullsync_cb); - // Clean synced checkpoint fronm old master because replica starts to follow new master + // Clean synced checkpoint from old master because replica starts to follow new master auto s = rocksdb::DestroyDB(srv_->GetConfig()->sync_checkpoint_dir, rocksdb::Options()); if (!s.ok()) { LOG(WARNING) << "Can't clean synced checkpoint from master, error: " << s.ToString(); diff --git a/src/storage.cc b/src/storage.cc index 7a7b5f7cce3..552bf52bd5a 100644 --- a/src/storage.cc +++ b/src/storage.cc @@ -318,17 +318,6 @@ Status Storage::RestoreFromBackup() { 
return s.ok() ? Status::OK() : Status(Status::DBBackupErr, s.ToString()); } -Status Storage::Reopen() { - auto s = Open(); - if (!s.IsOK()) { - LOG(ERROR) << "[storage] Fail to reopen db, error: " << s.Msg(); - LOG(ERROR) << "[storage] Exiting..."; - exit(1); - } - LOG(INFO) << "[storage] Succeed reopening db"; - return Status::OK(); -} - Status Storage::RestoreFromCheckpoint() { std::string dir = config_->sync_checkpoint_dir; std::string tmp_dir = config_->db_dir + ".tmp"; @@ -345,14 +334,14 @@ Status Storage::RestoreFromCheckpoint() { // But only try best effort to make data safe auto s = backup_env_->RenameFile(config_->db_dir, tmp_dir); if (!s.ok()) { - Reopen(); + if (!Open().IsOK()) LOG(ERROR) << "[storage] Fail to reopen db"; return Status(Status::NotOK, "Fail to rename db dir, error: " + s.ToString()); } // Rename checkpoint dir to db dir if (!(s = backup_env_->RenameFile(dir, config_->db_dir)).ok()) { backup_env_->RenameFile(tmp_dir, config_->db_dir); - Reopen(); + if (!Open().IsOK()) LOG(ERROR) << "[storage] Fail to reopen db"; return Status(Status::NotOK, "Fail to rename checkpoint dir, error: " + s.ToString()); } @@ -362,7 +351,7 @@ Status Storage::RestoreFromCheckpoint() { LOG(WARNING) << "[storage] Fail to open master checkpoint, error: " << s2.Msg(); rocksdb::DestroyDB(config_->db_dir, rocksdb::Options()); backup_env_->RenameFile(tmp_dir, config_->db_dir); - Reopen(); + if (!Open().IsOK()) LOG(ERROR) << "[storage] Fail to reopen db"; return Status(Status::DBOpenErr, "Fail to open master checkpoint, error: " + s2.Msg()); } diff --git a/src/storage.h b/src/storage.h index 0aeb39dcfd0..a55f1b11890 100644 --- a/src/storage.h +++ b/src/storage.h @@ -37,7 +37,6 @@ class Storage { Status Open(bool read_only); Status Open(); - Status Reopen(); Status OpenForReadOnly(); void CloseDB(); void InitOptions(rocksdb::Options *options); From a0c6b18530114890e1c3f7750ca503a706322f1b Mon Sep 17 00:00:00 2001 From: Wang Yuan Date: Sat, 20 Mar 2021 21:29:08 +0800 Subject: [PATCH 08/11] Apply suggestions from code review Co-authored-by: hulk --- src/replication.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/replication.cc b/src/replication.cc index 716872f8308..97213eae76f 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -606,12 +606,12 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, auto s = Engine::Storage::ReplDataManager::CleanInvalidFiles( self->storage_, target_dir, need_files); if (!s.IsOK()) { - LOG(WARNING) << "[replication] Fail to clean up invalid files of old checkpoint," + LOG(WARNING) << "[replication] Failed to clean up invalid files of the old checkpoint," << " error: " << s.Msg(); LOG(WARNING) << "[replication] Try to clean all checkpoint files"; auto s = rocksdb::DestroyDB(target_dir, rocksdb::Options()); if (!s.ok()) { - LOG(WARNING) << "[replication] Fail to clean all checkpoint files, error: " + LOG(WARNING) << "[replication] Failed to clean all checkpoint files, error: " << s.ToString(); } } From f3855a34182bcc8a5db133b3d819c0e95e6023b9 Mon Sep 17 00:00:00 2001 From: wangyuan21 Date: Sat, 20 Mar 2021 23:27:04 +0800 Subject: [PATCH 09/11] Fix multi slaves full sync condition race --- src/storage.cc | 10 ++++-- src/storage.h | 1 + tests/tcl/tests/integration/replication.tcl | 38 +++++++++++++++++++-- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/src/storage.cc b/src/storage.cc index 552bf52bd5a..f11deb4c29d 100644 --- a/src/storage.cc +++ b/src/storage.cc @@ -589,6 +589,7 @@ Status 
Storage::DecrDBRefs() { Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::string *files) { std::string data_files_dir = storage->config_->checkpoint_dir; + std::unique_lock ulm(storage->checkpoint_mu_); // Create checkpoint if not exist if (!storage->backup_env_->FileExists(data_files_dir).ok()) { @@ -607,9 +608,10 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri storage->SetCheckpointAccessTime(std::time(nullptr)); storage->SetCreatingCheckpoint(false); if (!s.ok()) { - LOG(WARNING) << "Fail to create checkpoint, error:" << s.ToString(); + LOG(WARNING) << "[storage] Fail to create checkpoint, error:" << s.ToString(); return Status(Status::NotOK, s.ToString()); } + LOG(INFO) << "[storage] Create checkpoint successfully"; } else { // Replicas can share checkpiont to replication if the checkpoint existing // time is less half of WAL ttl. @@ -617,10 +619,12 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri if (can_shared_time > 60 * 60) can_shared_time = 60 * 60; if (can_shared_time < 10 * 60) can_shared_time = 10 * 60; if (std::time(nullptr) - storage->GetCheckpointCreateTime() > can_shared_time) { - LOG(WARNING) << "Can't use current checkpoint, waiting next checkpoint"; - return Status(Status::NotOK, "Can't use current checkpoint, waiting next checkpoint"); + LOG(WARNING) << "[storage] Can't use current checkpoint, waiting next checkpoint"; + return Status(Status::NotOK, "Can't use current checkpoint, waiting for next checkpoint"); } + LOG(INFO) << "[storage] Use current existing checkpoint"; } + ulm.unlock(); // Get checkpoint file list std::vector result; diff --git a/src/storage.h b/src/storage.h index a55f1b11890..afcae392cfb 100644 --- a/src/storage.h +++ b/src/storage.h @@ -133,6 +133,7 @@ class Storage { std::shared_ptr sst_file_manager_; std::shared_ptr rate_limiter_; ReplDataManager::CheckpointInfo checkpoint_info_; + std::mutex checkpoint_mu_; Config *config_ = nullptr; std::vector cf_handles_; LockManager lock_mgr_; diff --git a/tests/tcl/tests/integration/replication.tcl b/tests/tcl/tests/integration/replication.tcl index cf8c0235284..7db4c1330f3 100644 --- a/tests/tcl/tests/integration/replication.tcl +++ b/tests/tcl/tests/integration/replication.tcl @@ -30,9 +30,7 @@ start_server {tags {"repl"}} { start_server {} { test {Second server should have role master at first} { # Can't statify partial replication - for {set i 0} {$i < 1000} {incr i} { - r set $i $i - } + populate 100 "" 10 s role } {master} @@ -94,3 +92,37 @@ start_server {tags {"repl"}} { } } } + +start_server {tags {"repl"}} { + set A [srv 0 client] + populate 100 "" 10 + + start_server {} { + set B [srv 0 client] + populate 100 "" 10 + + start_server {} { + set C [srv 0 client] + set C_host [srv 0 host] + set C_port [srv 0 port] + populate 50 "" 10 + + test {Multi slaves full sync with master at the same time} { + $A slaveof $C_host $C_port + $B slaveof $C_host $C_port + + # Wait for finishing full replication + wait_for_condition 500 100 { + [string match {*connected*} [$A role]] && + [string match {*connected*} [$B role]] + } else { + fail "Slaves can't sync with master" + } + after 50000 + # Only 2 full sync + assert_equal 2 [s sync_full] + } + } + + } +} From a1094fe630624144ff0a3099d527933bf0e4982b Mon Sep 17 00:00:00 2001 From: wangyuan21 Date: Sat, 20 Mar 2021 23:34:12 +0800 Subject: [PATCH 10/11] Remove debug time --- tests/tcl/tests/integration/replication.tcl | 1 - 1 file changed, 1 deletion(-) diff --git 
a/tests/tcl/tests/integration/replication.tcl b/tests/tcl/tests/integration/replication.tcl index 7db4c1330f3..63ab60afb45 100644 --- a/tests/tcl/tests/integration/replication.tcl +++ b/tests/tcl/tests/integration/replication.tcl @@ -118,7 +118,6 @@ start_server {tags {"repl"}} { } else { fail "Slaves can't sync with master" } - after 50000 # Only 2 full sync assert_equal 2 [s sync_full] } From 9b72875bf46fa9d77f780284ce092a6479117e3e Mon Sep 17 00:00:00 2001 From: Wang Yuan Date: Sat, 20 Mar 2021 23:38:17 +0800 Subject: [PATCH 11/11] Remove empty line --- tests/tcl/tests/integration/replication.tcl | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/tcl/tests/integration/replication.tcl b/tests/tcl/tests/integration/replication.tcl index 63ab60afb45..c19d23a29b6 100644 --- a/tests/tcl/tests/integration/replication.tcl +++ b/tests/tcl/tests/integration/replication.tcl @@ -122,6 +122,5 @@ start_server {tags {"repl"}} { assert_equal 2 [s sync_full] } } - } }
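
For reference, the master-side snapshot flow this series introduces — create a rocksdb::Checkpoint on the first FETCH_META request, let other replicas reuse it while it is still fresh relative to the WAL TTL, and hand each replica a plain comma-separated file list to fetch — can be summarized in the sketch below. This is a condensed, hypothetical standalone version: the helper name GetOrReuseCheckpoint, its parameter list, and the use of Env::Default() are illustrative stand-ins rather than the actual kvrocks code, which additionally serializes checkpoint creation with checkpoint_mu_ and records create/access times in atomics so the cron loop in server.cc can expire the directory.

// Standalone sketch (assumptions as noted above), not the kvrocks implementation.
#include <algorithm>
#include <ctime>
#include <memory>
#include <string>
#include <vector>

#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/options.h>
#include <rocksdb/utilities/checkpoint.h>

// Create a checkpoint for a full-sync request, or reuse the existing one if it
// is still fresh enough to be shared by several replicas.
rocksdb::Status GetOrReuseCheckpoint(rocksdb::DB *db, const std::string &checkpoint_dir,
                                     uint64_t wal_ttl_seconds, std::time_t *create_time,
                                     std::vector<std::string> *files) {
  rocksdb::Env *env = rocksdb::Env::Default();
  if (!env->FileExists(checkpoint_dir).ok()) {
    // No checkpoint yet: build one with RocksDB's Checkpoint utility. SST files
    // are hard-linked when the checkpoint stays on the same filesystem, so this
    // is cheap and does not block writes.
    rocksdb::Checkpoint *checkpoint = nullptr;
    rocksdb::Status s = rocksdb::Checkpoint::Create(db, &checkpoint);
    if (!s.ok()) return s;
    std::unique_ptr<rocksdb::Checkpoint> guard(checkpoint);
    s = checkpoint->CreateCheckpoint(checkpoint_dir);
    if (!s.ok()) return s;
    *create_time = std::time(nullptr);
  } else {
    // Reuse the existing checkpoint only while it is younger than half the WAL
    // TTL, clamped to [10 min, 60 min] as in the patch, so the replica can still
    // switch to incremental WAL-based replication after restoring it.
    int64_t share_window = static_cast<int64_t>(wal_ttl_seconds) / 2;
    share_window = std::max<int64_t>(10 * 60, std::min<int64_t>(60 * 60, share_window));
    if (std::time(nullptr) - *create_time > share_window) {
      return rocksdb::Status::Aborted("checkpoint too old, wait for the next one");
    }
  }
  // The file list (without "." and "..") is what FETCH_META sends to the replica.
  std::vector<std::string> children;
  rocksdb::Status s = env->GetChildren(checkpoint_dir, &children);
  if (!s.ok()) return s;
  for (const auto &f : children) {
    if (f != "." && f != "..") files->push_back(f);
  }
  return rocksdb::Status::OK();
}

The sharing window is the key design point: by capping the checkpoint's reusable lifetime at half of WAL_ttl_seconds (clamped to between 10 and 60 minutes), a replica that restores from it should still find the WAL entries it needs for the subsequent incremental phase, while the cron job removes the directory once no FETCH_FILE thread has touched it for 30 seconds or it is older than a day.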