From 6b87a589b6ab4c2c86878a482fac7667ac780b1d Mon Sep 17 00:00:00 2001 From: kosiew Date: Thu, 26 Jun 2025 10:24:56 +0800 Subject: [PATCH] fix(ring): propagate pipeline errors --- ring.go | 28 ++++++++++++++++++++++++---- ring_test.go | 17 +++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/ring.go b/ring.go index ba4f94eed..73bdf5266 100644 --- a/ring.go +++ b/ring.go @@ -13,6 +13,7 @@ import ( "github.com/cespare/xxhash/v2" "github.com/dgryski/go-rendezvous" //nolint + "github.com/redis/go-redis/v9/auth" "github.com/redis/go-redis/v9/internal" @@ -797,29 +798,48 @@ func (c *Ring) generalProcessPipeline( cmdsMap[hash] = append(cmdsMap[hash], cmd) } - var wg sync.WaitGroup + var ( + wg sync.WaitGroup + mu sync.Mutex + firstErr error + ) for hash, cmds := range cmdsMap { wg.Add(1) go func(hash string, cmds []Cmder) { defer wg.Done() - // TODO: retry? shard, err := c.sharding.GetByName(hash) if err != nil { setCmdsErr(cmds, err) + mu.Lock() + if firstErr == nil { + firstErr = err + } + mu.Unlock() return } if tx { cmds = wrapMultiExec(ctx, cmds) - _ = shard.Client.processTxPipelineHook(ctx, cmds) + err = shard.Client.processTxPipelineHook(ctx, cmds) } else { - _ = shard.Client.processPipelineHook(ctx, cmds) + err = shard.Client.processPipelineHook(ctx, cmds) + } + if err != nil { + setCmdsErr(cmds, err) + mu.Lock() + if firstErr == nil { + firstErr = err + } + mu.Unlock() } }(hash, cmds) } wg.Wait() + if firstErr != nil { + return firstErr + } return cmdsFirstErr(cmds) } diff --git a/ring_test.go b/ring_test.go index 5fd7d9823..49d93945c 100644 --- a/ring_test.go +++ b/ring_test.go @@ -867,3 +867,20 @@ var _ = Describe("Ring GetShardClients and GetShardClientForKey", func() { Expect(len(shardMap)).To(BeNumerically("<=", 2)) // At most 2 shards (our setup) }) }) + +var _ = Describe("unreachable ring shard", func() { + It("pipeline returns dial error", func() { + ring := redis.NewRing(&redis.RingOptions{ + Addrs: map[string]string{"shard1": "10.255.255.1:6379"}, + DialTimeout: 100 * time.Millisecond, + HeartbeatFrequency: time.Hour, + }) + defer ring.Close() + + _, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error { + pipe.Ping(ctx) + return nil + }) + Expect(err).To(HaveOccurred()) + }) +})