Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make config load errors more friendly #1217

Merged
merged 3 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 58 additions & 63 deletions src/config/config.cc

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions src/config/config_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ class ConfigField {
virtual ~ConfigField() = default;
virtual std::string ToString() = 0;
virtual Status Set(const std::string &v) = 0;
virtual Status ToNumber(int64_t *n) { return Status(Status::NotOK, "not supported"); }
virtual Status ToBool(bool *b) { return Status(Status::NotOK, "not supported"); }
virtual Status ToNumber(int64_t *n) { return {Status::NotOK, "not supported"}; }
virtual Status ToBool(bool *b) { return {Status::NotOK, "not supported"}; }
virtual configType GetConfigType() { return config_type; }
virtual bool IsMultiConfig() { return config_type == configType::MultiConfig; }
virtual bool IsSingleConfig() { return config_type == configType::SingleConfig; }
Expand Down Expand Up @@ -176,7 +176,7 @@ class YesNoField : public ConfigField {
} else if (strcasecmp(v.data(), "no") == 0) {
*receiver_ = false;
} else {
return Status(Status::NotOK, "argument must be 'yes' or 'no'");
return {Status::NotOK, "argument must be 'yes' or 'no'"};
}
return Status::OK();
}
Expand All @@ -197,7 +197,7 @@ class EnumField : public ConfigField {
Status Set(const std::string &v) override {
int e = configEnumGetValue(enums_, v.c_str());
if (e == INT_MIN) {
return Status(Status::NotOK, "invalid enum option");
return {Status::NotOK, "invalid enum option"};
}
*receiver_ = e;
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,10 @@ int main(int argc, char *argv[]) {
Config config;
Status s = config.Load(opts);
if (!s.IsOK()) {
std::cout << "Failed to load config, err: " << s.Msg() << std::endl;
std::cout << "Failed to load config. Error: " << s.Msg() << std::endl;
return 1;
}

initGoogleLog(&config);
printVersion(LOG(INFO));
// Tricky: We don't expect that different instances running on the same port,
Expand Down
61 changes: 42 additions & 19 deletions utils/kvrocks2redis/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "config.h"

#include <fmt/format.h>
#include <rocksdb/env.h>
#include <strings.h>

Expand Down Expand Up @@ -52,25 +53,27 @@ Status Config::parseConfigFromString(const std::string &input) {
args[0] = Util::ToLower(args[0]);
size_t size = args.size();
if (size == 2 && args[0] == "daemonize") {
int i = 0;
if ((i = yesnotoi(args[1])) == -1) {
return Status(Status::NotOK, "argument must be 'yes' or 'no'");
if (int i = yesnotoi(args[1]); i == -1) {
return {Status::NotOK, "the value of 'daemonize' must be 'yes' or 'no'"};
} else {
daemonize = (i == 1);
}
daemonize = (i == 1);
} else if (size == 2 && args[0] == "data-dir") {
data_dir = args[1];
if (data_dir.empty()) {
return Status(Status::NotOK, "data_dir is empty");
return {Status::NotOK, "'data-dir' was not specified"};
}

if (data_dir.back() != '/') {
data_dir += "/";
}
db_dir = data_dir + "db";
} else if (size == 2 && args[0] == "output-dir") {
output_dir = args[1];
if (output_dir.empty()) {
return Status(Status::NotOK, "output-dir is empty");
return {Status::NotOK, "'output-dir' was not specified"};
}

if (output_dir.back() != '/') {
output_dir += "/";
}
Expand All @@ -90,39 +93,46 @@ Status Config::parseConfigFromString(const std::string &input) {
// In new versions, we don't use extra port to implement replication
kvrocks_port = std::stoi(args[2]);
if (kvrocks_port <= 0 || kvrocks_port > 65535) {
return Status(Status::NotOK, "kvrocks port range should be between 0 and 65535");
return {Status::NotOK, "Kvrocks port value should be between 0 and 65535"};
}

if (size == 4) {
kvrocks_auth = args[3];
}
} else if (size == 2 && args[0] == "cluster-enable") {
int i = 0;
if ((i = yesnotoi(args[1])) == -1) {
return Status(Status::NotOK, "argument must be 'yes' or 'no'");
if (int i = yesnotoi(args[1]); i == -1) {
return {Status::NotOK, "the value of 'cluster-enable' must be 'yes' or 'no'"};
} else {
cluster_enable = (i == 1);
}
cluster_enable = (i == 1);
} else if (size >= 3 && !strncasecmp(args[0].data(), "namespace.", 10)) {
} else if (size >= 3 && strncasecmp(args[0].data(), "namespace.", 10) == 0) {
std::string ns = args[0].substr(10, args.size() - 10);
if (ns.size() > INT8_MAX) {
return Status(Status::NotOK, std::string("namespace size exceed limit ") + std::to_string(INT8_MAX));
return {Status::NotOK, std::string("namespace size exceed limit ") + std::to_string(INT8_MAX)};
}

tokens[ns].host = args[1];
tokens[ns].port = std::stoi(args[2]);
if (tokens[ns].port <= 0 || tokens[ns].port > 65535) {
return {Status::NotOK, "Redis port value should be between 0 and 65535"};
}

if (size >= 4) {
tokens[ns].auth = args[3];
}
tokens[ns].db_number = size == 5 ? std::atoi(args[4].c_str()) : 0;
} else {
return Status(Status::NotOK, "Bad directive or wrong number of arguments");
return {Status::NotOK, "unknown configuration directive or wrong number of arguments"};
}

return Status::OK();
}

Status Config::Load(std::string path) {
path_ = std::move(path);
std::ifstream file(path_);
if (!file.is_open()) {
return Status(Status::NotOK, strerror(errno));
return {Status::NotOK, fmt::format("failed to open file '{}': {}", path_, strerror(errno))};
}

std::string line;
Expand All @@ -131,16 +141,29 @@ Status Config::Load(std::string path) {
std::getline(file, line);
Status s = parseConfigFromString(line);
if (!s.IsOK()) {
file.close();
return Status(Status::NotOK, "at line: #L" + std::to_string(line_num) + ", err: " + s.Msg());
return s.Prefixed(fmt::format("at line #L{}", line_num));
}

line_num++;
}

auto s = rocksdb::Env::Default()->FileExists(data_dir);
if (!s.ok()) return Status(Status::NotOK, s.ToString());
if (!s.ok()) {
if (s.IsNotFound()) {
return {Status::NotOK, fmt::format("the specified Kvrocks working directory '{}' doesn't exist", data_dir)};
}
return {Status::NotOK, s.ToString()};
}

s = rocksdb::Env::Default()->FileExists(output_dir);
if (!s.ok()) {
if (s.IsNotFound()) {
return {Status::NotOK,
fmt::format("the specified directory '{}' for intermediate files doesn't exist", output_dir)};
}
return {Status::NotOK, s.ToString()};
}

file.close();
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions utils/kvrocks2redis/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct redis_server {
std::string auth;
int db_number;
};

struct Config {
public:
int loglevel = 0;
Expand Down
12 changes: 9 additions & 3 deletions utils/kvrocks2redis/kvrocks2redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@

# The value should be INFO, WARNING, ERROR, FATAL.
#
# Default is INFO
# Default: INFO
loglevel INFO

# By default kvrocks2redis does not run as a daemon. Use 'yes' if you need it.
# Note that kvrocks2redis will write a pid file in /var/run/kvrocks2redis.pid when daemonized.
#
# Default: no
daemonize no

# The kvrocks working directory.
# Note that you must specify a directory here, not a file name.
#
# Default: ./data
data-dir ./data

# Intermediate files are output to this directory when the kvrocks2redis program runs.
#
# Default: ./
output-dir ./

# Sync kvrocks node. Use the node's Psync command to get the newest wal raw write_batch.
Expand All @@ -23,8 +29,8 @@ kvrocks 127.0.0.1 6666

# Enable cluster mode.
#
# Default: yes
cluster-enable yes
# Default: no
cluster-enable no

################################ NAMESPACE AND Sync Target Redis #####################################
# Synchronize the specified namespace data to the specified Redis DB.
Expand Down
23 changes: 14 additions & 9 deletions utils/kvrocks2redis/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,34 +85,36 @@ static void initGoogleLog(const Kvrocks2redis::Config *config) {
static Status createPidFile(const std::string &path) {
int fd = open(path.data(), O_RDWR | O_CREAT | O_EXCL, 0660);
if (fd < 0) {
return Status(Status::NotOK, strerror(errno));
return {Status::NotOK, strerror(errno)};
}

std::string pid_str = std::to_string(getpid());
auto s = Util::Write(fd, pid_str);
if (!s.IsOK()) {
return s.Prefixed("failed to write to PID-file");
}

close(fd);
return Status::OK();
}

static void removePidFile(const std::string &path) { std::remove(path.data()); }

static void daemonize() {
pid_t pid = 0;

pid = fork();
pid_t pid = fork();
if (pid < 0) {
LOG(ERROR) << "Failed to fork the process, err: " << strerror(errno);
LOG(ERROR) << "Failed to fork the process. Error: " << strerror(errno);
exit(1);
}

if (pid > 0) exit(EXIT_SUCCESS); // parent process
// change the file mode
umask(0);
if (setsid() < 0) {
LOG(ERROR) << "Failed to setsid, err: %s" << strerror(errno);
LOG(ERROR) << "Failed to setsid. Error: " << strerror(errno);
exit(1);
}

close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
Expand All @@ -136,15 +138,17 @@ int main(int argc, char *argv[]) {
Kvrocks2redis::Config config;
Status s = config.Load(config_file_path);
if (!s.IsOK()) {
std::cout << "Failed to load config, err: " << s.Msg() << std::endl;
std::cout << "Failed to load config. Error: " << s.Msg() << std::endl;
exit(1);
}

initGoogleLog(&config);

if (config.daemonize) daemonize();

s = createPidFile(config.pidfile);
if (!s.IsOK()) {
LOG(ERROR) << "Failed to create pidfile: " << s.Msg();
LOG(ERROR) << "Failed to create pidfile '" << config.pidfile << "': " << s.Msg();
exit(1);
}

Expand All @@ -155,7 +159,7 @@ int main(int argc, char *argv[]) {
Engine::Storage storage(&kvrocks_config);
s = storage.Open(true);
if (!s.IsOK()) {
LOG(ERROR) << "Failed to open: " << s.Msg();
LOG(ERROR) << "Failed to open Kvrocks storage: " << s.Msg();
exit(1);
}

Expand All @@ -170,6 +174,7 @@ int main(int argc, char *argv[]) {
}
};
sync.Start();

removePidFile(config.pidfile);
return 0;
}
17 changes: 9 additions & 8 deletions utils/kvrocks2redis/sync.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <fcntl.h>
#include <glog/logging.h>
#include <rocksdb/write_batch.h>
Expand Down Expand Up @@ -67,17 +66,20 @@ void Sync::Start() {
while (!IsStopped()) {
auto sock_fd = Util::SockConnect(config_->kvrocks_host, config_->kvrocks_port);
if (!sock_fd) {
LOG(ERROR) << sock_fd.Msg();
LOG(ERROR) << fmt::format("Failed to connect to Kvrocks on {}:{}. Error: {}", config_->kvrocks_host,
config_->kvrocks_port, sock_fd.Msg());
usleep(10000);
continue;
}

sock_fd_ = *sock_fd;
s = auth();
if (!s.IsOK()) {
LOG(ERROR) << s.Msg();
usleep(10000);
continue;
}

while (!IsStopped()) {
s = tryPSync();
if (!s.IsOK()) {
Expand Down Expand Up @@ -135,7 +137,7 @@ Status Sync::tryPSync() {
" last_next_seq config file, and restart kvrocks2redis, redis reply: " +
std::string(line);
stop_flag_ = true;
return Status(Status::NotOK, error_msg);
return {Status::NotOK, error_msg};
}
// PSYNC isn't OK, we should use parseAllLocalStorage
// Switch to parseAllLocalStorage
Expand All @@ -149,12 +151,11 @@ Status Sync::tryPSync() {

Status Sync::incrementBatchLoop() {
std::cout << "Start parse increment batch ..." << std::endl;
char *bulk_data = nullptr;
evbuffer *evbuf = evbuffer_new();
while (!IsStopped()) {
if (evbuffer_read(evbuf, sock_fd_, -1) <= 0) {
evbuffer_free(evbuf);
return Status(Status::NotOK, std::string("[kvrocks2redis] read increament batch err: ") + strerror(errno));
return {Status::NotOK, std::string("[kvrocks2redis] read increament batch err: ") + strerror(errno)};
}
if (incr_state_ == IncrementBatchLoopState::Incr_batch_size) {
// Read bulk length
Expand All @@ -165,15 +166,15 @@ Status Sync::incrementBatchLoop() {
}
incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, nullptr, 10) : 0;
if (incr_bulk_len_ == 0) {
return Status(Status::NotOK, "[kvrocks2redis] Invalid increment data size");
return {Status::NotOK, "[kvrocks2redis] Invalid increment data size"};
}
incr_state_ = Incr_batch_data;
}

if (incr_state_ == IncrementBatchLoopState::Incr_batch_data) {
// Read bulk data (batch data)
if (incr_bulk_len_ + 2 <= evbuffer_get_length(evbuf)) { // We got enough data
bulk_data = reinterpret_cast<char *>(evbuffer_pullup(evbuf, static_cast<ssize_t>(incr_bulk_len_) + 2));
char *bulk_data = reinterpret_cast<char *>(evbuffer_pullup(evbuf, static_cast<ssize_t>(incr_bulk_len_) + 2));
std::string bulk_data_str = std::string(bulk_data, incr_bulk_len_);
// Skip the ping packet
if (bulk_data_str != "ping") {
Expand Down Expand Up @@ -227,7 +228,7 @@ Status Sync::updateNextSeq(rocksdb::SequenceNumber seq) {
Status Sync::readNextSeqFromFile(rocksdb::SequenceNumber *seq) {
next_seq_fd_ = open(config_->next_seq_file_path.data(), O_RDWR | O_CREAT, 0666);
if (next_seq_fd_ < 0) {
return Status(Status::NotOK, std::string("Failed to open next seq file :") + strerror(errno));
return {Status::NotOK, std::string("Failed to open next seq file :") + strerror(errno)};
}

*seq = 0;
Expand Down