From 37e94592742010476fb0d1ede0f478df26ccdf1f Mon Sep 17 00:00:00 2001 From: git-hulk Date: Thu, 4 Apr 2024 18:59:59 +0800 Subject: [PATCH 1/2] Add the reproduce test codes for #2214 --- .../replication/replication_test.go | 58 +++++++++++++++++++ tests/gocase/util/server.go | 3 + 2 files changed, 61 insertions(+) diff --git a/tests/gocase/integration/replication/replication_test.go b/tests/gocase/integration/replication/replication_test.go index 20de35819b0..71e08c77f61 100644 --- a/tests/gocase/integration/replication/replication_test.go +++ b/tests/gocase/integration/replication/replication_test.go @@ -32,6 +32,64 @@ import ( "github.com/stretchr/testify/require" ) +func TestClusterReplication(t *testing.T) { + ctx := context.Background() + + masterSrv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { masterSrv.Close() }() + masterClient := masterSrv.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterNodeID).Err()) + + replicaSrv := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + // enabled the replication namespace to reproduce the issue #2214 + "repl-namespace-enabled": "yes", + }) + defer func() { replicaSrv.Close() }() + replicaClient := replicaSrv.NewClient() + // allow to run the read-only command in the replica + require.NoError(t, replicaClient.ReadOnly(ctx).Err()) + defer func() { require.NoError(t, replicaClient.Close()) }() + replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID", replicaNodeID).Err()) + + clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", masterNodeID, masterSrv.Port()) + clusterNodes = fmt.Sprintf("%s\n%s 127.0.0.1 %d slave %s", clusterNodes, replicaNodeID, replicaSrv.Port(), masterNodeID) + + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + t.Run("Cluster replication should work", func(t *testing.T) { + util.WaitForSync(t, replicaClient) + require.Equal(t, "slave", util.FindInfoEntry(replicaClient, "role")) + masterClient.Set(ctx, "k0", "v0", 0) + masterClient.LPush(ctx, "k1", "e0", "e1", "e2") + util.WaitForOffsetSync(t, masterClient, replicaClient) + + require.Equal(t, "v0", replicaClient.Get(ctx, "k0").Val()) + require.Equal(t, []string{"e2", "e1", "e0"}, replicaClient.LRange(ctx, "k1", 0, -1).Val()) + }) + + t.Run("Cluster replication should work normally after restart(issue #2214)", func(t *testing.T) { + replicaSrv.Close() + masterClient.Set(ctx, "k0", "v1", 0) + masterClient.HSet(ctx, "k2", "f0", "v0", "f1", "v1") + + // start the replica server again + replicaSrv.Start() + _ = replicaClient.Close() + replicaClient = replicaSrv.NewClient() + // allow to run the read-only command in the replica + require.NoError(t, replicaClient.ReadOnly(ctx).Err()) + + util.WaitForOffsetSync(t, masterClient, replicaClient) + require.Equal(t, "v1", replicaClient.Get(ctx, "k0").Val()) + require.Equal(t, map[string]string{"f0": "v0", "f1": "v1"}, replicaClient.HGetAll(ctx, "k2").Val()) + }) +} + func TestReplicationWithHostname(t *testing.T) { srvA := util.StartServer(t, map[string]string{}) defer srvA.Close() diff --git a/tests/gocase/util/server.go b/tests/gocase/util/server.go index a3f4314e342..849b47ad102 100644 --- a/tests/gocase/util/server.go +++ b/tests/gocase/util/server.go @@ -136,7 +136,10 @@ func (s *KvrocksServer) close(keepDir bool) { func (s *KvrocksServer) Restart() { s.close(true) + s.Start() +} +func (s *KvrocksServer) Start() { b := *binPath require.NotEmpty(s.t, b, "please set the binary path by `-binPath`") cmd := exec.Command(b) From 7efcf6d3332dd4abb197f5fe3a6cab87e5eb3bd7 Mon Sep 17 00:00:00 2001 From: git-hulk Date: Thu, 4 Apr 2024 13:58:40 +0800 Subject: [PATCH 2/2] Don't try to rewrite the namespace in the cluster mode The namespaace mechanism is NOT allowed in cluster mode, so it's unnecessary to rewrite while the cluster mode was enabled. By the way, Kvrocks depends on the rocksdb's sequence while replicating, any writes to rocksdb should be forbidden includes the internal behaviors. --- src/server/namespace.cc | 3 +++ src/storage/redis_pubsub.cc | 3 +++ src/storage/storage.cc | 3 +++ 3 files changed, 9 insertions(+) diff --git a/src/server/namespace.cc b/src/server/namespace.cc index 01a44dcfe40..504ad523681 100644 --- a/src/server/namespace.cc +++ b/src/server/namespace.cc @@ -53,6 +53,9 @@ bool Namespace::IsAllowModify() const { Status Namespace::LoadAndRewrite() { auto config = storage_->GetConfig(); + // Namespace is NOT allowed in the cluster mode, so we don't need to rewrite here. + if (config->cluster_enabled) return Status::OK(); + // Load from the configuration file first tokens_ = config->load_tokens; diff --git a/src/storage/redis_pubsub.cc b/src/storage/redis_pubsub.cc index 52264ff9203..6ca153b70be 100644 --- a/src/storage/redis_pubsub.cc +++ b/src/storage/redis_pubsub.cc @@ -23,6 +23,9 @@ namespace redis { rocksdb::Status PubSub::Publish(const Slice &channel, const Slice &value) { + if (storage_->GetConfig()->IsSlave()) { + return rocksdb::Status::NotSupported("can't publish to db in slave mode"); + } auto batch = storage_->GetWriteBatchBase(); batch->Put(pubsub_cf_handle_, channel, value); return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); diff --git a/src/storage/storage.cc b/src/storage/storage.cc index 3074664e7ac..2133645c40c 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -863,6 +863,9 @@ ObserverOrUniquePtr Storage::GetWriteBatchBase() { } Status Storage::WriteToPropagateCF(const std::string &key, const std::string &value) { + if (config_->IsSlave()) { + return {Status::NotOK, "cannot write to propagate column family in slave mode"}; + } auto batch = GetWriteBatchBase(); auto cf = GetCFHandle(kPropagateColumnFamilyName); batch->Put(cf, key, value);