Skip to content

Commit

Permalink
Improve error messages on kvrocks2redis configuration loading
Browse files Browse the repository at this point in the history
  • Loading branch information
torwig committed Jan 5, 2023
1 parent 0046247 commit 5bb1ad1
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "config.h"

#include <fmt/format.h>
#include <rocksdb/env.h>
#include <strings.h>

Expand All @@ -34,7 +35,6 @@

#include "config_type.h"
#include "config_util.h"
#include "fmt/format.h"
#include "parse_util.h"
#include "server/server.h"
#include "status.h"
Expand Down
61 changes: 42 additions & 19 deletions utils/kvrocks2redis/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "config.h"

#include <fmt/format.h>
#include <rocksdb/env.h>
#include <strings.h>

Expand Down Expand Up @@ -52,25 +53,27 @@ Status Config::parseConfigFromString(const std::string &input) {
args[0] = Util::ToLower(args[0]);
size_t size = args.size();
if (size == 2 && args[0] == "daemonize") {
int i = 0;
if ((i = yesnotoi(args[1])) == -1) {
return Status(Status::NotOK, "argument must be 'yes' or 'no'");
if (int i = yesnotoi(args[1]); i == -1) {
return {Status::NotOK, "the value of 'daemonize' must be 'yes' or 'no'"};
} else {
daemonize = (i == 1);
}
daemonize = (i == 1);
} else if (size == 2 && args[0] == "data-dir") {
data_dir = args[1];
if (data_dir.empty()) {
return Status(Status::NotOK, "data_dir is empty");
return {Status::NotOK, "'data-dir' was not specified"};
}

if (data_dir.back() != '/') {
data_dir += "/";
}
db_dir = data_dir + "db";
} else if (size == 2 && args[0] == "output-dir") {
output_dir = args[1];
if (output_dir.empty()) {
return Status(Status::NotOK, "output-dir is empty");
return {Status::NotOK, "'output-dir' was not specified"};
}

if (output_dir.back() != '/') {
output_dir += "/";
}
Expand All @@ -90,39 +93,46 @@ Status Config::parseConfigFromString(const std::string &input) {
// In new versions, we don't use extra port to implement replication
kvrocks_port = std::stoi(args[2]);
if (kvrocks_port <= 0 || kvrocks_port > 65535) {
return Status(Status::NotOK, "kvrocks port range should be between 0 and 65535");
return {Status::NotOK, "Kvrocks port value should be between 0 and 65535"};
}

if (size == 4) {
kvrocks_auth = args[3];
}
} else if (size == 2 && args[0] == "cluster-enable") {
int i = 0;
if ((i = yesnotoi(args[1])) == -1) {
return Status(Status::NotOK, "argument must be 'yes' or 'no'");
if (int i = yesnotoi(args[1]); i == -1) {
return {Status::NotOK, "the value of 'cluster-enable' must be 'yes' or 'no'"};
} else {
cluster_enable = (i == 1);
}
cluster_enable = (i == 1);
} else if (size >= 3 && !strncasecmp(args[0].data(), "namespace.", 10)) {
} else if (size >= 3 && strncasecmp(args[0].data(), "namespace.", 10) == 0) {
std::string ns = args[0].substr(10, args.size() - 10);
if (ns.size() > INT8_MAX) {
return Status(Status::NotOK, std::string("namespace size exceed limit ") + std::to_string(INT8_MAX));
return {Status::NotOK, std::string("namespace size exceed limit ") + std::to_string(INT8_MAX)};
}

tokens[ns].host = args[1];
tokens[ns].port = std::stoi(args[2]);
if (tokens[ns].port <= 0 || tokens[ns].port > 65535) {
return {Status::NotOK, "Redis port value should be between 0 and 65535"};
}

if (size >= 4) {
tokens[ns].auth = args[3];
}
tokens[ns].db_number = size == 5 ? std::atoi(args[4].c_str()) : 0;
} else {
return Status(Status::NotOK, "Bad directive or wrong number of arguments");
return {Status::NotOK, "unknown configuration directive or wrong number of arguments"};
}

return Status::OK();
}

Status Config::Load(std::string path) {
path_ = std::move(path);
std::ifstream file(path_);
if (!file.is_open()) {
return Status(Status::NotOK, strerror(errno));
return {Status::NotOK, fmt::format("failed to open file '{}': {}", path_, strerror(errno))};
}

std::string line;
Expand All @@ -131,16 +141,29 @@ Status Config::Load(std::string path) {
std::getline(file, line);
Status s = parseConfigFromString(line);
if (!s.IsOK()) {
file.close();
return Status(Status::NotOK, "at line: #L" + std::to_string(line_num) + ", err: " + s.Msg());
return s.Prefixed(fmt::format("at line #L{}", line_num));
}

line_num++;
}

auto s = rocksdb::Env::Default()->FileExists(data_dir);
if (!s.ok()) return Status(Status::NotOK, s.ToString());
if (!s.ok()) {
if (s.IsNotFound()) {
return {Status::NotOK, fmt::format("the specified Kvrocks working directory '{}' doesn't exist", data_dir)};
}
return {Status::NotOK, s.ToString()};
}

s = rocksdb::Env::Default()->FileExists(output_dir);
if (!s.ok()) {
if (s.IsNotFound()) {
return {Status::NotOK,
fmt::format("the specified directory '{}' for intermediate files doesn't exist", output_dir)};
}
return {Status::NotOK, s.ToString()};
}

file.close();
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions utils/kvrocks2redis/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct redis_server {
std::string auth;
int db_number;
};

struct Config {
public:
int loglevel = 0;
Expand Down
10 changes: 8 additions & 2 deletions utils/kvrocks2redis/kvrocks2redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@

# The value should be INFO, WARNING, ERROR, FATAL.
#
# Default is INFO
# Default: INFO
loglevel INFO

# By default kvrocks2redis does not run as a daemon. Use 'yes' if you need it.
# Note that kvrocks2redis will write a pid file in /var/run/kvrocks2redis.pid when daemonized.
#
# Default: no
daemonize no

# The kvrocks working directory.
# Note that you must specify a directory here, not a file name.
#
# Default: ./data
data-dir ./data

# Intermediate files are output to this directory when the kvrocks2redis program runs.
#
# Default: ./
output-dir ./

# Sync kvrocks node. Use the node's Psync command to get the newest wal raw write_batch.
Expand All @@ -23,7 +29,7 @@ kvrocks 127.0.0.1 6666

# Enable cluster mode.
#
# Default: yes
# Default: no
cluster-enable yes

################################ NAMESPACE AND Sync Target Redis #####################################
Expand Down
23 changes: 14 additions & 9 deletions utils/kvrocks2redis/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,34 +85,36 @@ static void initGoogleLog(const Kvrocks2redis::Config *config) {
static Status createPidFile(const std::string &path) {
int fd = open(path.data(), O_RDWR | O_CREAT | O_EXCL, 0660);
if (fd < 0) {
return Status(Status::NotOK, strerror(errno));
return {Status::NotOK, strerror(errno)};
}

std::string pid_str = std::to_string(getpid());
auto s = Util::Write(fd, pid_str);
if (!s.IsOK()) {
return s.Prefixed("failed to write to PID-file");
}

close(fd);
return Status::OK();
}

static void removePidFile(const std::string &path) { std::remove(path.data()); }

static void daemonize() {
pid_t pid = 0;

pid = fork();
pid_t pid = fork();
if (pid < 0) {
LOG(ERROR) << "Failed to fork the process, err: " << strerror(errno);
LOG(ERROR) << "Failed to fork the process. Error: " << strerror(errno);
exit(1);
}

if (pid > 0) exit(EXIT_SUCCESS); // parent process
// change the file mode
umask(0);
if (setsid() < 0) {
LOG(ERROR) << "Failed to setsid, err: %s" << strerror(errno);
LOG(ERROR) << "Failed to setsid. Error: " << strerror(errno);
exit(1);
}

close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
Expand All @@ -136,15 +138,17 @@ int main(int argc, char *argv[]) {
Kvrocks2redis::Config config;
Status s = config.Load(config_file_path);
if (!s.IsOK()) {
std::cout << "Failed to load config, err: " << s.Msg() << std::endl;
std::cout << "Failed to load config. Error: " << s.Msg() << std::endl;
exit(1);
}

initGoogleLog(&config);

if (config.daemonize) daemonize();

s = createPidFile(config.pidfile);
if (!s.IsOK()) {
LOG(ERROR) << "Failed to create pidfile: " << s.Msg();
LOG(ERROR) << "Failed to create pidfile '" << config.pidfile << "': " << s.Msg();
exit(1);
}

Expand All @@ -155,7 +159,7 @@ int main(int argc, char *argv[]) {
Engine::Storage storage(&kvrocks_config);
s = storage.Open(true);
if (!s.IsOK()) {
LOG(ERROR) << "Failed to open: " << s.Msg();
LOG(ERROR) << "Failed to open Kvrocks storage: " << s.Msg();
exit(1);
}

Expand All @@ -170,6 +174,7 @@ int main(int argc, char *argv[]) {
}
};
sync.Start();

removePidFile(config.pidfile);
return 0;
}
17 changes: 9 additions & 8 deletions utils/kvrocks2redis/sync.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <fcntl.h>
#include <glog/logging.h>
#include <rocksdb/write_batch.h>
Expand Down Expand Up @@ -67,17 +66,20 @@ void Sync::Start() {
while (!IsStopped()) {
auto sock_fd = Util::SockConnect(config_->kvrocks_host, config_->kvrocks_port);
if (!sock_fd) {
LOG(ERROR) << sock_fd.Msg();
LOG(ERROR) << fmt::format("Failed to connect to Kvrocks on {}:{}. Error: {}", config_->kvrocks_host,
config_->kvrocks_port, sock_fd.Msg());
usleep(10000);
continue;
}

sock_fd_ = *sock_fd;
s = auth();
if (!s.IsOK()) {
LOG(ERROR) << s.Msg();
usleep(10000);
continue;
}

while (!IsStopped()) {
s = tryPSync();
if (!s.IsOK()) {
Expand Down Expand Up @@ -135,7 +137,7 @@ Status Sync::tryPSync() {
" last_next_seq config file, and restart kvrocks2redis, redis reply: " +
std::string(line);
stop_flag_ = true;
return Status(Status::NotOK, error_msg);
return {Status::NotOK, error_msg};
}
// PSYNC isn't OK, we should use parseAllLocalStorage
// Switch to parseAllLocalStorage
Expand All @@ -149,12 +151,11 @@ Status Sync::tryPSync() {

Status Sync::incrementBatchLoop() {
std::cout << "Start parse increment batch ..." << std::endl;
char *bulk_data = nullptr;
evbuffer *evbuf = evbuffer_new();
while (!IsStopped()) {
if (evbuffer_read(evbuf, sock_fd_, -1) <= 0) {
evbuffer_free(evbuf);
return Status(Status::NotOK, std::string("[kvrocks2redis] read increament batch err: ") + strerror(errno));
return {Status::NotOK, std::string("[kvrocks2redis] read increament batch err: ") + strerror(errno)};
}
if (incr_state_ == IncrementBatchLoopState::Incr_batch_size) {
// Read bulk length
Expand All @@ -165,15 +166,15 @@ Status Sync::incrementBatchLoop() {
}
incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, nullptr, 10) : 0;
if (incr_bulk_len_ == 0) {
return Status(Status::NotOK, "[kvrocks2redis] Invalid increment data size");
return {Status::NotOK, "[kvrocks2redis] Invalid increment data size"};
}
incr_state_ = Incr_batch_data;
}

if (incr_state_ == IncrementBatchLoopState::Incr_batch_data) {
// Read bulk data (batch data)
if (incr_bulk_len_ + 2 <= evbuffer_get_length(evbuf)) { // We got enough data
bulk_data = reinterpret_cast<char *>(evbuffer_pullup(evbuf, static_cast<ssize_t>(incr_bulk_len_) + 2));
char *bulk_data = reinterpret_cast<char *>(evbuffer_pullup(evbuf, static_cast<ssize_t>(incr_bulk_len_) + 2));
std::string bulk_data_str = std::string(bulk_data, incr_bulk_len_);
// Skip the ping packet
if (bulk_data_str != "ping") {
Expand Down Expand Up @@ -227,7 +228,7 @@ Status Sync::updateNextSeq(rocksdb::SequenceNumber seq) {
Status Sync::readNextSeqFromFile(rocksdb::SequenceNumber *seq) {
next_seq_fd_ = open(config_->next_seq_file_path.data(), O_RDWR | O_CREAT, 0666);
if (next_seq_fd_ < 0) {
return Status(Status::NotOK, std::string("Failed to open next seq file :") + strerror(errno));
return {Status::NotOK, std::string("Failed to open next seq file :") + strerror(errno)};
}

*seq = 0;
Expand Down

0 comments on commit 5bb1ad1

Please sign in to comment.