Skip to content

Commit

Permalink
feat: add diskrecovery command (OpenAtomFoundation#1843)
Browse files Browse the repository at this point in the history
* Add diskrecovery command

* Add diskrecovery command
  • Loading branch information
Mixficsol authored Aug 3, 2023
1 parent a310b1d commit 9c67ca7
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 54 deletions.
5 changes: 4 additions & 1 deletion docs/ops/APIDifference.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,7 @@ slaveof命令允许通过指定write2file(binlog)的文件名称及同步位置
* field_end:返回的结束Field, 空字符串表示 +inf(无限大)

### pkhrscanrange key field_start field_end [MATCH pattern] [LIMIT limit]
类似于pkhscanrange, 逆序
类似于pkhscanrange, 逆序

### diskrecovery
Pika 原创命令。当磁盘意外写满后,RocksDB 会进入写保护状态;在将磁盘空间调整为充足之后,执行该命令可以解除 RocksDB 的写保护状态,使其恢复为可以继续写入的状态,避免了 Pika 因磁盘写满后需要重启才能恢复写入的情况。执行成功时返回 OK;如果当前磁盘空间依然不足,则返回 `The available disk capacity is insufficient`。该命令执行时不需要额外参数,只需要执行 diskrecovery 即可。
13 changes: 13 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,19 @@ class HelloCmd : public Cmd {
void DoInitial() override;
};

// Command object registered under the name "diskrecovery".
// Do() and DoInitial() are declared here and defined out of line;
// Split() and Merge() are deliberately empty for this command.
class DiskRecoveryCmd : public Cmd {
 public:
  DiskRecoveryCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}

  // Executes the command. The slot argument defaults to nullptr.
  void Do(std::shared_ptr<Slot> slot = nullptr) override;

  // No-op overrides.
  void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {}
  void Merge() override {}

  // Returns a heap-allocated copy of this command; the caller receives the raw pointer.
  Cmd* Clone() override { return new DiskRecoveryCmd(*this); }

 private:
  // Argument validation hook, defined out of line.
  void DoInitial() override;

  // Scratch map reused by Do() to collect per-DB-type background error counts.
  std::map<std::string, uint64_t> background_errors_;
};

