Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ci replication go test failed problem #2435

Merged
merged 1 commit into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/integration/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Cache test", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/csanning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var _ = Describe("Csanning Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/geo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Geo Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var _ = Describe("Hash Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/hyperloglog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Hyperloglog Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var _ = Describe("List Commands", func() {
var blockedLock sync.Mutex

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down Expand Up @@ -916,8 +916,8 @@ var _ = Describe("List Commands", func() {
Expect(lRange.Err()).NotTo(HaveOccurred())
Expect(lRange.Val()).To(Equal([]string{"two", "three"}))

err := client.Do(ctx, "LPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command")))
err := client.Do(ctx, "LPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command")))
})

It("should LPopCount", func() {
Expand Down Expand Up @@ -1162,7 +1162,7 @@ var _ = Describe("List Commands", func() {
Expect(lRange.Val()).To(Equal([]string{"one", "two"}))

err := client.Do(ctx, "RPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command")))
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command")))
})

It("should RPopCount", func() {
Expand Down
26 changes: 11 additions & 15 deletions tests/integration/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ import (
"github.com/redis/go-redis/v9"
)

const (
LOCALHOST = "127.0.0.1"
SLAVEPORT = "9231"
MASTERPORT = "9241"
SINGLEADDR = "127.0.0.1:9221"
SLAVEADDR = "127.0.0.1:9231"
MASTERADDR = "127.0.0.1:9241"
)

type TimeValue struct {
time.Time
}
Expand All @@ -15,22 +24,9 @@ func (t *TimeValue) ScanRedis(s string) (err error) {
return
}

func pikaOptions1() *redis.Options {
return &redis.Options{
Addr: "127.0.0.1:9221",
DB: 0,
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
MaxRetries: -1,
PoolSize: 30,
PoolTimeout: 60 * time.Second,
}
}

func pikaOptions2() *redis.Options {
func PikaOption(addr string) *redis.Options {
return &redis.Options{
Addr: "127.0.0.1:9231",
Addr: addr,
DB: 0,
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ var _ = Describe("PubSub", func() {
ctx := context.TODO()

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client2 = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
client2 = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expect(client2.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(2 * time.Second)
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ var _ = Describe("should replication ", func() {
var clientMaster *redis.Client

BeforeEach(func() {
clientMaster = redis.NewClient(pikaOptions1())
clientSlave = redis.NewClient(pikaOptions2())
clientMaster = redis.NewClient(PikaOption(MASTERADDR))
clientSlave = redis.NewClient(PikaOption(SLAVEADDR))
cleanEnv(ctx, clientMaster, clientSlave)
Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expand All @@ -395,11 +395,11 @@ var _ = Describe("should replication ", func() {
infoRes = clientMaster.Info(ctx, "replication")
Expect(infoRes.Err()).NotTo(HaveOccurred())
Expect(infoRes.Val()).To(ContainSubstring("role:master"))
Expect(clientSlave.Do(ctx, "slaveof", "127.0.0.1", "9231").Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same"))
Expect(clientSlave.Do(ctx, "slaveof", LOCALHOST, SLAVEPORT).Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same"))

var count = 0
for {
res := trySlave(ctx, clientSlave, "127.0.0.1", "9221")
res := trySlave(ctx, clientSlave, LOCALHOST, MASTERPORT)
if res {
break
} else if count > 4 {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Server", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Set Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down Expand Up @@ -276,8 +276,8 @@ var _ = Describe("Set Commands", func() {
Expect(sMembers.Err()).NotTo(HaveOccurred())
Expect(sMembers.Val()).To(HaveLen(3))

err := client.Do(ctx, "SPOP", "set", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command")))
err := client.Do(ctx, "SPOP", "set", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command")))
})

It("should SPopN", func() {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/slowlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var _ = Describe("Slowlog Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
9 changes: 8 additions & 1 deletion tests/integration/start_master_and_slave.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@
# This script is used by .github/workflows/pika.yml, Do not modify this file unless you know what you are doing.
# it's used to start pika master and slave, running path: build
cp ../../output/pika ./pika
cp ../conf/pika.conf ./pika_single.conf
cp ../conf/pika.conf ./pika_master.conf
cp ../conf/pika.conf ./pika_slave.conf
# Create folders for storing data on the primary and secondary nodes
mkdir master_data
mkdir slave_data
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_master.conf
# Example Change the location for storing data on primary and secondary nodes in the configuration file
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_single.conf
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|port : 9221|port : 9241|' -e 's|log-path : ./log/|log-path : ./master_data/log/|' -e 's|db-path : ./db/|db-path : ./master_data/db/|' -e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' -e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_master.conf
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|port : 9221|port : 9231|' -e 's|log-path : ./log/|log-path : ./slave_data/log/|' -e 's|db-path : ./db/|db-path : ./slave_data/db/|' -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_slave.conf
# Start three nodes
./pika -c ./pika_single.conf
./pika -c ./pika_master.conf
./pika -c ./pika_slave.conf
#ensure both master and slave are ready
Expand Down
97 changes: 48 additions & 49 deletions tests/integration/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
package pika_integration

import (
"sync"
"context"
"sync/atomic"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"

. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
Expand Down Expand Up @@ -120,7 +120,7 @@ func parseStreamEntryID(id string) (ts int64, seqNum int64) {
var _ = Describe("Stream Commands", func() {
ctx := context.TODO()
var client *redis.Client
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
client.FlushDB(ctx)

BeforeEach(func() {
Expand All @@ -140,20 +140,20 @@ var _ = Describe("Stream Commands", func() {
const numWriters = 10
const numReaders = 10
const messagesPerWriter = 20

createClient := func() *redis.Client {
return redis.NewClient(pikaOptions1())
return redis.NewClient(PikaOption(SINGLEADDR))
}

var messageCount int32

// Start writer goroutines
for i := 0; i < numWriters; i++ {
go func(writerIndex int) {
defer GinkgoRecover()
writerClient := createClient()
defer writerClient.Close()

for j := 0; j < messagesPerWriter; j++ {
_, err := writerClient.XAdd(ctx, &redis.XAddArgs{
Stream: streamKey,
Expand All @@ -164,41 +164,42 @@ var _ = Describe("Stream Commands", func() {
}
}(i)
}

// Start reader goroutines
var wg sync.WaitGroup
for i := 0; i < numReaders; i++ {
wg.Add(1)
go func() {
defer GinkgoRecover()
defer wg.Done()
readerClient := createClient()
defer readerClient.Close()

lastID := "0"
readMessages := 0
for readMessages < totalMessages {
items, err := readerClient.XRead(ctx, &redis.XReadArgs{
Streams: []string{streamKey, lastID},
Block: 0,
}).Result()
if (err != nil) {
continue
}

// Check if items slice is not empty
if len(items) > 0 && len(items[0].Messages) > 0 {
lastMessageIndex := len(items[0].Messages) - 1
lastID = items[0].Messages[lastMessageIndex].ID
readMessages += len(items[0].Messages)
}
// Optionally add a short delay here if needed
}
Expect(readMessages).To(BeNumerically(">=", totalMessages))
wg.Add(1)
go func() {
readerClient := createClient()
defer func() {
GinkgoRecover()
wg.Done()
readerClient.Close()
}()

lastID := "0"
readMessages := 0
for readMessages < totalMessages {
items, err := readerClient.XRead(ctx, &redis.XReadArgs{
Streams: []string{streamKey, lastID},
Block: 0,
}).Result()
if err != nil {
continue
}

// Check if items slice is not empty
if len(items) > 0 && len(items[0].Messages) > 0 {
lastMessageIndex := len(items[0].Messages) - 1
lastID = items[0].Messages[lastMessageIndex].ID
readMessages += len(items[0].Messages)
}
// Optionally add a short delay here if needed
}
Expect(readMessages).To(BeNumerically(">=", totalMessages))
}()
}


wg.Wait()
Eventually(func() int32 {
return atomic.LoadInt32(&messageCount)
Expand All @@ -209,29 +210,27 @@ var _ = Describe("Stream Commands", func() {
Expect(client.Del(ctx, "mystream").Err()).NotTo(HaveOccurred())
// Creating a stream and adding entries
_, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "mystream",
ID: "*",
Values: map[string]interface{}{"key1": "value1", "key2": "value2"},
Stream: "mystream",
ID: "*",
Values: map[string]interface{}{"key1": "value1", "key2": "value2"},
}).Result()
Expect(err).NotTo(HaveOccurred())

// Using keys * to find all keys including the stream
keys, err := client.Keys(ctx, "*").Result()
Expect(err).NotTo(HaveOccurred())

// Checking if the stream 'mystream' exists in the returned keys
found := false
for _, key := range keys {
if key == "mystream" {
found = true
break
}
if key == "mystream" {
found = true
break
}
}
Expect(found).To(BeTrue(), "Stream 'mystream' should exist in keys")
})




It("XADD wrong number of args", func() {
_, err := client.Do(ctx, "XADD", "mystream").Result()
Expect(err).To(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var _ = Describe("String Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ var _ = Describe("Text Txn", func() {
var cmdCost time.Duration

BeforeEach(func() {
txnClient = redis.NewClient(pikaOptions1())
cmdClient = redis.NewClient(pikaOptions1())
txnClient = redis.NewClient(PikaOption(SINGLEADDR))
cmdClient = redis.NewClient(PikaOption(SINGLEADDR))
})
Describe("test watch", func() {
It("basic watch", func() {
Expand Down
Loading
Loading