From 41828fbc8272331935468f016dd305cbb1af1faf Mon Sep 17 00:00:00 2001 From: wuxianrong Date: Fri, 1 Mar 2024 13:19:40 +0800 Subject: [PATCH] fix ci replication go test failed problem --- tests/integration/cache_test.go | 2 +- tests/integration/csanning_test.go | 2 +- tests/integration/geo_test.go | 2 +- tests/integration/hash_test.go | 2 +- tests/integration/hyperloglog_test.go | 2 +- tests/integration/list_test.go | 8 +- tests/integration/options.go | 26 +++--- tests/integration/pubsub_test.go | 4 +- tests/integration/replication_test.go | 8 +- tests/integration/server_test.go | 2 +- tests/integration/set_test.go | 6 +- tests/integration/slowlog_test.go | 2 +- tests/integration/start_master_and_slave.sh | 9 +- tests/integration/stream_test.go | 97 ++++++++++----------- tests/integration/string_test.go | 2 +- tests/integration/txn_test.go | 4 +- tests/integration/zset_test.go | 4 +- 17 files changed, 92 insertions(+), 90 deletions(-) diff --git a/tests/integration/cache_test.go b/tests/integration/cache_test.go index c074760665..89a2aa2633 100644 --- a/tests/integration/cache_test.go +++ b/tests/integration/cache_test.go @@ -14,7 +14,7 @@ var _ = Describe("Cache test", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/csanning_test.go b/tests/integration/csanning_test.go index 43c5e59b29..67bc89f739 100644 --- a/tests/integration/csanning_test.go +++ b/tests/integration/csanning_test.go @@ -15,7 +15,7 @@ var _ = Describe("Csanning Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/geo_test.go b/tests/integration/geo_test.go index 199f52113e..7b5a4e62b3 100644 --- a/tests/integration/geo_test.go +++ b/tests/integration/geo_test.go @@ -14,7 +14,7 @@ var _ = Describe("Geo Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/hash_test.go b/tests/integration/hash_test.go index aab30f3e3d..b7938d733b 100644 --- a/tests/integration/hash_test.go +++ b/tests/integration/hash_test.go @@ -16,7 +16,7 @@ var _ = Describe("Hash Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/hyperloglog_test.go b/tests/integration/hyperloglog_test.go index 3b9217c0cb..59d86e69d7 100644 --- a/tests/integration/hyperloglog_test.go +++ b/tests/integration/hyperloglog_test.go @@ -14,7 +14,7 @@ var _ = Describe("Hyperloglog Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/list_test.go b/tests/integration/list_test.go index 043d52cfad..c62d00111c 100644 --- a/tests/integration/list_test.go +++ b/tests/integration/list_test.go @@ -40,7 +40,7 @@ var _ = Describe("List Commands", func() { var blockedLock sync.Mutex BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) @@ -916,8 +916,8 @@ var _ = Describe("List Commands", func() { Expect(lRange.Err()).NotTo(HaveOccurred()) Expect(lRange.Val()).To(Equal([]string{"two", "three"})) - err := client.Do(ctx, "LPOP", "list", 1, 2).Err() - Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command"))) + err := client.Do(ctx, "LPOP", "list", 1, 2).Err() + Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command"))) }) It("should LPopCount", func() { @@ -1162,7 +1162,7 @@ var _ = Describe("List Commands", func() { Expect(lRange.Val()).To(Equal([]string{"one", "two"})) err := client.Do(ctx, "RPOP", "list", 1, 2).Err() - Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command"))) + Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command"))) }) It("should RPopCount", func() { diff --git a/tests/integration/options.go b/tests/integration/options.go index dc7a0bb8ed..bf11eadf48 100644 --- a/tests/integration/options.go +++ b/tests/integration/options.go @@ -6,6 +6,15 @@ import ( "github.com/redis/go-redis/v9" ) +const ( + LOCALHOST = "127.0.0.1" + SLAVEPORT = "9231" + MASTERPORT = "9241" + SINGLEADDR = "127.0.0.1:9221" + SLAVEADDR = "127.0.0.1:9231" + MASTERADDR = "127.0.0.1:9241" +) + type TimeValue struct { time.Time } @@ -15,22 +24,9 @@ func (t *TimeValue) ScanRedis(s string) (err error) { return } -func pikaOptions1() *redis.Options { - return &redis.Options{ - Addr: "127.0.0.1:9221", - DB: 0, - DialTimeout: 10 * time.Second, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - MaxRetries: -1, - PoolSize: 30, - PoolTimeout: 60 * time.Second, - } -} - -func pikaOptions2() *redis.Options { +func PikaOption(addr string) *redis.Options { return &redis.Options{ - Addr: "127.0.0.1:9231", + Addr: addr, DB: 0, DialTimeout: 10 * time.Second, ReadTimeout: 30 * time.Second, diff --git a/tests/integration/pubsub_test.go b/tests/integration/pubsub_test.go index 198c77f035..560188ea77 100644 --- a/tests/integration/pubsub_test.go +++ b/tests/integration/pubsub_test.go @@ -21,8 +21,8 @@ var _ = Describe("PubSub", func() { ctx := context.TODO() BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) - client2 = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) + client2 = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(client2.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(2 * time.Second) diff --git a/tests/integration/replication_test.go b/tests/integration/replication_test.go index 49b23ab7d3..98dfba89b0 100644 --- a/tests/integration/replication_test.go +++ b/tests/integration/replication_test.go @@ -371,8 +371,8 @@ var _ = Describe("should replication ", func() { var clientMaster *redis.Client BeforeEach(func() { - clientMaster = redis.NewClient(pikaOptions1()) - clientSlave = redis.NewClient(pikaOptions2()) + clientMaster = redis.NewClient(PikaOption(MASTERADDR)) + clientSlave = redis.NewClient(PikaOption(SLAVEADDR)) cleanEnv(ctx, clientMaster, clientSlave) Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred()) @@ -395,11 +395,11 @@ var _ = Describe("should replication ", func() { infoRes = clientMaster.Info(ctx, "replication") Expect(infoRes.Err()).NotTo(HaveOccurred()) Expect(infoRes.Val()).To(ContainSubstring("role:master")) - Expect(clientSlave.Do(ctx, "slaveof", "127.0.0.1", "9231").Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same")) + Expect(clientSlave.Do(ctx, "slaveof", LOCALHOST, SLAVEPORT).Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same")) var count = 0 for { - res := trySlave(ctx, clientSlave, "127.0.0.1", "9221") + res := trySlave(ctx, clientSlave, LOCALHOST, MASTERPORT) if res { break } else if count > 4 { diff --git a/tests/integration/server_test.go b/tests/integration/server_test.go index 9c21767928..95685d2fdf 100644 --- a/tests/integration/server_test.go +++ b/tests/integration/server_test.go @@ -14,7 +14,7 @@ var _ = Describe("Server", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/set_test.go b/tests/integration/set_test.go index 07a568b0c7..dfe39408fb 100644 --- a/tests/integration/set_test.go +++ b/tests/integration/set_test.go @@ -14,7 +14,7 @@ var _ = Describe("Set Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) @@ -276,8 +276,8 @@ var _ = Describe("Set Commands", func() { Expect(sMembers.Err()).NotTo(HaveOccurred()) Expect(sMembers.Val()).To(HaveLen(3)) - err := client.Do(ctx, "SPOP", "set", 1, 2).Err() - Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command"))) + err := client.Do(ctx, "SPOP", "set", 1, 2).Err() + Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command"))) }) It("should SPopN", func() { diff --git a/tests/integration/slowlog_test.go b/tests/integration/slowlog_test.go index fa6f96a7c9..cdb00cd2b4 100644 --- a/tests/integration/slowlog_test.go +++ b/tests/integration/slowlog_test.go @@ -18,7 +18,7 @@ var _ = Describe("Slowlog Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/start_master_and_slave.sh b/tests/integration/start_master_and_slave.sh index 6c6e79107e..05863dff08 100755 --- a/tests/integration/start_master_and_slave.sh +++ b/tests/integration/start_master_and_slave.sh @@ -2,11 +2,18 @@ # This script is used by .github/workflows/pika.yml, Do not modify this file unless you know what you are doing. # it's used to start pika master and slave, running path: build cp ../../output/pika ./pika +cp ../conf/pika.conf ./pika_single.conf cp ../conf/pika.conf ./pika_master.conf cp ../conf/pika.conf ./pika_slave.conf +# Create folders for storing data on the primary and secondary nodes +mkdir master_data mkdir slave_data -sed -i '' -e 's|databases : 1|databases : 2|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_master.conf +# Example Change the location for storing data on primary and secondary nodes in the configuration file +sed -i '' -e 's|databases : 1|databases : 2|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_single.conf +sed -i '' -e 's|databases : 1|databases : 2|' -e 's|port : 9221|port : 9241|' -e 's|log-path : ./log/|log-path : ./master_data/log/|' -e 's|db-path : ./db/|db-path : ./master_data/db/|' -e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' -e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_master.conf sed -i '' -e 's|databases : 1|databases : 2|' -e 's|port : 9221|port : 9231|' -e 's|log-path : ./log/|log-path : ./slave_data/log/|' -e 's|db-path : ./db/|db-path : ./slave_data/db/|' -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_slave.conf +# Start three nodes +./pika -c ./pika_single.conf ./pika -c ./pika_master.conf ./pika -c ./pika_slave.conf #ensure both master and slave are ready diff --git a/tests/integration/stream_test.go b/tests/integration/stream_test.go index 3c49ad9973..00016e14aa 100644 --- a/tests/integration/stream_test.go +++ b/tests/integration/stream_test.go @@ -6,13 +6,13 @@ package pika_integration import ( - "sync" "context" - "sync/atomic" "fmt" "math/rand" "strconv" "strings" + "sync" + "sync/atomic" . "github.com/bsm/ginkgo/v2" . "github.com/bsm/gomega" @@ -120,7 +120,7 @@ func parseStreamEntryID(id string) (ts int64, seqNum int64) { var _ = Describe("Stream Commands", func() { ctx := context.TODO() var client *redis.Client - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) client.FlushDB(ctx) BeforeEach(func() { @@ -140,20 +140,20 @@ var _ = Describe("Stream Commands", func() { const numWriters = 10 const numReaders = 10 const messagesPerWriter = 20 - + createClient := func() *redis.Client { - return redis.NewClient(pikaOptions1()) + return redis.NewClient(PikaOption(SINGLEADDR)) } - + var messageCount int32 - + // Start writer goroutines for i := 0; i < numWriters; i++ { go func(writerIndex int) { defer GinkgoRecover() writerClient := createClient() defer writerClient.Close() - + for j := 0; j < messagesPerWriter; j++ { _, err := writerClient.XAdd(ctx, &redis.XAddArgs{ Stream: streamKey, @@ -164,41 +164,42 @@ var _ = Describe("Stream Commands", func() { } }(i) } - + // Start reader goroutines var wg sync.WaitGroup for i := 0; i < numReaders; i++ { - wg.Add(1) - go func() { - defer GinkgoRecover() - defer wg.Done() - readerClient := createClient() - defer readerClient.Close() - - lastID := "0" - readMessages := 0 - for readMessages < totalMessages { - items, err := readerClient.XRead(ctx, &redis.XReadArgs{ - Streams: []string{streamKey, lastID}, - Block: 0, - }).Result() - if (err != nil) { - continue - } - - // Check if items slice is not empty - if len(items) > 0 && len(items[0].Messages) > 0 { - lastMessageIndex := len(items[0].Messages) - 1 - lastID = items[0].Messages[lastMessageIndex].ID - readMessages += len(items[0].Messages) - } - // Optionally add a short delay here if needed - } - Expect(readMessages).To(BeNumerically(">=", totalMessages)) + wg.Add(1) + go func() { + readerClient := createClient() + defer func() { + GinkgoRecover() + wg.Done() + readerClient.Close() }() + + lastID := "0" + readMessages := 0 + for readMessages < totalMessages { + items, err := readerClient.XRead(ctx, &redis.XReadArgs{ + Streams: []string{streamKey, lastID}, + Block: 0, + }).Result() + if err != nil { + continue + } + + // Check if items slice is not empty + if len(items) > 0 && len(items[0].Messages) > 0 { + lastMessageIndex := len(items[0].Messages) - 1 + lastID = items[0].Messages[lastMessageIndex].ID + readMessages += len(items[0].Messages) + } + // Optionally add a short delay here if needed + } + Expect(readMessages).To(BeNumerically(">=", totalMessages)) + }() } - wg.Wait() Eventually(func() int32 { return atomic.LoadInt32(&messageCount) @@ -209,29 +210,27 @@ var _ = Describe("Stream Commands", func() { Expect(client.Del(ctx, "mystream").Err()).NotTo(HaveOccurred()) // Creating a stream and adding entries _, err := client.XAdd(ctx, &redis.XAddArgs{ - Stream: "mystream", - ID: "*", - Values: map[string]interface{}{"key1": "value1", "key2": "value2"}, + Stream: "mystream", + ID: "*", + Values: map[string]interface{}{"key1": "value1", "key2": "value2"}, }).Result() Expect(err).NotTo(HaveOccurred()) - + // Using keys * to find all keys including the stream keys, err := client.Keys(ctx, "*").Result() Expect(err).NotTo(HaveOccurred()) - + // Checking if the stream 'mystream' exists in the returned keys found := false for _, key := range keys { - if key == "mystream" { - found = true - break - } + if key == "mystream" { + found = true + break + } } Expect(found).To(BeTrue(), "Stream 'mystream' should exist in keys") }) - - - + It("XADD wrong number of args", func() { _, err := client.Do(ctx, "XADD", "mystream").Result() Expect(err).To(HaveOccurred()) diff --git a/tests/integration/string_test.go b/tests/integration/string_test.go index b2f357af41..fed216d5a2 100644 --- a/tests/integration/string_test.go +++ b/tests/integration/string_test.go @@ -16,7 +16,7 @@ var _ = Describe("String Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) diff --git a/tests/integration/txn_test.go b/tests/integration/txn_test.go index 0d3e219cdc..450c235a74 100644 --- a/tests/integration/txn_test.go +++ b/tests/integration/txn_test.go @@ -28,8 +28,8 @@ var _ = Describe("Text Txn", func() { var cmdCost time.Duration BeforeEach(func() { - txnClient = redis.NewClient(pikaOptions1()) - cmdClient = redis.NewClient(pikaOptions1()) + txnClient = redis.NewClient(PikaOption(SINGLEADDR)) + cmdClient = redis.NewClient(PikaOption(SINGLEADDR)) }) Describe("test watch", func() { It("basic watch", func() { diff --git a/tests/integration/zset_test.go b/tests/integration/zset_test.go index eb6817ba0f..abd9fb35b9 100644 --- a/tests/integration/zset_test.go +++ b/tests/integration/zset_test.go @@ -15,7 +15,7 @@ var _ = Describe("Zset Commands", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(pikaOptions1()) + client = redis.NewClient(PikaOption(SINGLEADDR)) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) }) @@ -1086,7 +1086,7 @@ var _ = Describe("Zset Commands", func() { Member: "three", }})) err = client.Do(ctx, "ZPOPMIN", "zset", 1, 2).Err() - Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'zpopmin' command"))) + Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'zpopmin' command"))) }) It("should ZRange", func() {