From 0046247c045a4b67b9540a61dc07a36ead5d510d Mon Sep 17 00:00:00 2001 From: Yaroslav Stepanchuk Date: Thu, 5 Jan 2023 00:18:22 +0200 Subject: [PATCH 1/3] Improve error messages on kvrocks configuration loading --- src/config/config.cc | 119 +++++++++++++++++++-------------------- src/config/config_type.h | 8 +-- src/main.cc | 3 +- 3 files changed, 63 insertions(+), 67 deletions(-) diff --git a/src/config/config.cc b/src/config/config.cc index 8a8cb810506..9cc9f9d3ca8 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -20,18 +20,14 @@ #include "config.h" -#include #include #include -#include -#include #include #include #include #include #include -#include #include #include #include @@ -41,18 +37,15 @@ #include "fmt/format.h" #include "parse_util.h" #include "server/server.h" -#include "server/tls_util.h" #include "status.h" const char *kDefaultNamespace = "__namespace"; +const char *kDefaultBindAddress = "127.0.0.1"; -const char *errNotEnableBlobDB = "Must set rocksdb.enable_blob_files to yes first."; - -const char *errNotSetLevelCompactionDynamicLevelBytes = +const char *errBlobDbNotEnabled = "Must set rocksdb.enable_blob_files to yes first."; +const char *errLevelCompactionDynamicLevelBytesNotSet = "Must set rocksdb.level_compaction_dynamic_level_bytes yes first."; -const char *kDefaultBindAddress = "127.0.0.1"; - configEnum compression_type_enum[] = { {"no", rocksdb::CompressionType::kNoCompression}, {"snappy", rocksdb::CompressionType::kSnappyCompression}, {"lz4", rocksdb::CompressionType::kLZ4Compression}, {"zstd", rocksdb::CompressionType::kZSTD}, @@ -65,13 +58,13 @@ configEnum supervised_mode_enum[] = {{"no", kSupervisedNone}, {nullptr, 0}}; std::string trimRocksDBPrefix(std::string s) { - if (strncasecmp(s.data(), "rocksdb.", 8)) return s; + if (strncasecmp(s.data(), "rocksdb.", 8) != 0) return s; return s.substr(8, s.size() - 8); } int configEnumGetValue(configEnum *ce, const char *name) { while (ce->name != nullptr) { - if (!strcasecmp(ce->name, name)) return ce->val; + if (strcasecmp(ce->name, name) == 0) return ce->val; ce++; } return INT_MIN; @@ -228,17 +221,17 @@ void Config::initFieldValidator() { {"requirepass", [this](const std::string &k, const std::string &v) -> Status { if (v.empty() && !tokens.empty()) { - return Status(Status::NotOK, "requirepass empty not allowed while the namespace exists"); + return {Status::NotOK, "requirepass empty not allowed while the namespace exists"}; } if (tokens.find(v) != tokens.end()) { - return Status(Status::NotOK, "requirepass is duplicated with namespace tokens"); + return {Status::NotOK, "requirepass is duplicated with namespace tokens"}; } return Status::OK(); }}, {"masterauth", [this](const std::string &k, const std::string &v) -> Status { if (tokens.find(v) != tokens.end()) { - return Status(Status::NotOK, "masterauth is duplicated with namespace tokens"); + return {Status::NotOK, "masterauth is duplicated with namespace tokens"}; } return Status::OK(); }}, @@ -276,17 +269,17 @@ void Config::initFieldValidator() { for (auto &p : all_args) { std::vector args = Util::Split(p, " \t"); if (args.size() != 2) { - return Status(Status::NotOK, "Invalid rename-command format"); + return {Status::NotOK, "Invalid rename-command format"}; } auto commands = Redis::GetCommands(); auto cmd_iter = commands->find(Util::ToLower(args[0])); if (cmd_iter == commands->end()) { - return Status(Status::NotOK, "No such command in rename-command"); + return {Status::NotOK, "No such command in rename-command"}; } if (args[1] != "\"\"") { auto new_command_name = Util::ToLower(args[1]); if (commands->find(new_command_name) != commands->end()) { - return Status(Status::NotOK, "Target command name already exists"); + return {Status::NotOK, "Target command name already exists"}; } (*commands)[new_command_name] = cmd_iter->second; } @@ -366,7 +359,6 @@ void Config::initFieldCallback() { }}, {"bind", [this](Server *srv, const std::string &k, const std::string &v) -> Status { - trimRocksDBPrefix(k); std::vector args = Util::Split(v, " \t"); binds = std::move(args); return Status::OK(); @@ -383,12 +375,12 @@ void Config::initFieldCallback() { return Status::OK(); } std::vector args = Util::Split(v, " \t"); - if (args.size() != 2) return Status(Status::NotOK, "wrong number of arguments"); + if (args.size() != 2) return {Status::NotOK, "wrong number of arguments"}; if (args[0] != "no" && args[1] != "one") { master_host = args[0]; auto parse_result = ParseInt(args[1].c_str(), NumericRange{1, PORT_LIMIT - 1}, 10); if (!parse_result) { - return Status(Status::NotOK, "should be between 0 and 65535"); + return {Status::NotOK, "should be between 0 and 65535"}; } master_port = *parse_result; } @@ -406,7 +398,7 @@ void Config::initFieldCallback() { return Status::OK(); } if (!Redis::IsCommandExists(cmd)) { - return Status(Status::NotOK, cmd + " is not Kvrocks supported command"); + return {Status::NotOK, cmd + " is not Kvrocks supported command"}; } // profiling_sample_commands use command's original name, regardless of rename-command directive profiling_sample_commands.insert(cmd); @@ -502,7 +494,7 @@ void Config::initFieldCallback() { [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); if (!RocksDB.enable_blob_files) { - return Status(Status::NotOK, errNotEnableBlobDB); + return {Status::NotOK, errBlobDbNotEnabled}; } return srv->storage_->SetColumnFamilyOption(trimRocksDBPrefix(k), v); }}, @@ -510,7 +502,7 @@ void Config::initFieldCallback() { [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); if (!RocksDB.enable_blob_files) { - return Status(Status::NotOK, errNotEnableBlobDB); + return {Status::NotOK, errBlobDbNotEnabled}; } return srv->storage_->SetColumnFamilyOption(trimRocksDBPrefix(k), std::to_string(RocksDB.blob_file_size)); }}, @@ -518,7 +510,7 @@ void Config::initFieldCallback() { [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); if (!RocksDB.enable_blob_files) { - return Status(Status::NotOK, errNotEnableBlobDB); + return {Status::NotOK, errBlobDbNotEnabled}; } std::string enable_blob_garbage_collection = v == "yes" ? "true" : "false"; return srv->storage_->SetColumnFamilyOption(trimRocksDBPrefix(k), enable_blob_garbage_collection); @@ -527,16 +519,16 @@ void Config::initFieldCallback() { [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); if (!RocksDB.enable_blob_files) { - return Status(Status::NotOK, errNotEnableBlobDB); + return {Status::NotOK, errBlobDbNotEnabled}; } int val = 0; auto parse_result = ParseInt(v, 10); if (!parse_result) { - return Status(Status::NotOK, "Illegal blob_garbage_collection_age_cutoff value."); + return {Status::NotOK, "Illegal blob_garbage_collection_age_cutoff value."}; } val = *parse_result; if (val < 0 || val > 100) { - return Status(Status::NotOK, "blob_garbage_collection_age_cutoff must >= 0 and <= 100."); + return {Status::NotOK, "blob_garbage_collection_age_cutoff must >= 0 and <= 100."}; } double cutoff = val / 100.0; @@ -552,7 +544,7 @@ void Config::initFieldCallback() { [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); if (!RocksDB.level_compaction_dynamic_level_bytes) { - return Status(Status::NotOK, errNotSetLevelCompactionDynamicLevelBytes); + return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet}; } return srv->storage_->SetColumnFamilyOption(trimRocksDBPrefix(k), std::to_string(RocksDB.max_bytes_for_level_base)); @@ -561,7 +553,7 @@ void Config::initFieldCallback() { [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); if (!RocksDB.level_compaction_dynamic_level_bytes) { - return Status(Status::NotOK, errNotSetLevelCompactionDynamicLevelBytes); + return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet}; } return srv->storage_->SetColumnFamilyOption(trimRocksDBPrefix(k), v); }}, @@ -628,7 +620,7 @@ Status Config::parseConfigFromPair(const std::pair &in std::string field_key = Util::ToLower(input.first); const char ns_str[] = "namespace."; size_t ns_str_size = sizeof(ns_str) - 1; - if (!strncasecmp(input.first.data(), ns_str, ns_str_size)) { + if (strncasecmp(input.first.data(), ns_str, ns_str_size) == 0) { // namespace should keep key case-sensitive field_key = input.first; tokens[input.second] = input.first.substr(ns_str_size); @@ -638,14 +630,14 @@ Status Config::parseConfigFromPair(const std::pair &in auto &field = iter->second; field->line_number = line_number; auto s = field->Set(input.second); - if (!s.IsOK()) return s; + if (!s.IsOK()) return s.Prefixed(fmt::format("failed to set value of field '{}'", field_key)); } return Status::OK(); } Status Config::parseConfigFromString(const std::string &input, int line_number) { auto parsed = ParseConfigLine(input); - if (!parsed) return parsed.ToStatus(); + if (!parsed) return parsed.ToStatus().Prefixed("malformed line"); auto kv = std::move(*parsed); @@ -656,21 +648,21 @@ Status Config::parseConfigFromString(const std::string &input, int line_number) Status Config::finish() { if (requirepass.empty() && !tokens.empty()) { - return Status(Status::NotOK, "requirepass empty wasn't allowed while the namespace exists"); + return {Status::NotOK, "requirepass empty wasn't allowed while the namespace exists"}; } if ((cluster_enabled) && !tokens.empty()) { - return Status(Status::NotOK, "enabled cluster mode wasn't allowed while the namespace exists"); + return {Status::NotOK, "enabled cluster mode wasn't allowed while the namespace exists"}; } if (unixsocket.empty() && binds.size() == 0) { binds.emplace_back(kDefaultBindAddress); } if (cluster_enabled && binds.size() == 0) { - return Status(Status::NotOK, - "node is in cluster mode, but TCP listen address " - "wasn't specified via configuration file"); + return {Status::NotOK, + "node is in cluster mode, but TCP listen address " + "wasn't specified via configuration file"}; } if (master_port != 0 && binds.size() == 0) { - return Status(Status::NotOK, "replication doesn't supports unix socket"); + return {Status::NotOK, "replication doesn't support unix socket"}; } if (db_dir.empty()) db_dir = dir + "/db"; if (backup_dir.empty()) backup_dir = dir + "/backup"; @@ -679,7 +671,7 @@ Status Config::finish() { std::vector createDirs = {dir}; for (const auto &name : createDirs) { auto s = rocksdb::Env::Default()->CreateDirIfMissing(name); - if (!s.ok()) return Status(Status::NotOK, s.ToString()); + if (!s.ok()) return {Status::NotOK, s.ToString()}; } return Status::OK(); } @@ -693,7 +685,10 @@ Status Config::Load(const CLIOptions &opts) { } else { path_ = opts.conf_file; file.open(path_); - if (!file.is_open()) return Status::FromErrno(); + if (!file.is_open()) { + return {Status::NotOK, fmt::format("failed to open file '{}': {}", path_, strerror(errno))}; + } + in = &file; } @@ -701,15 +696,15 @@ Status Config::Load(const CLIOptions &opts) { int line_num = 1; while (!in->eof()) { std::getline(*in, line); - if (auto s = parseConfigFromString(line, line_num); !s) { - return {Status::NotOK, fmt::format("at line: #L{}, err: {}", line_num, s.Msg())}; + if (auto s = parseConfigFromString(line, line_num); !s.IsOK()) { + return s.Prefixed(fmt::format("at line #L{}", line_num)); } + line_num++; } } else { - std::cout << "Warn: no config file specified, using the default config. " - "In order to specify a config file use kvrocks -c /path/to/kvrocks.conf" - << std::endl; + std::cout << "WARNING: No config file specified, using the default configuration. " + << "In order to specify a config file use 'kvrocks -c /path/to/kvrocks.conf'" << std::endl; } for (const auto &opt : opts.cli_options) { @@ -722,8 +717,7 @@ Status Config::Load(const CLIOptions &opts) { if (iter.second->line_number != 0 && iter.second->validate) { auto s = iter.second->validate(iter.first, iter.second->ToString()); if (!s.IsOK()) { - return {Status::NotOK, - fmt::format("at line: #L{}, {} is invalid: {}", iter.second->line_number, iter.first, s.Msg())}; + return s.Prefixed(fmt::format("at line #L{}: {} is invalid", iter.second->line_number, iter.first)); } } } @@ -732,7 +726,7 @@ Status Config::Load(const CLIOptions &opts) { if (iter.second->callback) { auto s = iter.second->callback(nullptr, iter.first, iter.second->ToString()); if (!s.IsOK()) { - return {Status::NotOK, fmt::format("{} in key '{}'", s.Msg(), iter.first)}; + return s.Prefixed(fmt::format("while changing key '{}'", iter.first)); } } } @@ -760,7 +754,7 @@ Status Config::Set(Server *svr, std::string key, const std::string &value) { key = Util::ToLower(key); auto iter = fields_.find(key); if (iter == fields_.end() || iter->second->readonly) { - return Status(Status::NotOK, "Unsupported CONFIG parameter: " + key); + return {Status::NotOK, "Unsupported CONFIG parameter: " + key}; } auto &field = iter->second; if (field->validate) { @@ -779,6 +773,7 @@ Status Config::Rewrite() { if (path_.empty()) { return {Status::NotOK, "the server is running without a config file"}; } + std::vector lines; std::map new_config; for (const auto &iter : fields_) { @@ -848,19 +843,19 @@ Status Config::GetNamespace(const std::string &ns, std::string *token) { return Status::OK(); } } - return Status(Status::NotFound); + return {Status::NotFound}; } Status Config::SetNamespace(const std::string &ns, const std::string &token) { if (ns == kDefaultNamespace) { - return Status(Status::NotOK, "forbidden to update the default namespace"); + return {Status::NotOK, "forbidden to update the default namespace"}; } if (tokens.find(token) != tokens.end()) { - return Status(Status::NotOK, "the token has already exists"); + return {Status::NotOK, "the token has already exists"}; } if (token == requirepass || token == masterauth) { - return Status(Status::NotOK, "the token is duplicated with requirepass or masterauth"); + return {Status::NotOK, "the token is duplicated with requirepass or masterauth"}; } for (const auto &iter : tokens) { @@ -876,32 +871,32 @@ Status Config::SetNamespace(const std::string &ns, const std::string &token) { return s; } } - return Status(Status::NotOK, "the namespace was not found"); + return {Status::NotOK, "the namespace was not found"}; } Status Config::AddNamespace(const std::string &ns, const std::string &token) { if (requirepass.empty()) { - return Status(Status::NotOK, "forbidden to add namespace when requirepass was empty"); + return {Status::NotOK, "forbidden to add namespace when requirepass was empty"}; } if (cluster_enabled) { - return Status(Status::NotOK, "forbidden to add namespace when cluster mode was enabled"); + return {Status::NotOK, "forbidden to add namespace when cluster mode was enabled"}; } if (ns == kDefaultNamespace) { - return Status(Status::NotOK, "forbidden to add the default namespace"); + return {Status::NotOK, "forbidden to add the default namespace"}; } auto s = isNamespaceLegal(ns); if (!s.IsOK()) return s; if (tokens.find(token) != tokens.end()) { - return Status(Status::NotOK, "the token has already exists"); + return {Status::NotOK, "the token has already exists"}; } if (token == requirepass || token == masterauth) { - return Status(Status::NotOK, "the token is duplicated with requirepass or masterauth"); + return {Status::NotOK, "the token is duplicated with requirepass or masterauth"}; } for (const auto &iter : tokens) { if (iter.second == ns) { - return Status(Status::NotOK, "the namespace has already exists"); + return {Status::NotOK, "the namespace has already exists"}; } } tokens[token] = ns; @@ -915,7 +910,7 @@ Status Config::AddNamespace(const std::string &ns, const std::string &token) { Status Config::DelNamespace(const std::string &ns) { if (ns == kDefaultNamespace) { - return Status(Status::NotOK, "forbidden to delete the default namespace"); + return {Status::NotOK, "forbidden to delete the default namespace"}; } for (const auto &iter : tokens) { if (iter.second == ns) { diff --git a/src/config/config_type.h b/src/config/config_type.h index 566f699454c..5d6c7b6961a 100644 --- a/src/config/config_type.h +++ b/src/config/config_type.h @@ -62,8 +62,8 @@ class ConfigField { virtual ~ConfigField() = default; virtual std::string ToString() = 0; virtual Status Set(const std::string &v) = 0; - virtual Status ToNumber(int64_t *n) { return Status(Status::NotOK, "not supported"); } - virtual Status ToBool(bool *b) { return Status(Status::NotOK, "not supported"); } + virtual Status ToNumber(int64_t *n) { return {Status::NotOK, "not supported"}; } + virtual Status ToBool(bool *b) { return {Status::NotOK, "not supported"}; } virtual configType GetConfigType() { return config_type; } virtual bool IsMultiConfig() { return config_type == configType::MultiConfig; } virtual bool IsSingleConfig() { return config_type == configType::SingleConfig; } @@ -176,7 +176,7 @@ class YesNoField : public ConfigField { } else if (strcasecmp(v.data(), "no") == 0) { *receiver_ = false; } else { - return Status(Status::NotOK, "argument must be 'yes' or 'no'"); + return {Status::NotOK, "argument must be 'yes' or 'no'"}; } return Status::OK(); } @@ -197,7 +197,7 @@ class EnumField : public ConfigField { Status Set(const std::string &v) override { int e = configEnumGetValue(enums_, v.c_str()); if (e == INT_MIN) { - return Status(Status::NotOK, "invalid enum option"); + return {Status::NotOK, "invalid enum option"}; } *receiver_ = e; return Status::OK(); diff --git a/src/main.cc b/src/main.cc index 8efd6a9999c..12b797f7672 100644 --- a/src/main.cc +++ b/src/main.cc @@ -306,9 +306,10 @@ int main(int argc, char *argv[]) { Config config; Status s = config.Load(opts); if (!s.IsOK()) { - std::cout << "Failed to load config, err: " << s.Msg() << std::endl; + std::cout << "Failed to load config. Error: " << s.Msg() << std::endl; return 1; } + initGoogleLog(&config); printVersion(LOG(INFO)); // Tricky: We don't expect that different instances running on the same port, From 5bb1ad1727c1b408e4d9e05f4a40e23d8091c6d9 Mon Sep 17 00:00:00 2001 From: Yaroslav Stepanchuk Date: Thu, 5 Jan 2023 13:53:46 +0200 Subject: [PATCH 2/3] Improve error messages on kvrocks2redis configuration loading --- src/config/config.cc | 2 +- utils/kvrocks2redis/config.cc | 61 ++++++++++++++++++-------- utils/kvrocks2redis/config.h | 1 + utils/kvrocks2redis/kvrocks2redis.conf | 10 ++++- utils/kvrocks2redis/main.cc | 23 ++++++---- utils/kvrocks2redis/sync.cc | 17 +++---- 6 files changed, 75 insertions(+), 39 deletions(-) diff --git a/src/config/config.cc b/src/config/config.cc index 9cc9f9d3ca8..e0cf0f8c275 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -20,6 +20,7 @@ #include "config.h" +#include #include #include @@ -34,7 +35,6 @@ #include "config_type.h" #include "config_util.h" -#include "fmt/format.h" #include "parse_util.h" #include "server/server.h" #include "status.h" diff --git a/utils/kvrocks2redis/config.cc b/utils/kvrocks2redis/config.cc index 82e87f202bd..109ef0acc47 100644 --- a/utils/kvrocks2redis/config.cc +++ b/utils/kvrocks2redis/config.cc @@ -20,6 +20,7 @@ #include "config.h" +#include #include #include @@ -52,16 +53,17 @@ Status Config::parseConfigFromString(const std::string &input) { args[0] = Util::ToLower(args[0]); size_t size = args.size(); if (size == 2 && args[0] == "daemonize") { - int i = 0; - if ((i = yesnotoi(args[1])) == -1) { - return Status(Status::NotOK, "argument must be 'yes' or 'no'"); + if (int i = yesnotoi(args[1]); i == -1) { + return {Status::NotOK, "the value of 'daemonize' must be 'yes' or 'no'"}; + } else { + daemonize = (i == 1); } - daemonize = (i == 1); } else if (size == 2 && args[0] == "data-dir") { data_dir = args[1]; if (data_dir.empty()) { - return Status(Status::NotOK, "data_dir is empty"); + return {Status::NotOK, "'data-dir' was not specified"}; } + if (data_dir.back() != '/') { data_dir += "/"; } @@ -69,8 +71,9 @@ Status Config::parseConfigFromString(const std::string &input) { } else if (size == 2 && args[0] == "output-dir") { output_dir = args[1]; if (output_dir.empty()) { - return Status(Status::NotOK, "output-dir is empty"); + return {Status::NotOK, "'output-dir' was not specified"}; } + if (output_dir.back() != '/') { output_dir += "/"; } @@ -90,31 +93,38 @@ Status Config::parseConfigFromString(const std::string &input) { // In new versions, we don't use extra port to implement replication kvrocks_port = std::stoi(args[2]); if (kvrocks_port <= 0 || kvrocks_port > 65535) { - return Status(Status::NotOK, "kvrocks port range should be between 0 and 65535"); + return {Status::NotOK, "Kvrocks port value should be between 0 and 65535"}; } + if (size == 4) { kvrocks_auth = args[3]; } } else if (size == 2 && args[0] == "cluster-enable") { - int i = 0; - if ((i = yesnotoi(args[1])) == -1) { - return Status(Status::NotOK, "argument must be 'yes' or 'no'"); + if (int i = yesnotoi(args[1]); i == -1) { + return {Status::NotOK, "the value of 'cluster-enable' must be 'yes' or 'no'"}; + } else { + cluster_enable = (i == 1); } - cluster_enable = (i == 1); - } else if (size >= 3 && !strncasecmp(args[0].data(), "namespace.", 10)) { + } else if (size >= 3 && strncasecmp(args[0].data(), "namespace.", 10) == 0) { std::string ns = args[0].substr(10, args.size() - 10); if (ns.size() > INT8_MAX) { - return Status(Status::NotOK, std::string("namespace size exceed limit ") + std::to_string(INT8_MAX)); + return {Status::NotOK, std::string("namespace size exceed limit ") + std::to_string(INT8_MAX)}; } + tokens[ns].host = args[1]; tokens[ns].port = std::stoi(args[2]); + if (tokens[ns].port <= 0 || tokens[ns].port > 65535) { + return {Status::NotOK, "Redis port value should be between 0 and 65535"}; + } + if (size >= 4) { tokens[ns].auth = args[3]; } tokens[ns].db_number = size == 5 ? std::atoi(args[4].c_str()) : 0; } else { - return Status(Status::NotOK, "Bad directive or wrong number of arguments"); + return {Status::NotOK, "unknown configuration directive or wrong number of arguments"}; } + return Status::OK(); } @@ -122,7 +132,7 @@ Status Config::Load(std::string path) { path_ = std::move(path); std::ifstream file(path_); if (!file.is_open()) { - return Status(Status::NotOK, strerror(errno)); + return {Status::NotOK, fmt::format("failed to open file '{}': {}", path_, strerror(errno))}; } std::string line; @@ -131,16 +141,29 @@ Status Config::Load(std::string path) { std::getline(file, line); Status s = parseConfigFromString(line); if (!s.IsOK()) { - file.close(); - return Status(Status::NotOK, "at line: #L" + std::to_string(line_num) + ", err: " + s.Msg()); + return s.Prefixed(fmt::format("at line #L{}", line_num)); } + line_num++; } auto s = rocksdb::Env::Default()->FileExists(data_dir); - if (!s.ok()) return Status(Status::NotOK, s.ToString()); + if (!s.ok()) { + if (s.IsNotFound()) { + return {Status::NotOK, fmt::format("the specified Kvrocks working directory '{}' doesn't exist", data_dir)}; + } + return {Status::NotOK, s.ToString()}; + } + + s = rocksdb::Env::Default()->FileExists(output_dir); + if (!s.ok()) { + if (s.IsNotFound()) { + return {Status::NotOK, + fmt::format("the specified directory '{}' for intermediate files doesn't exist", output_dir)}; + } + return {Status::NotOK, s.ToString()}; + } - file.close(); return Status::OK(); } diff --git a/utils/kvrocks2redis/config.h b/utils/kvrocks2redis/config.h index b5444d75482..ea1e729cdec 100644 --- a/utils/kvrocks2redis/config.h +++ b/utils/kvrocks2redis/config.h @@ -34,6 +34,7 @@ struct redis_server { std::string auth; int db_number; }; + struct Config { public: int loglevel = 0; diff --git a/utils/kvrocks2redis/kvrocks2redis.conf b/utils/kvrocks2redis/kvrocks2redis.conf index 92aca9ed0b9..26fe7742f3e 100644 --- a/utils/kvrocks2redis/kvrocks2redis.conf +++ b/utils/kvrocks2redis/kvrocks2redis.conf @@ -2,18 +2,24 @@ # The value should be INFO, WARNING, ERROR, FATAL. # -# Default is INFO +# Default: INFO loglevel INFO # By default kvrocks2redis does not run as a daemon. Use 'yes' if you need it. # Note that kvrocks2redis will write a pid file in /var/run/kvrocks2redis.pid when daemonized. +# +# Default: no daemonize no # The kvrocks working directory. # Note that you must specify a directory here, not a file name. +# +# Default: ./data data-dir ./data # Intermediate files are output to this directory when the kvrocks2redis program runs. +# +# Default: ./ output-dir ./ # Sync kvrocks node. Use the node's Psync command to get the newest wal raw write_batch. @@ -23,7 +29,7 @@ kvrocks 127.0.0.1 6666 # Enable cluster mode. # -# Default: yes +# Default: no cluster-enable yes ################################ NAMESPACE AND Sync Target Redis ##################################### diff --git a/utils/kvrocks2redis/main.cc b/utils/kvrocks2redis/main.cc index 415901082bd..0752a8cf482 100644 --- a/utils/kvrocks2redis/main.cc +++ b/utils/kvrocks2redis/main.cc @@ -85,13 +85,15 @@ static void initGoogleLog(const Kvrocks2redis::Config *config) { static Status createPidFile(const std::string &path) { int fd = open(path.data(), O_RDWR | O_CREAT | O_EXCL, 0660); if (fd < 0) { - return Status(Status::NotOK, strerror(errno)); + return {Status::NotOK, strerror(errno)}; } + std::string pid_str = std::to_string(getpid()); auto s = Util::Write(fd, pid_str); if (!s.IsOK()) { return s.Prefixed("failed to write to PID-file"); } + close(fd); return Status::OK(); } @@ -99,20 +101,20 @@ static Status createPidFile(const std::string &path) { static void removePidFile(const std::string &path) { std::remove(path.data()); } static void daemonize() { - pid_t pid = 0; - - pid = fork(); + pid_t pid = fork(); if (pid < 0) { - LOG(ERROR) << "Failed to fork the process, err: " << strerror(errno); + LOG(ERROR) << "Failed to fork the process. Error: " << strerror(errno); exit(1); } + if (pid > 0) exit(EXIT_SUCCESS); // parent process // change the file mode umask(0); if (setsid() < 0) { - LOG(ERROR) << "Failed to setsid, err: %s" << strerror(errno); + LOG(ERROR) << "Failed to setsid. Error: " << strerror(errno); exit(1); } + close(STDIN_FILENO); close(STDOUT_FILENO); close(STDERR_FILENO); @@ -136,15 +138,17 @@ int main(int argc, char *argv[]) { Kvrocks2redis::Config config; Status s = config.Load(config_file_path); if (!s.IsOK()) { - std::cout << "Failed to load config, err: " << s.Msg() << std::endl; + std::cout << "Failed to load config. Error: " << s.Msg() << std::endl; exit(1); } + initGoogleLog(&config); if (config.daemonize) daemonize(); + s = createPidFile(config.pidfile); if (!s.IsOK()) { - LOG(ERROR) << "Failed to create pidfile: " << s.Msg(); + LOG(ERROR) << "Failed to create pidfile '" << config.pidfile << "': " << s.Msg(); exit(1); } @@ -155,7 +159,7 @@ int main(int argc, char *argv[]) { Engine::Storage storage(&kvrocks_config); s = storage.Open(true); if (!s.IsOK()) { - LOG(ERROR) << "Failed to open: " << s.Msg(); + LOG(ERROR) << "Failed to open Kvrocks storage: " << s.Msg(); exit(1); } @@ -170,6 +174,7 @@ int main(int argc, char *argv[]) { } }; sync.Start(); + removePidFile(config.pidfile); return 0; } diff --git a/utils/kvrocks2redis/sync.cc b/utils/kvrocks2redis/sync.cc index bda9d89978e..e70c2b59b39 100644 --- a/utils/kvrocks2redis/sync.cc +++ b/utils/kvrocks2redis/sync.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -67,10 +66,12 @@ void Sync::Start() { while (!IsStopped()) { auto sock_fd = Util::SockConnect(config_->kvrocks_host, config_->kvrocks_port); if (!sock_fd) { - LOG(ERROR) << sock_fd.Msg(); + LOG(ERROR) << fmt::format("Failed to connect to Kvrocks on {}:{}. Error: {}", config_->kvrocks_host, + config_->kvrocks_port, sock_fd.Msg()); usleep(10000); continue; } + sock_fd_ = *sock_fd; s = auth(); if (!s.IsOK()) { @@ -78,6 +79,7 @@ void Sync::Start() { usleep(10000); continue; } + while (!IsStopped()) { s = tryPSync(); if (!s.IsOK()) { @@ -135,7 +137,7 @@ Status Sync::tryPSync() { " last_next_seq config file, and restart kvrocks2redis, redis reply: " + std::string(line); stop_flag_ = true; - return Status(Status::NotOK, error_msg); + return {Status::NotOK, error_msg}; } // PSYNC isn't OK, we should use parseAllLocalStorage // Switch to parseAllLocalStorage @@ -149,12 +151,11 @@ Status Sync::tryPSync() { Status Sync::incrementBatchLoop() { std::cout << "Start parse increment batch ..." << std::endl; - char *bulk_data = nullptr; evbuffer *evbuf = evbuffer_new(); while (!IsStopped()) { if (evbuffer_read(evbuf, sock_fd_, -1) <= 0) { evbuffer_free(evbuf); - return Status(Status::NotOK, std::string("[kvrocks2redis] read increament batch err: ") + strerror(errno)); + return {Status::NotOK, std::string("[kvrocks2redis] read increament batch err: ") + strerror(errno)}; } if (incr_state_ == IncrementBatchLoopState::Incr_batch_size) { // Read bulk length @@ -165,7 +166,7 @@ Status Sync::incrementBatchLoop() { } incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, nullptr, 10) : 0; if (incr_bulk_len_ == 0) { - return Status(Status::NotOK, "[kvrocks2redis] Invalid increment data size"); + return {Status::NotOK, "[kvrocks2redis] Invalid increment data size"}; } incr_state_ = Incr_batch_data; } @@ -173,7 +174,7 @@ Status Sync::incrementBatchLoop() { if (incr_state_ == IncrementBatchLoopState::Incr_batch_data) { // Read bulk data (batch data) if (incr_bulk_len_ + 2 <= evbuffer_get_length(evbuf)) { // We got enough data - bulk_data = reinterpret_cast(evbuffer_pullup(evbuf, static_cast(incr_bulk_len_) + 2)); + char *bulk_data = reinterpret_cast(evbuffer_pullup(evbuf, static_cast(incr_bulk_len_) + 2)); std::string bulk_data_str = std::string(bulk_data, incr_bulk_len_); // Skip the ping packet if (bulk_data_str != "ping") { @@ -227,7 +228,7 @@ Status Sync::updateNextSeq(rocksdb::SequenceNumber seq) { Status Sync::readNextSeqFromFile(rocksdb::SequenceNumber *seq) { next_seq_fd_ = open(config_->next_seq_file_path.data(), O_RDWR | O_CREAT, 0666); if (next_seq_fd_ < 0) { - return Status(Status::NotOK, std::string("Failed to open next seq file :") + strerror(errno)); + return {Status::NotOK, std::string("Failed to open next seq file :") + strerror(errno)}; } *seq = 0; From f16649c5d3fd440ce8a704d1d2768854b2d14e66 Mon Sep 17 00:00:00 2001 From: Yaroslav Stepanchuk Date: Fri, 6 Jan 2023 07:55:26 +0200 Subject: [PATCH 3/3] Set the `clister-enable` option in kvrocks2redis config to `no` as default value --- utils/kvrocks2redis/kvrocks2redis.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/kvrocks2redis/kvrocks2redis.conf b/utils/kvrocks2redis/kvrocks2redis.conf index 26fe7742f3e..3e3e4258d8c 100644 --- a/utils/kvrocks2redis/kvrocks2redis.conf +++ b/utils/kvrocks2redis/kvrocks2redis.conf @@ -30,7 +30,7 @@ kvrocks 127.0.0.1 6666 # Enable cluster mode. # # Default: no -cluster-enable yes +cluster-enable no ################################ NAMESPACE AND Sync Target Redis ##################################### # Synchronize the specified namespace data to the specified Redis DB.