From 687b6edd6c09cefabb4463a6411eacfdc52ced95 Mon Sep 17 00:00:00 2001 From: parvez Date: Tue, 11 May 2021 14:49:14 +0530 Subject: [PATCH 1/7] Added missing idle args in XPendingExtArgs Made changes to XPendingExt method for allowing the usage of idle argument if defined in XPendingExtArgs --- commands.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/commands.go b/commands.go index 9e5728325..e7e7a415d 100644 --- a/commands.go +++ b/commands.go @@ -1815,6 +1815,7 @@ type XPendingExtArgs struct { End string Count int64 Consumer string + Idle int64 } func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd { @@ -1823,6 +1824,9 @@ func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingE if a.Consumer != "" { args = append(args, a.Consumer) } + if a.Idle != 0 { + args = append(args, "idle", a.Idle) + } cmd := NewXPendingExtCmd(ctx, args...) _ = c(ctx, cmd) return cmd From 81591a29ee0c51eeafe4f228ade05953f33accc4 Mon Sep 17 00:00:00 2001 From: parvez Date: Tue, 11 May 2021 16:29:47 +0530 Subject: [PATCH 2/7] Fixed args order for XPending idle time --- commands.go | 11 ++++++----- commands_test.go | 1 + 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/commands.go b/commands.go index e7e7a415d..7abc68900 100644 --- a/commands.go +++ b/commands.go @@ -1811,22 +1811,23 @@ func (c cmdable) XPending(ctx context.Context, stream, group string) *XPendingCm type XPendingExtArgs struct { Stream string Group string + Idle time.Duration Start string End string Count int64 Consumer string - Idle int64 } func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd { args := make([]interface{}, 0, 7) - args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count) - if a.Consumer != "" { - args = append(args, a.Consumer) - } + args = append(args, "xpending", a.Stream, a.Group) if a.Idle != 0 { args = append(args, "idle", a.Idle) } + args = append(args, a.Start, a.End, a.Count) + if a.Consumer != "" { + args = append(args, a.Consumer) + } cmd := NewXPendingExtCmd(ctx, args...) _ = c(ctx, cmd) return cmd diff --git a/commands_test.go b/commands_test.go index f64dc97b6..74d9b477d 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4229,6 +4229,7 @@ var _ = Describe("Commands", func() { infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{ Stream: "stream", Group: "group", + Idle: time.Duration(0), Start: "-", End: "+", Count: 10, From 3574fe6a9780e48e44dd0db3441ab8a5fe50cb10 Mon Sep 17 00:00:00 2001 From: parvez Date: Tue, 11 May 2021 23:20:20 +0530 Subject: [PATCH 3/7] Converting idle duration to milli second before passing as an argument --- commands.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands.go b/commands.go index 7abc68900..6b5cf280e 100644 --- a/commands.go +++ b/commands.go @@ -1822,7 +1822,7 @@ func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingE args := make([]interface{}, 0, 7) args = append(args, "xpending", a.Stream, a.Group) if a.Idle != 0 { - args = append(args, "idle", a.Idle) + args = append(args, "idle", a.Idle * time.Millisecond) } args = append(args, a.Start, a.End, a.Count) if a.Consumer != "" { From 70f407854bdeec884407d71f5d0cf5a188a2c4df Mon Sep 17 00:00:00 2001 From: parvez Date: Wed, 12 May 2021 19:24:11 +0530 Subject: [PATCH 4/7] Added separate test case for XPending with idle time --- commands.go | 2 +- commands_test.go | 28 ++++++++++++++++++++++++---- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/commands.go b/commands.go index 6b5cf280e..e2b898b1a 100644 --- a/commands.go +++ b/commands.go @@ -1822,7 +1822,7 @@ func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingE args := make([]interface{}, 0, 7) args = append(args, "xpending", a.Stream, a.Group) if a.Idle != 0 { - args = append(args, "idle", a.Idle * time.Millisecond) + args = append(args, "idle", formatMs(ctx, a.Idle)) } args = append(args, a.Start, a.End, a.Count) if a.Consumer != "" { diff --git a/commands_test.go b/commands_test.go index 74d9b477d..b614e0309 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4225,18 +4225,38 @@ var _ = Describe("Commands", func() { Higher: "3-0", Consumers: map[string]int64{"consumer": 3}, })) - - infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{ + args := &redis.XPendingExtArgs{ Stream: "stream", Group: "group", - Idle: time.Duration(0), Start: "-", End: "+", Count: 10, Consumer: "consumer", - }).Result() + } + infoExt, err := client.XPendingExt(ctx, args).Result() + Expect(err).NotTo(HaveOccurred()) + for i := range infoExt { + infoExt[i].Idle = 0 + } + Expect(infoExt).To(Equal([]redis.XPendingExt{ + {ID: "1-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, + {ID: "2-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, + {ID: "3-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, + })) + + // verify empty message array with idle time + args.Idle = 500 * time.Millisecond + infoExt, err = client.XPendingExt(ctx, args).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(infoExt).To(Equal([]redis.XPendingExt{})) + + // sleeping for 2 millisecond to test for idle time + time.Sleep(5 * time.Millisecond) + args.Idle = 5 * time.Millisecond + infoExt, err = client.XPendingExt(ctx, args).Result() Expect(err).NotTo(HaveOccurred()) for i := range infoExt { + Expect(infoExt[i].Idle).Should(BeNumerically(">", 5*time.Millisecond)) infoExt[i].Idle = 0 } Expect(infoExt).To(Equal([]redis.XPendingExt{ From e5009afe00de8ae304c14fd81c77957796c30d52 Mon Sep 17 00:00:00 2001 From: parvez Date: Tue, 11 May 2021 14:49:14 +0530 Subject: [PATCH 5/7] Added missing idle args in XPendingExtArgs Made changes to XPendingExt method for allowing the usage of idle argument if defined in XPendingExtArgs Fixed args order for XPending idle time Converting idle duration to milli second before passing as an argument Added separate test case for XPending with idle time Fixed comment message Added missing Idle arg in XPendingExtArgs Changed XPendingExt method to use Idle time Added test case for XPendingExt method --- commands.go | 7 ++++++- commands_test.go | 27 ++++++++++++++++++++++++--- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/commands.go b/commands.go index 9e5728325..e2b898b1a 100644 --- a/commands.go +++ b/commands.go @@ -1811,6 +1811,7 @@ func (c cmdable) XPending(ctx context.Context, stream, group string) *XPendingCm type XPendingExtArgs struct { Stream string Group string + Idle time.Duration Start string End string Count int64 @@ -1819,7 +1820,11 @@ type XPendingExtArgs struct { func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd { args := make([]interface{}, 0, 7) - args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count) + args = append(args, "xpending", a.Stream, a.Group) + if a.Idle != 0 { + args = append(args, "idle", formatMs(ctx, a.Idle)) + } + args = append(args, a.Start, a.End, a.Count) if a.Consumer != "" { args = append(args, a.Consumer) } diff --git a/commands_test.go b/commands_test.go index f64dc97b6..52b08c518 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4225,17 +4225,38 @@ var _ = Describe("Commands", func() { Higher: "3-0", Consumers: map[string]int64{"consumer": 3}, })) - - infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{ + args := &redis.XPendingExtArgs{ Stream: "stream", Group: "group", Start: "-", End: "+", Count: 10, Consumer: "consumer", - }).Result() + } + infoExt, err := client.XPendingExt(ctx, args).Result() + Expect(err).NotTo(HaveOccurred()) + for i := range infoExt { + infoExt[i].Idle = 0 + } + Expect(infoExt).To(Equal([]redis.XPendingExt{ + {ID: "1-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, + {ID: "2-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, + {ID: "3-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, + })) + + // verify empty message array with idle time + args.Idle = 500 * time.Millisecond + infoExt, err = client.XPendingExt(ctx, args).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(infoExt).To(Equal([]redis.XPendingExt{})) + + // sleeping for 5 millisecond to ensure test for idle time + time.Sleep(5 * time.Millisecond) + args.Idle = 5 * time.Millisecond + infoExt, err = client.XPendingExt(ctx, args).Result() Expect(err).NotTo(HaveOccurred()) for i := range infoExt { + Expect(infoExt[i].Idle).Should(BeNumerically(">", 5*time.Millisecond)) infoExt[i].Idle = 0 } Expect(infoExt).To(Equal([]redis.XPendingExt{ From 88dbad3186c1ecb1340c8f7ec4449c8d801ae5e5 Mon Sep 17 00:00:00 2001 From: monkey Date: Thu, 13 May 2021 22:48:40 +0800 Subject: [PATCH 6/7] fix XPendingExtArgs Signed-off-by: monkey --- commands_test.go | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/commands_test.go b/commands_test.go index 52b08c518..47233cbfb 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4244,26 +4244,10 @@ var _ = Describe("Commands", func() { {ID: "3-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, })) - // verify empty message array with idle time - args.Idle = 500 * time.Millisecond + args.Idle = 72 * time.Hour infoExt, err = client.XPendingExt(ctx, args).Result() Expect(err).NotTo(HaveOccurred()) - Expect(infoExt).To(Equal([]redis.XPendingExt{})) - - // sleeping for 5 millisecond to ensure test for idle time - time.Sleep(5 * time.Millisecond) - args.Idle = 5 * time.Millisecond - infoExt, err = client.XPendingExt(ctx, args).Result() - Expect(err).NotTo(HaveOccurred()) - for i := range infoExt { - Expect(infoExt[i].Idle).Should(BeNumerically(">", 5*time.Millisecond)) - infoExt[i].Idle = 0 - } - Expect(infoExt).To(Equal([]redis.XPendingExt{ - {ID: "1-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, - {ID: "2-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, - {ID: "3-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, - })) + Expect(infoExt).To(HaveLen(0)) n, err := client.XGroupDelConsumer(ctx, "stream", "group", "consumer").Result() Expect(err).NotTo(HaveOccurred()) From 868494afaf0cd9b933c31e758f10f1fc462cc784 Mon Sep 17 00:00:00 2001 From: monkey Date: Thu, 13 May 2021 23:03:18 +0800 Subject: [PATCH 7/7] fix XPendingExt Signed-off-by: monkey --- commands.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands.go b/commands.go index e2b898b1a..3fa8b758c 100644 --- a/commands.go +++ b/commands.go @@ -1819,7 +1819,7 @@ type XPendingExtArgs struct { } func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd { - args := make([]interface{}, 0, 7) + args := make([]interface{}, 0, 9) args = append(args, "xpending", a.Stream, a.Group) if a.Idle != 0 { args = append(args, "idle", formatMs(ctx, a.Idle))