Support config set backup-dir new-dir (#1026)
mapleFU authored Oct 24, 2022
1 parent 67c52be commit ce085aa
Showing 6 changed files with 93 additions and 13 deletions.
26 changes: 23 additions & 3 deletions src/config/config.cc
@@ -128,7 +128,7 @@ Config::Config() {
{"compaction-checker-range", false, new StringField(&compaction_checker_range_, "")},
{"db-name", true, new StringField(&db_name, "change.me.db")},
{"dir", true, new StringField(&dir, "/tmp/kvrocks")},
{"backup-dir", true, new StringField(&backup_dir, "")},
{"backup-dir", false, new StringField(&backup_dir, "")},
{"log-dir", true, new StringField(&log_dir, "")},
{"pidfile", true, new StringField(&pidfile, "")},
{"max-io-mb", false, new IntField(&max_io_mb, 500, 0, INT_MAX)},
@@ -302,7 +302,7 @@ void Config::initFieldValidator() {
}

// The callback function would be invoked after the field was set,
// it may change related fileds or re-format the field. for example,
// it may change related fields or re-format the field. for example,
// when the 'dir' was set, the db-dir or backup-dir should be reset as well.
void Config::initFieldCallback() {
auto set_db_option_cb = [](Server *srv, const std::string &k, const std::string &v) -> Status {
@@ -329,13 +329,33 @@ void Config::initFieldCallback() {
{"dir",
[this](Server *srv, const std::string &k, const std::string &v) -> Status {
db_dir = dir + "/db";
if (backup_dir.empty()) backup_dir = dir + "/backup";
{
std::lock_guard<std::mutex> lg(this->backup_mu_);
if (backup_dir.empty()) {
backup_dir = dir + "/backup";
}
}
if (log_dir.empty()) log_dir = dir;
checkpoint_dir = dir + "/checkpoint";
sync_checkpoint_dir = dir + "/sync_checkpoint";
backup_sync_dir = dir + "/backup_for_sync";
return Status::OK();
}},
{"backup-dir",
[this](Server *srv, const std::string &k, const std::string &v) -> Status {
std::string previous_backup;
{
// Note: backup_mu_ may currently be held by an ongoing backup or purge,
// so this command may have to wait for seconds.
std::lock_guard<std::mutex> lg(this->backup_mu_);
previous_backup = std::move(backup_dir);
backup_dir = v;
}
if (!previous_backup.empty()) {
LOG(INFO) << "change backup dir from " << backup_dir << " to " << v;
}
return Status::OK();
}},
{"cluster-enabled",
[this](Server *srv, const std::string &k, const std::string &v) -> Status {
if (cluster_enabled) slot_id_encoded = true;
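To make the new backup-dir callback easier to follow, here is a minimal, self-contained sketch (not the kvrocks code; the type and method names are made up for illustration) of the locking pattern it uses: swap the shared backup_dir under a short-lived lock, and only log the change after the lock has been released.

#include <iostream>
#include <mutex>
#include <string>
#include <utility>

// Sketch of the callback's locking pattern: the critical section only swaps
// the guarded string; the (potentially slow) logging happens after unlock.
struct BackupConfig {
  std::mutex backup_mu;
  std::string backup_dir;  // guarded by backup_mu

  void SetBackupDir(const std::string &new_dir) {
    std::string previous;
    {
      std::lock_guard<std::mutex> lg(backup_mu);
      previous = std::move(backup_dir);
      backup_dir = new_dir;
    }  // lock released before any slow work such as logging
    if (!previous.empty()) {
      std::cout << "change backup dir from " << previous << " to " << new_dir << "\n";
    }
  }
};

int main() {
  BackupConfig cfg;
  cfg.SetBackupDir("/tmp/kvrocks/backup");
  cfg.SetBackupDir("/tmp/kvrocks/backup2");  // prints the old and new directories
}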
4 changes: 3 additions & 1 deletion src/config/config.h
@@ -107,7 +107,7 @@ struct Config {
std::vector<std::string> binds;
std::string dir;
std::string db_dir;
std::string backup_dir;
std::string backup_dir; // GUARDED_BY(backup_mu_)
std::string backup_sync_dir;
std::string checkpoint_dir;
std::string sync_checkpoint_dir;
@@ -182,6 +182,8 @@ struct Config {
} write_options;
} RocksDB;

mutable std::mutex backup_mu_;

public:
Status Rewrite();
Status Load(const std::string &path);
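The new backup_mu_ is declared mutable, presumably so that const member functions of Config could still lock it while reading backup_dir. A small sketch of why that matters, using a hypothetical stand-in class rather than the real Config API:

#include <mutex>
#include <string>
#include <utility>

// ConfigLike is a stand-in for Config: a mutable mutex lets a const getter
// lock backup_mu_ while reading the guarded string.
class ConfigLike {
 public:
  std::string GetBackupDir() const {
    std::lock_guard<std::mutex> lg(backup_mu_);  // legal only because backup_mu_ is mutable
    return backup_dir_;
  }

  void SetBackupDir(std::string dir) {
    std::lock_guard<std::mutex> lg(backup_mu_);
    backup_dir_ = std::move(dir);
  }

 private:
  mutable std::mutex backup_mu_;
  std::string backup_dir_;  // guarded by backup_mu_
};

int main() {
  ConfigLike cfg;
  cfg.SetBackupDir("/tmp/kvrocks/backup");
  return cfg.GetBackupDir().empty() ? 1 : 0;
}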
16 changes: 9 additions & 7 deletions src/storage/storage.cc
@@ -333,9 +333,10 @@ Status Storage::OpenForReadOnly() { return Open(true); }

Status Storage::CreateBackup() {
LOG(INFO) << "[storage] Start to create new backup";
std::lock_guard<std::mutex> lg(backup_mu_);
std::lock_guard<std::mutex> lg(config_->backup_mu_);
std::string task_backup_dir = config_->backup_dir;

std::string tmpdir = config_->backup_dir + ".tmp";
std::string tmpdir = task_backup_dir + ".tmp";
// Maybe there is a dirty tmp checkpoint, try to clean it
rocksdb::DestroyDB(tmpdir, rocksdb::Options());

@@ -354,11 +355,11 @@ Status Storage::CreateBackup() {
}

// 2) Rename tmp backup to real backup dir
if (!(s = rocksdb::DestroyDB(config_->backup_dir, rocksdb::Options())).ok()) {
if (!(s = rocksdb::DestroyDB(task_backup_dir, rocksdb::Options())).ok()) {
LOG(WARNING) << "[storage] Fail to clean old backup, error:" << s.ToString();
return Status(Status::NotOK, s.ToString());
}
if (!(s = env_->RenameFile(tmpdir, config_->backup_dir)).ok()) {
if (!(s = env_->RenameFile(tmpdir, task_backup_dir)).ok()) {
LOG(WARNING) << "[storage] Fail to rename tmp backup, error:" << s.ToString();
// Just try best effort
if (!(s = rocksdb::DestroyDB(tmpdir, rocksdb::Options())).ok()) {
@@ -467,16 +468,17 @@ void Storage::EmptyDB() {

void Storage::PurgeOldBackups(uint32_t num_backups_to_keep, uint32_t backup_max_keep_hours) {
time_t now = time(nullptr);
std::lock_guard<std::mutex> lg(backup_mu_);
std::lock_guard<std::mutex> lg(config_->backup_mu_);
std::string task_backup_dir = config_->backup_dir;

// Return if there is no backup
auto s = env_->FileExists(config_->backup_dir);
auto s = env_->FileExists(task_backup_dir);
if (!s.ok()) return;

// No backup is needed to keep or the backup is expired, we will clean it.
if (num_backups_to_keep == 0 ||
(backup_max_keep_hours != 0 && backup_creating_time_ + backup_max_keep_hours * 3600 < now)) {
s = rocksdb::DestroyDB(config_->backup_dir, rocksdb::Options());
s = rocksdb::DestroyDB(task_backup_dir, rocksdb::Options());
if (s.ok()) {
LOG(INFO) << "[storage] Succeeded cleaning old backup that was born at " << backup_creating_time_;
} else {
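In storage.cc, CreateBackup and PurgeOldBackups now lock config_->backup_mu_ and copy backup_dir into a local task_backup_dir before doing any work, so a concurrent CONFIG SET backup-dir simply waits until the backup or purge finishes instead of racing with it (which is why the callback comment above warns that the command may wait for seconds). A rough sketch of that serialization, using hypothetical free functions instead of the real Storage and Config classes:

#include <chrono>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

// Shared state standing in for Config::backup_dir and Config::backup_mu_.
std::mutex backup_mu;
std::string backup_dir = "/tmp/kvrocks/backup";

// Stand-in for Storage::CreateBackup: holds the lock for the whole task and
// works with a local copy of the directory name.
void CreateBackup() {
  std::lock_guard<std::mutex> lg(backup_mu);
  std::string task_backup_dir = backup_dir;  // stable for the duration of the task
  std::cout << "backing up into " << task_backup_dir << "\n";
  std::this_thread::sleep_for(std::chrono::milliseconds(200));  // simulate slow I/O
}

// Stand-in for the backup-dir callback: blocks until the backup releases the lock.
void SetBackupDir(const std::string &new_dir) {
  std::lock_guard<std::mutex> lg(backup_mu);
  backup_dir = new_dir;
  std::cout << "backup dir is now " << backup_dir << "\n";
}

int main() {
  std::thread backup(CreateBackup);
  std::this_thread::sleep_for(std::chrono::milliseconds(50));  // let the backup grab the lock first
  std::thread setter(SetBackupDir, std::string("/tmp/kvrocks/backup2"));
  backup.join();
  setter.join();
}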
1 change: 0 additions & 1 deletion src/storage/storage.h
@@ -163,7 +163,6 @@ class Storage {
private:
rocksdb::DB *db_ = nullptr;
std::string replid_;
std::mutex backup_mu_;
time_t backup_creating_time_;
rocksdb::BackupEngine *backup_ = nullptr;
rocksdb::Env *env_;
2 changes: 1 addition & 1 deletion tests/cppunit/config_test.cc
@@ -60,6 +60,7 @@ TEST(Config, GetAndSet) {
{"profiling-sample-record-max-len", "1"},
{"profiling-sample-record-threshold-ms", "50"},
{"profiling-sample-commands", "get,set"},
{"backup-dir", "test_dir/backup"},

{"rocksdb.compression", "no"},
{"rocksdb.max_open_files", "1234"},
@@ -114,7 +115,6 @@ TEST(Config, GetAndSet) {
{"slaveof", "no one"},
{"db-name", "test_dbname"},
{"dir", "test_dir"},
{"backup-dir", "test_dir/backup"},
{"pidfile", "test.pid"},
{"supervised", "no"},
{"rocksdb.block_size", "1234"},
57 changes: 57 additions & 0 deletions tests/gocase/unit/config/config_test.go
@@ -21,7 +21,12 @@ package config

import (
"context"
"log"
"os"
"testing"
"time"

"path/filepath"

"github.com/apache/incubator-kvrocks/tests/gocase/util"
"github.com/stretchr/testify/require"
@@ -41,3 +46,55 @@ func TestRenameCommand(t *testing.T) {
r := rdb.Do(ctx, "KEYSNEW", "*")
require.Equal(t, []interface{}{}, r.Val())
}

func TestSetConfigBackupDir(t *testing.T) {
configs := map[string]string{}
srv := util.StartServer(t, configs)
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

originBackupDir := filepath.Join(configs["dir"], "backup")

r := rdb.Do(ctx, "CONFIG", "GET", "backup-dir")
rList := r.Val().([]interface{})
require.EqualValues(t, rList[0], "backup-dir")
require.EqualValues(t, rList[1], originBackupDir)

hasCompactionFiles := func(dir string) bool {
if _, err := os.Stat(dir); os.IsNotExist(err) {
return false
}
files, err := os.ReadDir(dir)
if err != nil {
log.Fatal(err)
}
return len(files) != 0
}

require.False(t, hasCompactionFiles(originBackupDir))

require.NoError(t, rdb.Do(ctx, "bgsave").Err())
time.Sleep(2000 * time.Millisecond)

require.True(t, hasCompactionFiles(originBackupDir))

newBackupDir := filepath.Join(configs["dir"], "backup2")

require.False(t, hasCompactionFiles(newBackupDir))

require.NoError(t, rdb.Do(ctx, "CONFIG", "SET", "backup-dir", newBackupDir).Err())

r = rdb.Do(ctx, "CONFIG", "GET", "backup-dir")
rList = r.Val().([]interface{})
require.EqualValues(t, rList[0], "backup-dir")
require.EqualValues(t, rList[1], newBackupDir)

require.NoError(t, rdb.Do(ctx, "bgsave").Err())
time.Sleep(2000 * time.Millisecond)

require.True(t, hasCompactionFiles(newBackupDir))
require.True(t, hasCompactionFiles(originBackupDir))
}