#ifdef WITH_COMMAND_DOCS
class CommandCmd : public Cmd {
public:
Expand Down
1 change: 1 addition & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const std::string kCmdDummy = "dummy";
const std::string kCmdNameQuit = "quit";
const std::string kCmdNameHello = "hello";
const std::string kCmdNameCommand = "command";
const std::string kCmdNameDiskRecovery = "diskrecovery";

// Migrate slot
const std::string kCmdNameSlotsMgrtSlot = "slotsmgrtslot";
Expand Down
5 changes: 4 additions & 1 deletion include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
bool FlushSlotDB();
bool FlushSlotSubDB(const std::string& db_name);
void SetBinlogIoError();
void SetBinlogIoErrorrelieve();
bool IsBinlogIoError();
uint32_t SlotNum();
void GetAllSlots(std::set<uint32_t>& slot_ids);
Expand Down Expand Up @@ -56,7 +57,9 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
bool DBIsEmpty();
pstd::Status MovetoToTrash(const std::string& path);
pstd::Status Leave();

// Returns a by-value copy of slots_ (keyed by uint32_t, presumably the slot id).
// NOTE(review): the copy is made without taking any lock inside this accessor;
// confirm whether callers are expected to synchronize access to slots_.
std::map<uint32_t, std::shared_ptr<Slot>> GetSlots() { return slots_; }
private:
std::string db_name_;
uint32_t slot_num_ = 0;
Expand Down
7 changes: 7 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,13 @@ class PikaServer : public pstd::noncopyable {
*/
std::unique_ptr<Instant> instant_;

/*
* Diskrecovery used
*/
// Returns a by-value copy of dbs_ (keyed by std::string, presumably the DB name).
// NOTE(review): the copy is made without taking any lock inside this accessor;
// confirm whether callers are expected to synchronize access to dbs_.
std::map<std::string, std::shared_ptr<DB>> GetDB() { return dbs_; }

friend class Cmd;
friend class InfoCmd;
friend class PikaReplClientConn;
Expand Down
53 changes: 53 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <sys/time.h>
#include <sys/utsname.h>
#include <sys/statvfs.h>

#include <algorithm>
#include <unordered_map>
Expand Down Expand Up @@ -2546,6 +2547,58 @@ void HelloCmd::Do(std::shared_ptr<Slot> slot) {
res_.AppendStringRaw(raw);
}

// Validates the argument count; on mismatch the response is set to
// kWrongNum for the diskrecovery command name.
void DiskRecoveryCmd::DoInitial() {
  const bool arity_ok = CheckArg(argv_.size());
  if (arity_ok) {
    return;
  }
  res_.SetRes(CmdRes::kWrongNum, kCmdNameDiskRecovery);
}

void DiskRecoveryCmd::Do(std::shared_ptr<Slot> slot) {
struct statvfs disk_info;
int ret = statvfs(g_pika_conf->db_path().c_str(), &disk_info);
if (ret == -1) {
std::stringstream tmp_stream;
tmp_stream << "statvfs error:" << strerror(errno);
const std::string res = tmp_stream.str();
res_.SetRes(CmdRes::kErrOther, res);
return;
}
int64_t least_free_size = g_pika_conf->least_resume_free_disk_size();
uint64_t free_size = disk_info.f_bsize * disk_info.f_bfree;
if (free_size < least_free_size) {
res_.SetRes(CmdRes::kErrOther, "The available disk capacity is insufficient");
return;
}
std::shared_mutex slots_rw;
std::shared_mutex dbs_rw;
std::shared_lock db_rwl(dbs_rw);
// loop every db
for (const auto& db_item : g_pika_server->GetDB()) {
if (!db_item.second) {
continue;
}
db_item.second->SetBinlogIoErrorrelieve();
std::shared_lock slot_rwl(slots_rw);
// loop every slot
for (const auto &slot_item: db_item.second->GetSlots()) {
background_errors_.clear();
slot_item.second->DbRWLockReader();
slot_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors_);
slot_item.second->DbRWUnLock();
for (const auto &item: background_errors_) {
if (item.second != 0) {
rocksdb::Status s = slot_item.second->db()->GetDBByType(item.first)->Resume();
if (!s.ok()) {
res_.SetRes(CmdRes::kErrOther, "The restore operation failed.");
}
}
}
}
}
res_.SetRes(CmdRes::kOk, "The disk error has been recovered");
}

#ifdef WITH_COMMAND_DOCS

bool CommandCmd::CommandFieldCompare::operator()(const std::string& a, const std::string& b) const {
Expand Down
2 changes: 2 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdDummy, std::move(dummyptr)));
std::unique_ptr<Cmd> quitptr = std::make_unique<QuitCmd>(kCmdNameQuit, 1, kCmdFlagsRead);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameQuit, std::move(quitptr)));
std::unique_ptr<Cmd> diskrecoveryptr = std::make_unique<DiskRecoveryCmd>(kCmdNameDiskRecovery, 1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameDiskRecovery, std::move(diskrecoveryptr)));

#ifdef WITH_COMMAND_DOCS
std::unique_ptr<Cmd> commandptr = std::make_unique<CommandCmd>(kCmdNameCommand, -1, kCmdFlagsRead | kCmdFlagsAdmin);
Expand Down
2 changes: 1 addition & 1 deletion src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ bool DB::FlushSlotSubDB(const std::string& db_name) {
}

// Stores true into the binlog_io_error_ flag.
void DB::SetBinlogIoError() {
  binlog_io_error_.store(true);
}

// Stores false into the binlog_io_error_ flag, undoing SetBinlogIoError().
void DB::SetBinlogIoErrorrelieve() {
  binlog_io_error_.store(false);
}
// Returns the current value of the binlog_io_error_ flag.
bool DB::IsBinlogIoError() {
  const bool has_error = binlog_io_error_.load();
  return has_error;
}

