diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index bde60fa5e53..114eee15f27 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -1275,7 +1275,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr("auth", 2, "read-only ok-loadin MakeCmdAttr("info", -1, "read-only ok-loading", 0, 0, 0), MakeCmdAttr("role", 1, "read-only ok-loading", 0, 0, 0), MakeCmdAttr("config", -2, "read-only", 0, 0, 0, GenerateConfigFlag), - MakeCmdAttr("namespace", -3, "read-only exclusive", 0, 0, 0), + MakeCmdAttr("namespace", -3, "read-only", 0, 0, 0), MakeCmdAttr("keys", 2, "read-only", 0, 0, 0), MakeCmdAttr("flushdb", 1, "write no-dbsize-check", 0, 0, 0), MakeCmdAttr("flushall", 1, "write no-dbsize-check", 0, 0, 0), diff --git a/src/config/config.cc b/src/config/config.cc index 500ac717008..4139dfc6cb4 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -885,11 +885,20 @@ Status Config::Set(Server *srv, std::string key, const std::string &value) { if (!s.IsOK()) return s.Prefixed("invalid value"); } + auto origin_value = field->ToString(); auto s = field->Set(value); if (!s.IsOK()) return s.Prefixed("failed to set new value"); if (field->callback) { - return field->callback(srv, key, value); + s = field->callback(srv, key, value); + if (!s.IsOK()) { + // rollback the value if the callback failed + auto set_status = field->Set(origin_value); + if (!set_status.IsOK()) { + return set_status.Prefixed("failed to rollback the value"); + } + } + return s; } return Status::OK(); diff --git a/src/server/namespace.cc b/src/server/namespace.cc index 504ad523681..31b169ed200 100644 --- a/src/server/namespace.cc +++ b/src/server/namespace.cc @@ -51,40 +51,53 @@ bool Namespace::IsAllowModify() const { return config->HasConfigFile() || config->repl_namespace_enabled; } +Status Namespace::loadFromDB(std::map* db_tokens) const { + std::string value; + auto s = storage_->Get(rocksdb::ReadOptions(), cf_, kNamespaceDBKey, &value); + if (!s.ok()) { + if (s.IsNotFound()) return Status::OK(); + return {Status::NotOK, s.ToString()}; + } + + jsoncons::json j = jsoncons::json::parse(value); + for (const auto& iter : j.object_range()) { + db_tokens->insert({iter.key(), iter.value().as_string()}); + } + return Status::OK(); +} + Status Namespace::LoadAndRewrite() { auto config = storage_->GetConfig(); // Namespace is NOT allowed in the cluster mode, so we don't need to rewrite here. if (config->cluster_enabled) return Status::OK(); - // Load from the configuration file first - tokens_ = config->load_tokens; + std::map db_tokens; + auto s = loadFromDB(&db_tokens); + if (!s.IsOK()) return s; - // We would like to load namespaces from db even if repl_namespace_enabled is false, - // this can avoid missing some namespaces when turn on/off repl_namespace_enabled. - std::string value; - auto s = storage_->Get(rocksdb::ReadOptions(), cf_, kNamespaceDBKey, &value); - if (!s.ok() && !s.IsNotFound()) { - return {Status::NotOK, s.ToString()}; + if (!db_tokens.empty() && !config->repl_namespace_enabled) { + return {Status::NotOK, "cannot switch off repl_namespace_enabled when namespaces exist in db"}; } - if (s.ok()) { - // The namespace db key is existed, so it doesn't allow to switch off repl_namespace_enabled - if (!config->repl_namespace_enabled) { - return {Status::NotOK, "cannot switch off repl_namespace_enabled when namespaces exist in db"}; - } - jsoncons::json j = jsoncons::json::parse(value); - for (const auto& iter : j.object_range()) { - if (tokens_.find(iter.key()) == tokens_.end()) { - // merge the namespace from db - tokens_[iter.key()] = iter.value().as(); - } + std::unique_lock lock(tokens_mu_); + // Load from the configuration file first + tokens_ = config->load_tokens; + // Merge the tokens from the database if the token is not in the configuration file + for (const auto& iter : db_tokens) { + if (tokens_.find(iter.first) == tokens_.end()) { + tokens_[iter.first] = iter.second; } } - return Rewrite(); + // The following rewrite is to remove namespace/token pairs from the configuration if the namespace replication + // is enabled. So we don't need to do that if no tokens are loaded or the namespace replication is disabled. + if (config->load_tokens.empty() || !config->repl_namespace_enabled) return Status::OK(); + + return Rewrite(tokens_); } -StatusOr Namespace::Get(const std::string& ns) const { +StatusOr Namespace::Get(const std::string& ns) { + std::shared_lock lock(tokens_mu_); for (const auto& iter : tokens_) { if (iter.second == ns) { return iter.first; @@ -93,7 +106,8 @@ StatusOr Namespace::Get(const std::string& ns) const { return {Status::NotFound}; } -StatusOr Namespace::GetByToken(const std::string& token) const { +StatusOr Namespace::GetByToken(const std::string& token) { + std::shared_lock lock(tokens_mu_); auto iter = tokens_.find(token); if (iter == tokens_.end()) { return {Status::NotFound}; @@ -121,6 +135,7 @@ Status Namespace::Set(const std::string& ns, const std::string& token) { return {Status::NotOK, kErrInvalidToken}; } + std::unique_lock lock(tokens_mu_); for (const auto& iter : tokens_) { if (iter.second == ns) { // need to delete the old token first tokens_.erase(iter.first); @@ -129,7 +144,7 @@ Status Namespace::Set(const std::string& ns, const std::string& token) { } tokens_[token] = ns; - s = Rewrite(); + s = Rewrite(tokens_); if (!s.IsOK()) { tokens_.erase(token); return s; @@ -138,17 +153,22 @@ Status Namespace::Set(const std::string& ns, const std::string& token) { } Status Namespace::Add(const std::string& ns, const std::string& token) { - // duplicate namespace - for (const auto& iter : tokens_) { - if (iter.second == ns) { - if (iter.first == token) return Status::OK(); - return {Status::NotOK, kErrNamespaceExists}; + { + std::shared_lock lock(tokens_mu_); + // duplicate namespace + for (const auto& iter : tokens_) { + if (iter.second == ns) { + if (iter.first == token) return Status::OK(); + return {Status::NotOK, kErrNamespaceExists}; + } + } + // duplicate token + if (tokens_.find(token) != tokens_.end()) { + return {Status::NotOK, kErrTokenExists}; } } - // duplicate token - if (tokens_.find(token) != tokens_.end()) { - return {Status::NotOK, kErrTokenExists}; - } + + // we don't need to lock the mutex here because the Set method will lock it return Set(ns, token); } @@ -160,10 +180,11 @@ Status Namespace::Del(const std::string& ns) { return {Status::NotOK, kErrCantModifyNamespace}; } + std::unique_lock lock(tokens_mu_); for (const auto& iter : tokens_) { if (iter.second == ns) { tokens_.erase(iter.first); - auto s = Rewrite(); + auto s = Rewrite(tokens_); if (!s.IsOK()) { tokens_[iter.first] = iter.second; return s; @@ -174,11 +195,11 @@ Status Namespace::Del(const std::string& ns) { return {Status::NotOK, kErrNamespaceNotFound}; } -Status Namespace::Rewrite() { +Status Namespace::Rewrite(const std::map& tokens) const { auto config = storage_->GetConfig(); // Rewrite the configuration file only if it's running with the configuration file if (config->HasConfigFile()) { - auto s = config->Rewrite(tokens_); + auto s = config->Rewrite(tokens); if (!s.IsOK()) { return s; } @@ -195,7 +216,7 @@ Status Namespace::Rewrite() { return Status::OK(); } jsoncons::json json; - for (const auto& iter : tokens_) { + for (const auto& iter : tokens) { json[iter.first] = iter.second; } return storage_->WriteToPropagateCF(kNamespaceDBKey, json.to_string()); diff --git a/src/server/namespace.h b/src/server/namespace.h index c0556eeb161..1eadd656857 100644 --- a/src/server/namespace.h +++ b/src/server/namespace.h @@ -34,17 +34,21 @@ class Namespace { Namespace &operator=(const Namespace &) = delete; Status LoadAndRewrite(); - StatusOr Get(const std::string &ns) const; - StatusOr GetByToken(const std::string &token) const; + StatusOr Get(const std::string &ns); + StatusOr GetByToken(const std::string &token); Status Set(const std::string &ns, const std::string &token); Status Add(const std::string &ns, const std::string &token); Status Del(const std::string &ns); const std::map &List() const { return tokens_; } - Status Rewrite(); + Status Rewrite(const std::map &tokens) const; bool IsAllowModify() const; private: engine::Storage *storage_; rocksdb::ColumnFamilyHandle *cf_ = nullptr; + + std::shared_mutex tokens_mu_; std::map tokens_; + + Status loadFromDB(std::map *db_tokens) const; }; diff --git a/tests/gocase/unit/namespace/namespace_test.go b/tests/gocase/unit/namespace/namespace_test.go index 1b80e5b94b0..755766bbcf2 100644 --- a/tests/gocase/unit/namespace/namespace_test.go +++ b/tests/gocase/unit/namespace/namespace_test.go @@ -238,7 +238,16 @@ func TestNamespaceReplicate(t *testing.T) { }) t.Run("Turn off namespace replication is not allowed", func(t *testing.T) { + r := masterRdb.Do(ctx, "NAMESPACE", "ADD", "test-ns", "ns-token") + require.NoError(t, r.Err()) + require.Equal(t, "OK", r.Val()) util.ErrorRegexp(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err(), ".*cannot switch off repl_namespace_enabled when namespaces exist in db.*") + + // it should be allowed after deleting all namespaces + r = masterRdb.Do(ctx, "NAMESPACE", "DEL", "test-ns") + require.NoError(t, r.Err()) + require.Equal(t, "OK", r.Val()) + require.NoError(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err()) }) }