From 2a3de7e1fcc94fe6681dd087ecf3063e156adf7d Mon Sep 17 00:00:00 2001 From: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com> Date: Thu, 20 Jun 2024 00:08:35 +0300 Subject: [PATCH 1/6] Change redis version from 7.2 to 7.4 in makefile (#3034) * Change redis version from 7.2 to 7.4 * fix jsonGet test * Add 'watch' to client info * Remove jsonGet from Enterprise tests --- Makefile | 2 +- command.go | 3 +++ json_test.go | 6 +++--- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index ea5321f29..00cf1de58 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ build: testdata/redis: mkdir -p $@ - wget -qO- https://download.redis.io/releases/redis-7.2.1.tar.gz | tar xvz --strip-components=1 -C $@ + wget -qO- https://download.redis.io/releases/redis-7.4-rc1.tar.gz | tar xvz --strip-components=1 -C $@ testdata/redis/src/redis-server: testdata/redis cd $< && make all diff --git a/command.go b/command.go index c6cd9db6d..6bce5a344 100644 --- a/command.go +++ b/command.go @@ -4997,6 +4997,7 @@ type ClientInfo struct { PSub int // number of pattern matching subscriptions SSub int // redis version 7.0.3, number of shard channel subscriptions Multi int // number of commands in a MULTI/EXEC context + Watch int // redis version 7.4 RC1, number of keys this client is currently watching. QueryBuf int // qbuf, query buffer length (0 means no query pending) QueryBufFree int // qbuf-free, free space of the query buffer (0 means the buffer is full) ArgvMem int // incomplete arguments for the next command (already extracted from query buffer) @@ -5149,6 +5150,8 @@ func parseClientInfo(txt string) (info *ClientInfo, err error) { info.SSub, err = strconv.Atoi(val) case "multi": info.Multi, err = strconv.Atoi(val) + case "watch": + info.Watch, err = strconv.Atoi(val) case "qbuf": info.QueryBuf, err = strconv.Atoi(val) case "qbuf-free": diff --git a/json_test.go b/json_test.go index 4e9718a4e..d1ea24290 100644 --- a/json_test.go +++ b/json_test.go @@ -242,18 +242,18 @@ var _ = Describe("JSON Commands", Label("json"), func() { Expect(cmd.Val()).To(Equal("OK")) }) - It("should JSONGet", Label("json.get", "json"), func() { + It("should JSONGet", Label("json.get", "json", "NonRedisEnterprise"), func() { res, err := client.JSONSet(ctx, "get3", "$", `{"a": 1, "b": 2}`).Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal("OK")) res, err = client.JSONGetWithArgs(ctx, "get3", &redis.JSONGetArgs{Indent: "-"}).Result() Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(`[-{--"a":1,--"b":2-}]`)) + Expect(res).To(Equal(`{-"a":1,-"b":2}`)) res, err = client.JSONGetWithArgs(ctx, "get3", &redis.JSONGetArgs{Indent: "-", Newline: `~`, Space: `!`}).Result() Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(`[~-{~--"a":!1,~--"b":!2~-}~]`)) + Expect(res).To(Equal(`{~-"a":!1,~-"b":!2~}`)) }) It("should JSONMerge", Label("json.merge", "json"), func() { From a9b4c5cfe638df9fba2ca9cf00252f74e0f1f6dd Mon Sep 17 00:00:00 2001 From: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com> Date: Thu, 20 Jun 2024 00:59:27 +0300 Subject: [PATCH 2/6] Support Hash-field expiration commands (#2991) * Add HExpire command * Add HPExpire, HexpireAt, HPExpireAt, HTTL, HPTTL, HPersist,HExpireTime, HPExpireTIme, HGetF, HSetF commands * add docstring * add tests and fix commands * modify commands * api changes * fix tests * remove tests from RE --- commands_test.go | 148 ++++++++++++++++++++++++++++ hash_commands.go | 250 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 397 insertions(+), 1 deletion(-) diff --git a/commands_test.go b/commands_test.go index d30a9d8bb..b0224449b 100644 --- a/commands_test.go +++ b/commands_test.go @@ -2430,6 +2430,154 @@ var _ = Describe("Commands", func() { Equal([]redis.KeyValue{{Key: "key2", Value: "hello2"}}), )) }) + + It("should HExpire", Label("hash-expiration", "NonRedisEnterprise"), func() { + res, err := client.HExpire(ctx, "no_such_key", 10, "field1", "field2", "field3").Result() + Expect(err).To(BeNil()) + for i := 0; i < 100; i++ { + sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") + Expect(sadd.Err()).NotTo(HaveOccurred()) + } + + res, err = client.HExpire(ctx, "myhash", 10, "key1", "key2", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{1, 1, -2})) + }) + + It("should HPExpire", Label("hash-expiration", "NonRedisEnterprise"), func() { + _, err := client.HPExpire(ctx, "no_such_key", 10, "field1", "field2", "field3").Result() + Expect(err).To(BeNil()) + for i := 0; i < 100; i++ { + sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") + Expect(sadd.Err()).NotTo(HaveOccurred()) + } + + res, err := client.HPExpire(ctx, "myhash", 10, "key1", "key2", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{1, 1, -2})) + }) + + It("should HExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() { + + _, err := client.HExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result() + Expect(err).To(BeNil()) + for i := 0; i < 100; i++ { + sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") + Expect(sadd.Err()).NotTo(HaveOccurred()) + } + + res, err := client.HExpireAt(ctx, "myhash", time.Now().Add(10*time.Second), "key1", "key2", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{1, 1, -2})) + }) + + It("should HPExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() { + + _, err := client.HPExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result() + Expect(err).To(BeNil()) + for i := 0; i < 100; i++ { + sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") + Expect(sadd.Err()).NotTo(HaveOccurred()) + } + + res, err := client.HPExpireAt(ctx, "myhash", time.Now().Add(10*time.Second), "key1", "key2", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{1, 1, -2})) + }) + + It("should HPersist", Label("hash-expiration", "NonRedisEnterprise"), func() { + + _, err := client.HPersist(ctx, "no_such_key", "field1", "field2", "field3").Result() + Expect(err).To(BeNil()) + for i := 0; i < 100; i++ { + sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") + Expect(sadd.Err()).NotTo(HaveOccurred()) + } + + res, err := client.HPersist(ctx, "myhash", "key1", "key2", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{-1, -1, -2})) + + res, err = client.HExpire(ctx, "myhash", 10, "key1", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{1, -2})) + + res, err = client.HPersist(ctx, "myhash", "key1", "key2", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{1, -1, -2})) + }) + + It("should HExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() { + + _, err := client.HExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result() + Expect(err).To(BeNil()) + for i := 0; i < 100; i++ { + sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") + Expect(sadd.Err()).NotTo(HaveOccurred()) + } + + res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{1, -2})) + + res, err = client.HExpireTime(ctx, "myhash", "key1", "key2", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res[0]).To(BeNumerically("~", time.Now().Add(10*time.Second).Unix(), 1)) + }) + + It("should HPExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() { + + _, err := client.HPExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result() + Expect(err).To(BeNil()) + for i := 0; i < 100; i++ { + sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") + Expect(sadd.Err()).NotTo(HaveOccurred()) + } + + res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{1, -2})) + + res, err = client.HPExpireTime(ctx, "myhash", "key1", "key2", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo([]int64{time.Now().Add(10 * time.Second).UnixMilli(), -1, -2})) + }) + + It("should HTTL", Label("hash-expiration", "NonRedisEnterprise"), func() { + + _, err := client.HTTL(ctx, "no_such_key", "field1", "field2", "field3").Result() + Expect(err).To(BeNil()) + for i := 0; i < 100; i++ { + sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") + Expect(sadd.Err()).NotTo(HaveOccurred()) + } + + res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{1, -2})) + + res, err = client.HTTL(ctx, "myhash", "key1", "key2", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{10, -1, -2})) + }) + + It("should HPTTL", Label("hash-expiration", "NonRedisEnterprise"), func() { + + _, err := client.HPTTL(ctx, "no_such_key", "field1", "field2", "field3").Result() + Expect(err).To(BeNil()) + for i := 0; i < 100; i++ { + sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") + Expect(sadd.Err()).NotTo(HaveOccurred()) + } + + res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]int64{1, -2})) + + res, err = client.HPTTL(ctx, "myhash", "key1", "key2", "key200").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res[0]).To(BeNumerically("~", 10*time.Second.Milliseconds(), 1)) + }) }) Describe("hyperloglog", func() { diff --git a/hash_commands.go b/hash_commands.go index 2c62a75ad..59fbb8656 100644 --- a/hash_commands.go +++ b/hash_commands.go @@ -1,6 +1,9 @@ package redis -import "context" +import ( + "context" + "time" +) type HashCmdable interface { HDel(ctx context.Context, key string, fields ...string) *IntCmd @@ -172,3 +175,248 @@ func (c cmdable) HScan(ctx context.Context, key string, cursor uint64, match str _ = c(ctx, cmd) return cmd } + +type HExpireArgs struct { + NX bool + XX bool + GT bool + LT bool +} + +// HExpire - Sets the expiration time for specified fields in a hash in seconds. +// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields. +// For more information - https://redis.io/commands/hexpire/ +func (c cmdable) HExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd { + args := []interface{}{"HEXPIRE", key, expiration, "FIELDS", len(fields)} + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// HExpire - Sets the expiration time for specified fields in a hash in seconds. +// It requires a key, an expiration duration, a struct with boolean flags for conditional expiration settings (NX, XX, GT, LT), and a list of fields. +// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields. +// For more information - https://redis.io/commands/hexpire/ +func (c cmdable) HExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd { + args := []interface{}{"HEXPIRE", key, expiration} + + // only if one argument is true, we can add it to the args + // if more than one argument is true, it will cause an error + if expirationArgs.NX { + args = append(args, "NX") + } else if expirationArgs.XX { + args = append(args, "XX") + } else if expirationArgs.GT { + args = append(args, "GT") + } else if expirationArgs.LT { + args = append(args, "LT") + } + + args = append(args, "FIELDS", len(fields)) + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// HPExpire - Sets the expiration time for specified fields in a hash in milliseconds. +// Similar to HExpire, it accepts a key, an expiration duration in milliseconds, a struct with expiration condition flags, and a list of fields. +// The command modifies the standard time.Duration to milliseconds for the Redis command. +// For more information - https://redis.io/commands/hpexpire/ +func (c cmdable) HPExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd { + args := []interface{}{"HPEXPIRE", key, formatMs(ctx, expiration), "FIELDS", len(fields)} + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) HPExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd { + args := []interface{}{"HPEXPIRE", key, formatMs(ctx, expiration)} + + // only if one argument is true, we can add it to the args + // if more than one argument is true, it will cause an error + if expirationArgs.NX { + args = append(args, "NX") + } else if expirationArgs.XX { + args = append(args, "XX") + } else if expirationArgs.GT { + args = append(args, "GT") + } else if expirationArgs.LT { + args = append(args, "LT") + } + + args = append(args, "FIELDS", len(fields)) + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// HExpireAt - Sets the expiration time for specified fields in a hash to a UNIX timestamp in seconds. +// Takes a key, a UNIX timestamp, a struct of conditional flags, and a list of fields. +// The command sets absolute expiration times based on the UNIX timestamp provided. +// For more information - https://redis.io/commands/hexpireat/ +func (c cmdable) HExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd { + + args := []interface{}{"HEXPIREAT", key, tm.Unix(), "FIELDS", len(fields)} + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) HExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd { + args := []interface{}{"HEXPIREAT", key, tm.Unix()} + + // only if one argument is true, we can add it to the args + // if more than one argument is true, it will cause an error + if expirationArgs.NX { + args = append(args, "NX") + } else if expirationArgs.XX { + args = append(args, "XX") + } else if expirationArgs.GT { + args = append(args, "GT") + } else if expirationArgs.LT { + args = append(args, "LT") + } + + args = append(args, "FIELDS", len(fields)) + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// HPExpireAt - Sets the expiration time for specified fields in a hash to a UNIX timestamp in milliseconds. +// Similar to HExpireAt but for timestamps in milliseconds. It accepts the same parameters and adjusts the UNIX time to milliseconds. +// For more information - https://redis.io/commands/hpexpireat/ +func (c cmdable) HPExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd { + args := []interface{}{"HPEXPIREAT", key, tm.UnixNano() / int64(time.Millisecond), "FIELDS", len(fields)} + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) HPExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd { + args := []interface{}{"HPEXPIREAT", key, tm.UnixNano() / int64(time.Millisecond)} + + // only if one argument is true, we can add it to the args + // if more than one argument is true, it will cause an error + if expirationArgs.NX { + args = append(args, "NX") + } else if expirationArgs.XX { + args = append(args, "XX") + } else if expirationArgs.GT { + args = append(args, "GT") + } else if expirationArgs.LT { + args = append(args, "LT") + } + + args = append(args, "FIELDS", len(fields)) + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// HPersist - Removes the expiration time from specified fields in a hash. +// Accepts a key and the fields themselves. +// This command ensures that each field specified will have its expiration removed if present. +// For more information - https://redis.io/commands/hpersist/ +func (c cmdable) HPersist(ctx context.Context, key string, fields ...string) *IntSliceCmd { + args := []interface{}{"HPERSIST", key, "FIELDS", len(fields)} + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// HExpireTime - Retrieves the expiration time for specified fields in a hash as a UNIX timestamp in seconds. +// Requires a key and the fields themselves to fetch their expiration timestamps. +// This command returns the expiration times for each field or error/status codes for each field as specified. +// For more information - https://redis.io/commands/hexpiretime/ +func (c cmdable) HExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd { + args := []interface{}{"HEXPIRETIME", key, "FIELDS", len(fields)} + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// HPExpireTime - Retrieves the expiration time for specified fields in a hash as a UNIX timestamp in milliseconds. +// Similar to HExpireTime, adjusted for timestamps in milliseconds. It requires the same parameters. +// Provides the expiration timestamp for each field in milliseconds. +// For more information - https://redis.io/commands/hexpiretime/ +func (c cmdable) HPExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd { + args := []interface{}{"HPEXPIRETIME", key, "FIELDS", len(fields)} + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// HTTL - Retrieves the remaining time to live for specified fields in a hash in seconds. +// Requires a key and the fields themselves. It returns the TTL for each specified field. +// This command fetches the TTL in seconds for each field or returns error/status codes as appropriate. +// For more information - https://redis.io/commands/httl/ +func (c cmdable) HTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd { + args := []interface{}{"HTTL", key, "FIELDS", len(fields)} + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// HPTTL - Retrieves the remaining time to live for specified fields in a hash in milliseconds. +// Similar to HTTL, but returns the TTL in milliseconds. It requires a key and the specified fields. +// This command provides the TTL in milliseconds for each field or returns error/status codes as needed. +// For more information - https://redis.io/commands/hpttl/ +func (c cmdable) HPTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd { + args := []interface{}{"HPTTL", key, "FIELDS", len(fields)} + + for _, field := range fields { + args = append(args, field) + } + cmd := NewIntSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} From 0777247baa21ae915f02bdcd314d68c93b6b1e88 Mon Sep 17 00:00:00 2001 From: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com> Date: Thu, 20 Jun 2024 01:00:09 +0300 Subject: [PATCH 3/6] Added test case for CLIENT KILL with MAXAGE option (#2971) * Added test case for CLIENT KILL with MAXAGE option * Fixed sleep value * Added additional condition to kill specific connection * Test commit * Test commit * Updated test case to handle timeouts --------- Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com> --- commands_test.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/commands_test.go b/commands_test.go index b0224449b..5d7880434 100644 --- a/commands_test.go +++ b/commands_test.go @@ -193,6 +193,40 @@ var _ = Describe("Commands", func() { Expect(r.Val()).To(Equal(int64(0))) }) + It("should ClientKillByFilter with MAXAGE", func() { + var s []string + started := make(chan bool) + done := make(chan bool) + + go func() { + defer GinkgoRecover() + + started <- true + blpop := client.BLPop(ctx, 0, "list") + Expect(blpop.Val()).To(Equal(s)) + done <- true + }() + <-started + + select { + case <-done: + Fail("BLPOP is not blocked.") + case <-time.After(2 * time.Second): + // ok + } + + killed := client.ClientKillByFilter(ctx, "MAXAGE", "1") + Expect(killed.Err()).NotTo(HaveOccurred()) + Expect(killed.Val()).To(Equal(int64(2))) + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("BLPOP is still blocked.") + } + }) + It("should ClientID", func() { err := client.ClientID(ctx).Err() Expect(err).NotTo(HaveOccurred()) From 097cddbeb0cd4436c2fd96ec147b1656bfab89a5 Mon Sep 17 00:00:00 2001 From: Gabriel Erzse Date: Thu, 20 Jun 2024 01:24:45 +0300 Subject: [PATCH 4/6] Support NOVALUES parameter for HSCAN (#2925) * Support NOVALUES parameter for HSCAN Issue #2919 The NOVALUES parameter instructs HSCAN to only return the hash keys, without values. * Update hash_commands.go --------- Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com> --- commands_test.go | 22 ++++++++++++++++++++-- hash_commands.go | 15 +++++++++++++++ iterator_test.go | 16 ++++++++++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/commands_test.go b/commands_test.go index 5d7880434..dea3d5f53 100644 --- a/commands_test.go +++ b/commands_test.go @@ -1134,8 +1134,26 @@ var _ = Describe("Commands", func() { keys, cursor, err := client.HScan(ctx, "myhash", 0, "", 0).Result() Expect(err).NotTo(HaveOccurred()) - Expect(keys).NotTo(BeEmpty()) - Expect(cursor).NotTo(BeZero()) + // If we don't get at least two items back, it's really strange. + Expect(cursor).To(BeNumerically(">=", 2)) + Expect(len(keys)).To(BeNumerically(">=", 2)) + Expect(keys[0]).To(HavePrefix("key")) + Expect(keys[1]).To(Equal("hello")) + }) + + It("should HScan without values", func() { + for i := 0; i < 1000; i++ { + sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") + Expect(sadd.Err()).NotTo(HaveOccurred()) + } + + keys, cursor, err := client.HScanNoValues(ctx, "myhash", 0, "", 0).Result() + Expect(err).NotTo(HaveOccurred()) + // If we don't get at least two items back, it's really strange. + Expect(cursor).To(BeNumerically(">=", 2)) + Expect(len(keys)).To(BeNumerically(">=", 2)) + Expect(keys[0]).To(HavePrefix("key")) + Expect(keys[1]).To(HavePrefix("key")) }) It("should ZScan", func() { diff --git a/hash_commands.go b/hash_commands.go index 59fbb8656..ef69064e0 100644 --- a/hash_commands.go +++ b/hash_commands.go @@ -19,6 +19,7 @@ type HashCmdable interface { HMSet(ctx context.Context, key string, values ...interface{}) *BoolCmd HSetNX(ctx context.Context, key, field string, value interface{}) *BoolCmd HScan(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd + HScanNoValues(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd HVals(ctx context.Context, key string) *StringSliceCmd HRandField(ctx context.Context, key string, count int) *StringSliceCmd HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd @@ -176,6 +177,20 @@ func (c cmdable) HScan(ctx context.Context, key string, cursor uint64, match str return cmd } +func (c cmdable) HScanNoValues(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd { + args := []interface{}{"hscan", key, cursor} + if match != "" { + args = append(args, "match", match) + } + if count > 0 { + args = append(args, "count", count) + } + args = append(args, "novalues") + cmd := NewScanCmd(ctx, c, args...) + _ = c(ctx, cmd) + return cmd +} + type HExpireArgs struct { NX bool XX bool diff --git a/iterator_test.go b/iterator_test.go index ccd941478..dfa8fbbed 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -96,6 +96,22 @@ var _ = Describe("ScanIterator", func() { Expect(vals).To(HaveLen(71 * 2)) Expect(vals).To(ContainElement("K01")) Expect(vals).To(ContainElement("K71")) + Expect(vals).To(ContainElement("x")) + }) + + It("should hscan without values across multiple pages", func() { + Expect(hashSeed(71)).NotTo(HaveOccurred()) + + var vals []string + iter := client.HScanNoValues(ctx, hashKey, 0, "", 10).Iterator() + for iter.Next(ctx) { + vals = append(vals, iter.Val()) + } + Expect(iter.Err()).NotTo(HaveOccurred()) + Expect(vals).To(HaveLen(71)) + Expect(vals).To(ContainElement("K01")) + Expect(vals).To(ContainElement("K71")) + Expect(vals).NotTo(ContainElement("x")) }) It("should scan to page borders", func() { From 445d2667eb23951fe221abe0afe6a4c4683cf01e Mon Sep 17 00:00:00 2001 From: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com> Date: Thu, 20 Jun 2024 02:25:49 +0300 Subject: [PATCH 5/6] remove tests from RE (#3035) --- commands_test.go | 8 +++++--- iterator_test.go | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/commands_test.go b/commands_test.go index dea3d5f53..641757d23 100644 --- a/commands_test.go +++ b/commands_test.go @@ -193,7 +193,7 @@ var _ = Describe("Commands", func() { Expect(r.Val()).To(Equal(int64(0))) }) - It("should ClientKillByFilter with MAXAGE", func() { + It("should ClientKillByFilter with MAXAGE", Label("NonRedisEnterprise"), func() { var s []string started := make(chan bool) done := make(chan bool) @@ -217,7 +217,7 @@ var _ = Describe("Commands", func() { killed := client.ClientKillByFilter(ctx, "MAXAGE", "1") Expect(killed.Err()).NotTo(HaveOccurred()) - Expect(killed.Val()).To(Equal(int64(2))) + Expect(killed.Val()).To(SatisfyAny(Equal(int64(2)), Equal(int64(3)))) select { case <-done: @@ -1141,7 +1141,7 @@ var _ = Describe("Commands", func() { Expect(keys[1]).To(Equal("hello")) }) - It("should HScan without values", func() { + It("should HScan without values", Label("NonRedisEnterprise"), func() { for i := 0; i < 1000; i++ { sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") Expect(sadd.Err()).NotTo(HaveOccurred()) @@ -1154,6 +1154,8 @@ var _ = Describe("Commands", func() { Expect(len(keys)).To(BeNumerically(">=", 2)) Expect(keys[0]).To(HavePrefix("key")) Expect(keys[1]).To(HavePrefix("key")) + Expect(keys).NotTo(BeEmpty()) + Expect(cursor).NotTo(BeZero()) }) It("should ZScan", func() { diff --git a/iterator_test.go b/iterator_test.go index dfa8fbbed..472ce38a7 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -99,7 +99,7 @@ var _ = Describe("ScanIterator", func() { Expect(vals).To(ContainElement("x")) }) - It("should hscan without values across multiple pages", func() { + It("should hscan without values across multiple pages", Label("NonRedisEnterprise"), func() { Expect(hashSeed(71)).NotTo(HaveOccurred()) var vals []string From 3975cd5380e76e5dc83aae7e19bfeee320cb650c Mon Sep 17 00:00:00 2001 From: b1ron <80292536+b1ron@users.noreply.github.com> Date: Thu, 20 Jun 2024 10:25:51 +0200 Subject: [PATCH 6/6] Add support for XREAD last entry (#3005) * add support for XREAD last entry * handle reading from multiple streams * add test to ensure we block for empty stream * small tweak * add an option to XReadArgs instead * modify test comment * small preallocation optimization * Changed argument to generic ID, skip tests on Enterprise * Fix test case * Updated expiration command --------- Co-authored-by: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com> Co-authored-by: vladvildanov --- commands_test.go | 77 ++++++++++++++++++++++++++++++++++++++++++++-- stream_commands.go | 8 ++++- 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/commands_test.go b/commands_test.go index 641757d23..edc956943 100644 --- a/commands_test.go +++ b/commands_test.go @@ -2588,13 +2588,14 @@ var _ = Describe("Commands", func() { Expect(sadd.Err()).NotTo(HaveOccurred()) } - res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result() + expireAt := time.Now().Add(10 * time.Second) + res, err := client.HPExpireAt(ctx, "myhash", expireAt, "key1", "key200").Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal([]int64{1, -2})) res, err = client.HPExpireTime(ctx, "myhash", "key1", "key2", "key200").Result() Expect(err).NotTo(HaveOccurred()) - Expect(res).To(BeEquivalentTo([]int64{time.Now().Add(10 * time.Second).UnixMilli(), -1, -2})) + Expect(res).To(BeEquivalentTo([]int64{expireAt.UnixMilli(), -1, -2})) }) It("should HTTL", Label("hash-expiration", "NonRedisEnterprise"), func() { @@ -5888,6 +5889,78 @@ var _ = Describe("Commands", func() { Expect(err).To(Equal(redis.Nil)) }) + It("should XRead LastEntry", Label("NonRedisEnterprise"), func() { + res, err := client.XRead(ctx, &redis.XReadArgs{ + Streams: []string{"stream"}, + Count: 2, // we expect 1 message + ID: "+", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XStream{ + { + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }, + }, + })) + }) + + It("should XRead LastEntry from two streams", Label("NonRedisEnterprise"), func() { + res, err := client.XRead(ctx, &redis.XReadArgs{ + Streams: []string{"stream", "stream"}, + ID: "+", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XStream{ + { + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }, + }, + { + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }, + }, + })) + }) + + It("should XRead LastEntry blocks", Label("NonRedisEnterprise"), func() { + start := time.Now() + go func() { + defer GinkgoRecover() + + time.Sleep(100 * time.Millisecond) + id, err := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "empty", + ID: "4-0", + Values: map[string]interface{}{"quatro": "quatre"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("4-0")) + }() + + res, err := client.XRead(ctx, &redis.XReadArgs{ + Streams: []string{"empty"}, + Block: 500 * time.Millisecond, + ID: "+", + }).Result() + Expect(err).NotTo(HaveOccurred()) + // Ensure that the XRead call with LastEntry option blocked for at least 100ms. + Expect(time.Since(start)).To(BeNumerically(">=", 100*time.Millisecond)) + Expect(res).To(Equal([]redis.XStream{ + { + Stream: "empty", + Messages: []redis.XMessage{ + {ID: "4-0", Values: map[string]interface{}{"quatro": "quatre"}}, + }, + }, + })) + }) + Describe("group", func() { BeforeEach(func() { err := client.XGroupCreate(ctx, "stream", "group", "0").Err() diff --git a/stream_commands.go b/stream_commands.go index 1ad33740c..6d7b22922 100644 --- a/stream_commands.go +++ b/stream_commands.go @@ -137,10 +137,11 @@ type XReadArgs struct { Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2 Count int64 Block time.Duration + ID string } func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd { - args := make([]interface{}, 0, 6+len(a.Streams)) + args := make([]interface{}, 0, 2*len(a.Streams)+6) args = append(args, "xread") keyPos := int8(1) @@ -159,6 +160,11 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd { for _, s := range a.Streams { args = append(args, s) } + if a.ID != "" { + for range a.Streams { + args = append(args, a.ID) + } + } cmd := NewXStreamSliceCmd(ctx, args...) if a.Block >= 0 {