From e329b9ef6c2f768f67b889f3e45eb20a032d4d4c Mon Sep 17 00:00:00 2001 From: Hristo Temelski Date: Tue, 9 Sep 2025 17:33:30 +0300 Subject: [PATCH 1/3] Added batch process method to the pipeline --- pipeline.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pipeline.go b/pipeline.go index dbbced506..567bf121a 100644 --- a/pipeline.go +++ b/pipeline.go @@ -30,9 +30,12 @@ type Pipeliner interface { // If a certain Redis command is not yet supported, you can use Do to execute it. Do(ctx context.Context, args ...interface{}) *Cmd - // Process puts the commands to be executed into the pipeline buffer. + // Process queues the cmd for later execution. Process(ctx context.Context, cmd Cmder) error + // BatchProcess adds multiple commands to be executed into the pipeline buffer. + BatchProcess(ctx context.Context, cmd ...Cmder) error + // Discard discards all commands in the pipeline buffer that have not yet been executed. Discard() @@ -79,7 +82,12 @@ func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd { // Process queues the cmd for later execution. func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error { - c.cmds = append(c.cmds, cmd) + return c.BatchProcess(ctx, cmd) +} + +// BatchProcess queues multiple cmds for later execution. +func (c *Pipeline) BatchProcess(ctx context.Context, cmd ...Cmder) error { + c.cmds = append(c.cmds, cmd...) return nil } From a4748fe56b8df27c020b1d82224cbf75669307ee Mon Sep 17 00:00:00 2001 From: Hristo Temelski Date: Tue, 9 Sep 2025 17:55:36 +0300 Subject: [PATCH 2/3] Added Process and BatchProcess tests --- pipeline_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pipeline_test.go b/pipeline_test.go index d32ab35b1..e5eee58c7 100644 --- a/pipeline_test.go +++ b/pipeline_test.go @@ -114,6 +114,25 @@ var _ = Describe("pipelining", func() { err := pipe.Do(ctx).Err() Expect(err).To(Equal(errors.New("redis: please enter the command to be executed"))) }) + + It("should process", func() { + err := pipe.Process(ctx, redis.NewCmd(ctx, "asking")) + Expect(err).To(Equal(nil)) + Expect(pipe.Cmds()).To(HaveLen(1)) + }) + + It("should batchProcess", func() { + err := pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking")) + Expect(err).To(Equal(nil)) + Expect(pipe.Cmds()).To(HaveLen(1)) + + pipe.Discard() + Expect(pipe.Cmds()).To(HaveLen(0)) + + err = pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking"), redis.NewCmd(ctx, "set", "key", "value")) + Expect(err).To(Equal(nil)) + Expect(pipe.Cmds()).To(HaveLen(2)) + }) } Describe("Pipeline", func() { From 1e14220cbd587b95a99abde94d631d23c9ed6fd8 Mon Sep 17 00:00:00 2001 From: Hristo Temelski Date: Tue, 9 Sep 2025 18:01:10 +0300 Subject: [PATCH 3/3] Fix test matching --- pipeline_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipeline_test.go b/pipeline_test.go index e5eee58c7..15eacb3db 100644 --- a/pipeline_test.go +++ b/pipeline_test.go @@ -117,20 +117,20 @@ var _ = Describe("pipelining", func() { It("should process", func() { err := pipe.Process(ctx, redis.NewCmd(ctx, "asking")) - Expect(err).To(Equal(nil)) + Expect(err).To(BeNil()) Expect(pipe.Cmds()).To(HaveLen(1)) }) It("should batchProcess", func() { err := pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking")) - Expect(err).To(Equal(nil)) + Expect(err).To(BeNil()) Expect(pipe.Cmds()).To(HaveLen(1)) pipe.Discard() Expect(pipe.Cmds()).To(HaveLen(0)) err = pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking"), redis.NewCmd(ctx, "set", "key", "value")) - Expect(err).To(Equal(nil)) + Expect(err).To(BeNil()) Expect(pipe.Cmds()).To(HaveLen(2)) }) }