fix(replication): didn't resume the db status after restarting full sync #2549

Merged
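Summary of the change: full sync can be restarted (CBState::RESTART) after pre_fullsync_cb_ (Server::PrepareRestoreDB) has already disconnected slaves, stopped background work and closed the DB, but the restart paths never called post_fullsync_cb_, so the slave never resumed its DB status. The fix makes pre_fullsync_cb_ return bool so a failed preparation restarts full sync cleanly, and calls post_fullsync_cb_ on every failure path where preparation has already run. A hedged sketch of the resulting callback contract, using names from the diffs below (simplified; not the actual Kvrocks source):

// pre_fullsync_cb_  : prepare the server for restoring the DB; returning false
//                     (for example because the replication thread was stopped)
//                     makes the state machine restart full sync.
// post_fullsync_cb_ : resume the DB status (clear is_loading_, restart the task
//                     runner); it must run whenever pre_fullsync_cb_ has run.
auto s = replication_thread_->Start(
    [this]() { return PrepareRestoreDB(); },  // now returns bool
    [this]() {
      this->is_loading_ = false;
      if (auto st = task_runner_.Start(); !st) { /* log a warning */ }
    });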
14 changes: 9 additions & 5 deletions src/cluster/replication.cc
@@ -337,7 +337,7 @@ ReplicationThread::ReplicationThread(std::string host, uint32_t port, Server *sr
CallbackType{CallbacksStateMachine::WRITE, "fullsync write", &ReplicationThread::fullSyncWriteCB},
CallbackType{CallbacksStateMachine::READ, "fullsync read", &ReplicationThread::fullSyncReadCB}}) {}

Status ReplicationThread::Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb) {
Status ReplicationThread::Start(std::function<bool()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb) {
pre_fullsync_cb_ = std::move(pre_fullsync_cb);
post_fullsync_cb_ = std::move(post_fullsync_cb);

@@ -700,25 +700,28 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
fullsync_state_ = kFetchMetaID;
LOG(INFO) << "[replication] Succeeded fetching full data files info, fetching files in parallel";

bool pre_fullsync_done = false;
// If 'slave-empty-db-before-fullsync' is yes, we call 'pre_fullsync_cb_'
// just like reloading database. And we don't want slave to occupy too much
// disk space, so we just empty entire database rudely.
if (srv_->GetConfig()->slave_empty_db_before_fullsync) {
pre_fullsync_cb_();
if (!pre_fullsync_cb_()) return CBState::RESTART;
pre_fullsync_done = true;
storage_->EmptyDB();
}

repl_state_.store(kReplFetchSST, std::memory_order_relaxed);
auto s = parallelFetchFile(target_dir, meta.files);
if (!s.IsOK()) {
if (pre_fullsync_done) post_fullsync_cb_();
LOG(ERROR) << "[replication] Failed to parallel fetch files while " + s.Msg();
return CBState::RESTART;
}
LOG(INFO) << "[replication] Succeeded fetching files in parallel, restoring the backup";

// Restore DB from backup
// We already call 'pre_fullsync_cb_' if 'slave-empty-db-before-fullsync' is yes
if (!srv_->GetConfig()->slave_empty_db_before_fullsync) pre_fullsync_cb_();
// Don't need to call 'pre_fullsync_cb_' again if it was called before
if (!pre_fullsync_done && !pre_fullsync_cb_()) return CBState::RESTART;

// For old version, master uses rocksdb backup to implement data snapshot
if (srv_->GetConfig()->master_use_repl_port) {
s = storage_->RestoreFromBackup();
@@ -727,6 +730,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
}
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to restore backup while " + s.Msg() + ", restart fullsync";
post_fullsync_cb_();
return CBState::RESTART;
}
LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was finish";
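The pre_fullsync_done flag above exists to keep one invariant: post_fullsync_cb_ must run on every early return taken after pre_fullsync_cb_ has already succeeded, otherwise the server stays in the loading state. For illustration only, the same invariant could be expressed with a small scope guard; this is a hypothetical sketch, not code from this PR or from Kvrocks:

#include <functional>
#include <utility>

// Hypothetical scope guard: fires the post-fullsync callback on destruction
// unless it has been disarmed, so every early return resumes serving.
class PostFullSyncGuard {
 public:
  explicit PostFullSyncGuard(std::function<void()> post_cb) : post_cb_(std::move(post_cb)) {}
  PostFullSyncGuard(const PostFullSyncGuard &) = delete;
  PostFullSyncGuard &operator=(const PostFullSyncGuard &) = delete;
  ~PostFullSyncGuard() {
    if (armed_ && post_cb_) post_cb_();
  }
  void Disarm() { armed_ = false; }  // the normal completion path takes over

 private:
  std::function<void()> post_cb_;
  bool armed_ = true;
};

// Possible usage (hypothetical): construct the guard right after
// pre_fullsync_cb_() succeeds; call Disarm() once full sync completes and the
// post callback is handled by the normal path.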
5 changes: 3 additions & 2 deletions src/cluster/replication.h
@@ -95,8 +95,9 @@ class FeedSlaveThread {
class ReplicationThread : private EventCallbackBase<ReplicationThread> {
public:
explicit ReplicationThread(std::string host, uint32_t port, Server *srv);
Status Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb);
Status Start(std::function<bool()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb);
void Stop();
bool IsStopped() const { return stop_flag_; }
ReplState State() { return repl_state_.load(std::memory_order_relaxed); }
int64_t LastIOTimeSecs() const { return last_io_time_secs_.load(std::memory_order_relaxed); }

@@ -159,7 +160,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
bool next_try_old_psync_ = false;
bool next_try_without_announce_ip_address_ = false;

std::function<void()> pre_fullsync_cb_;
std::function<bool()> pre_fullsync_cb_;
std::function<void()> post_fullsync_cb_;

// Internal states managed by FullSync procedure
28 changes: 18 additions & 10 deletions src/server/server.cc
@@ -281,7 +281,7 @@ Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reco
if (GetConfig()->master_use_repl_port) master_listen_port += 1;

replication_thread_ = std::make_unique<ReplicationThread>(host, master_listen_port, this);
auto s = replication_thread_->Start([this]() { PrepareRestoreDB(); },
auto s = replication_thread_->Start([this]() { return PrepareRestoreDB(); },
[this]() {
this->is_loading_ = false;
if (auto s = task_runner_.Start(); !s) {
@@ -1336,18 +1336,11 @@ std::string Server::GetRocksDBStatsJson() const {
// This function is called by replication thread when finished fetching all files from its master.
// Before restoring the db from backup or checkpoint, we should
// guarantee other threads don't access DB and its column families, then close db.
void Server::PrepareRestoreDB() {
bool Server::PrepareRestoreDB() {
// Stop feeding slaves thread
LOG(INFO) << "[server] Disconnecting slaves...";
DisconnectSlaves();

// Stop task runner
LOG(INFO) << "[server] Stopping the task runner and clear task queue...";
task_runner_.Cancel();
if (auto s = task_runner_.Join(); !s) {
LOG(WARNING) << "[server] " << s.Msg();
}

// If the DB is restored, the object 'db_' will be destroyed, but
// 'db_' will be accessed in data migration task. To avoid wrong
// accessing, data migration task should be stopped before restoring DB
@@ -1362,12 +1355,27 @@ void Server::PrepareRestoreDB() {
// ASAP to avoid user can't receive responses for long time, because the following
// 'CloseDB' may cost much time to acquire DB mutex.
LOG(INFO) << "[server] Waiting workers for finishing executing commands...";
{ auto exclusivity = WorkExclusivityGuard(); }
while (!works_concurrency_rw_lock_.try_lock()) {
if (replication_thread_->IsStopped()) {
is_loading_ = false;
return false;
}
usleep(1000);
}
works_concurrency_rw_lock_.unlock();

// Stop task runner
LOG(INFO) << "[server] Stopping the task runner and clear task queue...";
task_runner_.Cancel();
if (auto s = task_runner_.Join(); !s) {
LOG(WARNING) << "[server] " << s.Msg();
}

// Cron thread, compaction checker thread, full synchronization thread
// may always run in the background, we need to close db, so they don't actually work.
LOG(INFO) << "[server] Waiting for closing DB...";
storage->CloseDB();
return true;
}

void Server::WaitNoMigrateProcessing() {
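The key change in PrepareRestoreDB is replacing the blocking WorkExclusivityGuard with a polling try_lock loop, so the wait for workers can be abandoned once the replication thread has been stopped instead of blocking the callback forever. A minimal, self-contained sketch of that pattern, using standard-library stand-ins for the Kvrocks types:

#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

// Poll the workers' lock instead of blocking on it, so the waiter can bail
// out when a stop flag is set (mirrors the while/try_lock loop added above).
bool WaitForIdleWorkers(std::mutex &workers_lock, const std::atomic<bool> &stopped) {
  while (!workers_lock.try_lock()) {
    if (stopped.load()) return false;  // give up; the caller resumes serving
    std::this_thread::sleep_for(std::chrono::milliseconds(1));  // roughly usleep(1000)
  }
  workers_lock.unlock();  // we only needed to observe a moment with no in-flight commands
  return true;
}

int main() {
  std::mutex workers_lock;
  std::atomic<bool> stopped{false};

  // Simulate a worker that holds the lock briefly while executing commands.
  std::thread worker([&] {
    std::lock_guard<std::mutex> guard(workers_lock);
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
  });

  std::this_thread::sleep_for(std::chrono::milliseconds(5));  // let the worker take the lock
  std::cout << (WaitForIdleWorkers(workers_lock, stopped) ? "acquired" : "aborted") << "\n";
  worker.join();
  return 0;
}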
2 changes: 1 addition & 1 deletion src/server/server.h
@@ -245,7 +245,7 @@ class Server {
std::string GetRocksDBStatsJson() const;
ReplState GetReplicationState();

void PrepareRestoreDB();
bool PrepareRestoreDB();
void WaitNoMigrateProcessing();
Status AsyncCompactDB(const std::string &begin_key = "", const std::string &end_key = "");
Status AsyncBgSaveDB();
1 change: 1 addition & 0 deletions src/storage/storage.cc
@@ -482,6 +482,7 @@ Status Storage::RestoreFromCheckpoint() {
// Clean old backups and checkpoints because server will work on the new db
PurgeOldBackups(0, 0);
rocksdb::DestroyDB(config_->checkpoint_dir, rocksdb::Options());
rocksdb::DestroyDB(tmp_dir, rocksdb::Options());

// Maybe there is no database directory
auto s = env_->CreateDirIfMissing(config_->db_dir);
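The one-line storage change adds a second DestroyDB call so the temporary directory used while restoring from a checkpoint is cleaned up together with the old checkpoint directory. A small hedged sketch of that cleanup (directory names are illustrative, and the return statuses are discarded just as in the lines above):

#include <string>

#include <rocksdb/db.h>
#include <rocksdb/options.h>

// Remove both the checkpoint directory and the temporary restore directory so
// stale data files do not accumulate once the server works on the new DB.
void CleanupRestoreDirs(const std::string &checkpoint_dir, const std::string &tmp_dir) {
  rocksdb::DestroyDB(checkpoint_dir, rocksdb::Options());
  rocksdb::DestroyDB(tmp_dir, rocksdb::Options());
}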