// Returns the stored slot_num_ value.
uint32_t DB::SlotNum() {
  return slot_num_;
}
Expand Down
51 changes: 0 additions & 51 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1285,8 +1285,6 @@ void PikaServer::PubSubNumSub(const std::vector<std::string>& channels,
/******************************* PRIVATE *******************************/

void PikaServer::DoTimingTask() {
// Resume DB if satisfy the condition
AutoResumeDB();
// Maybe schedule compactrange
AutoCompactRange();
// Purge log
Expand Down Expand Up @@ -1480,55 +1478,6 @@ void PikaServer::AutoKeepAliveRSync() {
}
}

// Periodic check that tries to resume RocksDB instances reporting background errors.
//
// The body runs only on the first call (last_check_resume_time_.tv_sec == 0) or
// once at least resume_interval seconds have passed since the last recorded
// check. It then requires BOTH that the disk-use ratio of db_path exceeds
// min_check_resume_ratio and that free space exceeds
// least_resume_free_disk_size before touching any DB.
//
// Note that last_check_resume_time_ is refreshed only when that combined
// condition holds, so while it does not hold the statvfs check is repeated on
// every call.
void PikaServer::AutoResumeDB() {
int64_t interval = g_pika_conf->resume_interval();
int64_t least_free_size = g_pika_conf->least_resume_free_disk_size();
struct timeval now;
gettimeofday(&now, nullptr);
// first check or time interval between now and last check is larger than variable "interval"
if (last_check_resume_time_.tv_sec == 0 || now.tv_sec - last_check_resume_time_.tv_sec >= interval) {
struct statvfs disk_info;
int ret = statvfs(g_pika_conf->db_path().c_str(), &disk_info);
if (ret == -1) {
// Could not query the filesystem: log and give up for this round.
LOG(WARNING) << "statvfs error: " << strerror(errno);
return;
}
double min_check_resume_ratio = g_pika_conf->min_check_resume_ratio();
// NOTE(review): POSIX documents f_bfree/f_blocks in units of f_frsize, not
// f_bsize; the two can differ on some platforms -- confirm this is intended.
uint64_t free_size = disk_info.f_bsize * disk_info.f_bfree;
uint64_t total_size = disk_info.f_bsize * disk_info.f_blocks;
double disk_use_ratio = 1.0 - static_cast<double>(free_size) / static_cast<double>(total_size);
// NOTE(review): free_size is uint64_t while least_free_size is int64_t, so
// this is a mixed-sign comparison; a negative configured value would be
// converted to a very large unsigned number.
if (disk_use_ratio > min_check_resume_ratio && free_size > least_free_size) {
gettimeofday(&last_check_resume_time_, nullptr);

std::map<std::string, uint64_t> background_errors;
// Shared (reader) lock on the server's DB table for the whole traversal.
std::shared_lock db_rwl(g_pika_server->dbs_rw_);
// loop every db
for (const auto &db_item: g_pika_server->dbs_) {
if (!db_item.second) {
continue;
}
// Shared (reader) lock on this DB's slot table while its slots are visited.
std::shared_lock slot_rwl(db_item.second->slots_rw_);
// loop every slot
for (const auto &slot_item: db_item.second->slots_) {
background_errors.clear();
// Read the background-error counters under the slot's reader lock.
slot_item.second->DbRWLockReader();
slot_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS,
&background_errors);
slot_item.second->DbRWUnLock();
// Resume() is issued after the slot lock has been released, once per entry
// whose counter is non-zero; failures are only logged.
for (const auto &item: background_errors) {
if (item.second != 0) {
rocksdb::Status s = slot_item.second->db()->GetDBByType(item.first)->Resume();
if (!s.ok()) {
LOG(WARNING) << s.ToString();
}
}
}
}
}
}
}
}

void PikaServer::AutoUpdateNetworkMetric() {
monotime current_time = getMonotonicUs();
size_t factor = 5e6; // us, 5s
Expand Down

0 comments on commit 9c67ca7

Please sign in to comment.