Use checkpoint to implement data snapshot when full replication #205

Merged: 11 commits, Mar 22, 2021
2 changes: 2 additions & 0 deletions src/config.cc
@@ -194,6 +194,8 @@ void Config::initFieldCallback() {
db_dir = dir + "/db";
if (backup_dir.empty()) backup_dir = dir + "/backup";
if (log_dir.empty()) log_dir = dir;
checkpoint_dir = dir + "/checkpoint";
sync_checkpoint_dir = dir + "/sync_checkpoint";
return Status::OK();
}},
{"bind", [this](Server* srv, const std::string &k, const std::string& v)->Status {
2 changes: 2 additions & 0 deletions src/config.h
@@ -68,6 +68,8 @@ struct Config{
std::string dir;
std::string db_dir;
std::string backup_dir;
std::string checkpoint_dir;
std::string sync_checkpoint_dir;
std::string log_dir;
std::string pidfile;
std::string db_name;
32 changes: 14 additions & 18 deletions src/redis_cmd.cc
@@ -4068,31 +4068,26 @@ class CommandFetchMeta : public Commander {

// Feed-replica-meta thread
std::thread t = std::thread([svr, repl_fd, ip]() {
Util::ThreadSetName("feed-replica-meta");
int fd;
uint64_t file_size;
rocksdb::BackupID meta_id;
auto s = Engine::Storage::BackupManager::OpenLatestMeta(
svr->storage_, &fd, &meta_id, &file_size);
Util::ThreadSetName("feed-replica-data-info");
std::string files;
auto s = Engine::Storage::ReplDataManager::GetFullReplDataInfo(
svr->storage_, &files);
if (!s.IsOK()) {
const char *message = "-ERR can't create db backup";
const char *message = "-ERR can't create db checkpoint";
write(repl_fd, message, strlen(message));
LOG(ERROR) << "[replication] Failed to open latest meta, err: "
<< s.Msg();
LOG(WARNING) << "[replication] Failed to get full data file info,"
<< " error: " << s.Msg();
close(repl_fd);
return;
}
// Send the meta ID, meta file size and content
if (Util::SockSend(repl_fd, std::to_string(meta_id)+CRLF).IsOK() &&
Util::SockSend(repl_fd, std::to_string(file_size)+CRLF).IsOK() &&
Util::SockSendFile(repl_fd, fd, file_size).IsOK()) {
LOG(INFO) << "[replication] Succeed sending backup meta " << meta_id
<< " to " << ip;
// Send full data file info
if (Util::SockSend(repl_fd, files+CRLF).IsOK()) {
LOG(INFO) << "[replication] Succeed sending full data file info to " << ip;
} else {
LOG(WARNING) << "[replication] Fail to send backup meta" << meta_id
LOG(WARNING) << "[replication] Fail to send full data file info to "
<< ip << ", error: " << strerror(errno);
}
close(fd);
svr->storage_->SetCheckpointAccessTime(std::time(nullptr));
close(repl_fd);
});
t.detach();
@@ -4132,7 +4127,7 @@ class CommandFetchFile : public Commander {
svr->GetFetchFileThreadNum();
}
auto start = std::chrono::high_resolution_clock::now();
auto fd = Engine::Storage::BackupManager::OpenDataFile(svr->storage_,
auto fd = Engine::Storage::ReplDataManager::OpenDataFile(svr->storage_,
file, &file_size);
if (fd < 0) break;

@@ -4160,6 +4155,7 @@ class CommandFetchFile : public Commander {
usleep(shortest - duration);
}
}
svr->storage_->SetCheckpointAccessTime(std::time(nullptr));
svr->DecrFetchFileThread();
close(repl_fd);
});
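CommandFetchMeta now asks Engine::Storage::ReplDataManager::GetFullReplDataInfo for the list of checkpoint files instead of opening a backup meta file. That helper lives in storage.cc, outside the hunks above; the sketch below only illustrates the underlying RocksDB mechanism the PR title refers to (creating a checkpoint with the rocksdb::Checkpoint utility). The function name is illustrative and none of the kvrocks-specific details are shown.

#include <rocksdb/db.h>
#include <rocksdb/utilities/checkpoint.h>
#include <string>

// Sketch only: create a point-in-time snapshot of `db` under `checkpoint_dir`.
// On the same filesystem this mostly hard-links SST files, so it is cheap.
rocksdb::Status CreateCheckpointSketch(rocksdb::DB *db, const std::string &checkpoint_dir) {
  rocksdb::Checkpoint *checkpoint = nullptr;
  rocksdb::Status s = rocksdb::Checkpoint::Create(db, &checkpoint);
  if (!s.ok()) return s;
  s = checkpoint->CreateCheckpoint(checkpoint_dir);
  delete checkpoint;
  return s;
}

The file list sent to the replica would then just be the contents of that directory, joined with commas as parsed by Util::Split in fullSyncReadCB below.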
96 changes: 76 additions & 20 deletions src/replication.cc
@@ -277,6 +277,12 @@ Status ReplicationThread::Start(std::function<void()> &&pre_fullsync_cb,
pre_fullsync_cb_ = std::move(pre_fullsync_cb);
post_fullsync_cb_ = std::move(post_fullsync_cb);

// Clean synced checkpoint from old master because replica starts to follow new master
auto s = rocksdb::DestroyDB(srv_->GetConfig()->sync_checkpoint_dir, rocksdb::Options());
if (!s.ok()) {
LOG(WARNING) << "Can't clean synced checkpoint from master, error: " << s.ToString();
}

// cleanup the old backups, so we can start replication in a clean state
storage_->PurgeOldBackups(0, 0);

@@ -532,6 +538,11 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
auto input = bufferevent_get_input(bev);
switch (self->fullsync_state_) {
case kFetchMetaID:
// New version master only sends meta file content
if (!self->srv_->GetConfig()->master_use_repl_port) {
self->fullsync_state_ = kFetchMetaContent;
return CBState::AGAIN;
}
line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
@@ -566,17 +577,51 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
self->fullsync_state_ = kFetchMetaContent;
LOG(INFO) << "[replication] Succeed fetching meta size: " << self->fullsync_filesize_;
case kFetchMetaContent:
if (evbuffer_get_length(input) < self->fullsync_filesize_) {
return CBState::AGAIN;
std::string target_dir;
Engine::Storage::ReplDataManager::MetaInfo meta;
// Master using old version
if (self->srv_->GetConfig()->master_use_repl_port) {
if (evbuffer_get_length(input) < self->fullsync_filesize_) {
return CBState::AGAIN;
}
meta = Engine::Storage::ReplDataManager::ParseMetaAndSave(
self->storage_, self->fullsync_meta_id_, input);
target_dir = self->srv_->GetConfig()->backup_dir;
} else {
// Master using new version
line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
LOG(ERROR) << "[replication] Failed to fetch meta info: " << line;
free(line);
return CBState::RESTART;
}
std::vector<std::string> need_files;
Util::Split(std::string(line), ",", &need_files);
for (auto f : need_files) {
meta.files.emplace_back(f, 0);
}
target_dir = self->srv_->GetConfig()->sync_checkpoint_dir;
// Clean invalid files of checkpoint
auto s = Engine::Storage::ReplDataManager::CleanInvalidFiles(
self->storage_, target_dir, need_files);
if (!s.IsOK()) {
LOG(WARNING) << "[replication] Failed to clean up invalid files of the old checkpoint,"
<< " error: " << s.Msg();
LOG(WARNING) << "[replication] Try to clean all checkpoint files";
auto s = rocksdb::DestroyDB(target_dir, rocksdb::Options());
if (!s.ok()) {
LOG(WARNING) << "[replication] Failed to clean all checkpoint files, error: "
<< s.ToString();
}
}
}
auto meta = Engine::Storage::BackupManager::ParseMetaAndSave(
self->storage_, self->fullsync_meta_id_, input);
assert(evbuffer_get_length(input) == 0);
self->fullsync_state_ = kFetchMetaID;

LOG(INFO) << "[replication] Succeeded fetching meta file, fetching files in parallel";
LOG(INFO) << "[replication] Succeeded fetching full data files info, fetching files in parallel";
self->repl_state_ = kReplFetchSST;
auto s = self->parallelFetchFile(meta.files);
auto s = self->parallelFetchFile(target_dir, meta.files);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to parallel fetch files while " + s.Msg();
return CBState::RESTART;
@@ -585,7 +630,12 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,

// Restore DB from backup
self->pre_fullsync_cb_();
s = self->storage_->RestoreFromBackup();
// For old version, master uses rocksdb backup to implement data snapshot
if (self->srv_->GetConfig()->master_use_repl_port) {
s = self->storage_->RestoreFromBackup();
} else {
s = self->storage_->RestoreFromCheckpoint();
}
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to restore backup while " + s.Msg() + ", restart fullsync";
return CBState::RESTART;
@@ -603,7 +653,8 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
return CBState::QUIT;
}

Status ReplicationThread::parallelFetchFile(const std::vector<std::pair<std::string, uint32_t>> &files) {
Status ReplicationThread::parallelFetchFile(const std::string &dir,
const std::vector<std::pair<std::string, uint32_t>> &files) {
size_t concurrency = 1;
if (files.size() > 20) {
// Use 4 threads to download files in parallel
@@ -614,7 +665,7 @@ Status ReplicationThread::parallelFetchFile(const std::vector<std::pair<std::str
std::vector<std::future<Status>> results;
for (size_t tid = 0; tid < concurrency; ++tid) {
results.push_back(std::async(
std::launch::async, [this, &files, tid, concurrency, &fetch_cnt, &skip_cnt]() -> Status {
std::launch::async, [this, dir, &files, tid, concurrency, &fetch_cnt, &skip_cnt]() -> Status {
if (this->stop_flag_) {
return Status(Status::NotOK, "replication thread was stopped");
}
@@ -637,7 +688,7 @@ Status ReplicationThread::parallelFetchFile(const std::vector<std::pair<std::str
const auto &f_name = files[f_idx].first;
const auto &f_crc = files[f_idx].second;
// Don't fetch existing files
if (Engine::Storage::BackupManager::FileExists(this->storage_, f_name, f_crc)) {
if (Engine::Storage::ReplDataManager::FileExists(this->storage_, dir, f_name, f_crc)) {
skip_cnt.fetch_add(1);
uint32_t cur_skip_cnt = skip_cnt.load();
uint32_t cur_fetch_cnt = fetch_cnt.load();
@@ -663,11 +714,13 @@ Status ReplicationThread::parallelFetchFile(const std::vector<std::pair<std::str
// command, so we need to fetch all files by multiple command interactions.
if (srv_->GetConfig()->master_use_repl_port) {
for (unsigned i = 0; i < fetch_files.size(); i++) {
s = this->fetchFiles(sock_fd, {fetch_files[i]}, {crcs[i]}, fn);
s = this->fetchFiles(sock_fd, dir, {fetch_files[i]}, {crcs[i]}, fn);
if (!s.IsOK()) break;
}
} else {
if (!fetch_files.empty()) s = this->fetchFiles(sock_fd, fetch_files, crcs, fn);
if (!fetch_files.empty()) {
s = this->fetchFiles(sock_fd, dir, fetch_files, crcs, fn);
}
}
close(sock_fd);
return s;
@@ -711,8 +764,9 @@ Status ReplicationThread::sendAuth(int sock_fd) {
return Status::OK();
}

Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf,
std::string file, uint32_t crc, fetch_file_callback fn) {
Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf,
const std::string &dir, std::string file,
uint32_t crc, fetch_file_callback fn) {
size_t line_len, file_size;

// Read file size line
@@ -735,7 +789,7 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf,
}

// Write to tmp file
auto tmp_file = Engine::Storage::BackupManager::NewTmpFile(storage_, file);
auto tmp_file = Engine::Storage::ReplDataManager::NewTmpFile(storage_, dir, file);
if (!tmp_file) {
return Status(Status::NotOK, "unable to create tmp file");
}
@@ -759,22 +813,24 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf,
}
}
}
if (crc != tmp_crc) {
// Verify file crc checksum if crc is not 0
if (crc && crc != tmp_crc) {
char err_buf[64];
snprintf(err_buf, sizeof(err_buf), "CRC mismatched, %u was expected but got %u", crc, tmp_crc);
return Status(Status::NotOK, err_buf);
}
// File is OK, rename to formal name
auto s = Engine::Storage::BackupManager::SwapTmpFile(storage_, file);
auto s = Engine::Storage::ReplDataManager::SwapTmpFile(storage_, dir, file);
if (!s.IsOK()) return s;

// Call fetch file callback function
fn(file, crc);
return Status::OK();
}

Status ReplicationThread::fetchFiles(int sock_fd, const std::vector<std::string> &files,
const std::vector<uint32_t> &crcs, fetch_file_callback fn) {
Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir,
const std::vector<std::string> &files, const std::vector<uint32_t> &crcs,
fetch_file_callback fn) {
std::string files_str;
for (auto file : files) {
files_str += file;
@@ -789,7 +845,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::vector<std::string>
evbuffer *evbuf = evbuffer_new();
for (unsigned i = 0; i < files.size(); i++) {
DLOG(INFO) << "[fetch] Start to fetch file " << files[i];
s = fetchFile(sock_fd, evbuf, files[i], crcs[i], fn);
s = fetchFile(sock_fd, evbuf, dir, files[i], crcs[i], fn);
if (!s.IsOK()) {
s = Status(Status::NotOK, "fetch file err: " + s.Msg());
LOG(WARNING) << "[fetch] Fail to fetch file " << files[i] << ", err: " << s.Msg();
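CleanInvalidFiles, called in fullSyncReadCB above so that a partially synced checkpoint can be reused, is implemented in storage.cc and not shown in this diff. A rough sketch of the idea under that assumption, keeping any local file the master still reports and deleting the rest; the function name and the bare rocksdb::Env usage are illustrative, not the real kvrocks implementation.

#include <rocksdb/env.h>
#include <algorithm>
#include <string>
#include <vector>

// Sketch only: drop files in `dir` that are not in the master's file list, so
// previously fetched files are kept and only missing ones are re-fetched.
rocksdb::Status CleanInvalidFilesSketch(rocksdb::Env *env, const std::string &dir,
                                        const std::vector<std::string> &valid_files) {
  std::vector<std::string> children;
  rocksdb::Status s = env->GetChildren(dir, &children);
  if (!s.ok()) return s;
  for (const auto &name : children) {
    if (name == "." || name == "..") continue;
    bool valid = std::find(valid_files.begin(), valid_files.end(), name) != valid_files.end();
    if (!valid) {
      // Not in the master's file list, so it belongs to a stale checkpoint.
      s = env->DeleteFile(dir + "/" + name);
      if (!s.ok()) return s;
    }
  }
  return rocksdb::Status::OK();
}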
13 changes: 8 additions & 5 deletions src/replication.h
@@ -163,11 +163,14 @@ class ReplicationThread {

// Synchronized-Blocking ops
Status sendAuth(int sock_fd);
Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string file,
uint32_t crc, fetch_file_callback fn);
Status fetchFiles(int sock_fd, const std::vector<std::string> &files,
const std::vector<uint32_t> &crcs, fetch_file_callback fn);
Status parallelFetchFile(const std::vector<std::pair<std::string, uint32_t>> &files);
Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir,
const std::string file, uint32_t crc, fetch_file_callback fn);
Status fetchFiles(int sock_fd, const std::string &dir,
const std::vector<std::string> &files,
const std::vector<uint32_t> &crcs,
fetch_file_callback fn);
Status parallelFetchFile(const std::string &dir,
const std::vector<std::pair<std::string, uint32_t>> &files);
static bool isRestoringError(const char *err);

static void EventTimerCB(int, int16_t, void *ctx);
20 changes: 20 additions & 0 deletions src/server.cc
@@ -501,6 +501,26 @@ void Server::cron() {
Status s = dynamicResizeBlockAndSST();
LOG(INFO) << "[server] Schedule to dynamic resize block and sst, result: " << s.Msg();
}

// If no replica is using this checkpoint, we can remove it.
if (counter != 0 && counter % 10 == 0) {
time_t create_time = storage_->GetCheckpointCreateTime();
time_t access_time = storage_->GetCheckpointAccessTime();

// Creating a checkpoint may take a long time if the target dir is on another
// disk partition, so before cleaning up the checkpoint we must make sure that
// kvrocks is not in the middle of creating one, even if a checkpoint exists.
if (storage_->ExistCheckpoint() && storage_->IsCreatingCheckpoint() == false) {
// TODO(shooterit): support to config the alive time of checkpoint
if ((GetFetchFileThreadNum() == 0 && std::time(nullptr) - access_time > 30) ||
(std::time(nullptr) - create_time > 24 * 60 * 60)) {
auto s = rocksdb::DestroyDB(config_->checkpoint_dir, rocksdb::Options());
if (!s.ok()) {
LOG(WARNING) << "Fail to clean checkpoint, error: " << s.ToString();
}
}
}
}
cleanupExitedSlaves();
counter++;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
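The cleanup condition in the cron hunk packs two policies into one if statement; restated as a standalone predicate (the function and parameter names are illustrative only), it reads:

#include <ctime>

// Sketch only: same policy as the cron block above. The checkpoint is removed
// when no replica has touched it for 30 seconds and no fetch-file thread is
// running, or unconditionally once it is older than a day.
bool ShouldRemoveCheckpointSketch(time_t now, time_t create_time, time_t access_time,
                                  int fetch_file_threads, bool checkpoint_exists,
                                  bool creating_checkpoint) {
  if (!checkpoint_exists || creating_checkpoint) return false;
  return (fetch_file_threads == 0 && now - access_time > 30) ||
         (now - create_time > 24 * 60 * 60);
}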