diff --git a/command.go b/command.go index 652e241be..b79338cb9 100644 --- a/command.go +++ b/command.go @@ -17,6 +17,55 @@ import ( "github.com/redis/go-redis/v9/internal/util" ) +// keylessCommands contains Redis commands that have empty key specifications (9th slot empty) +// Only includes core Redis commands, excludes FT.*, ts.*, timeseries.*, search.* and subcommands +var keylessCommands = map[string]struct{}{ + "acl": {}, + "asking": {}, + "auth": {}, + "bgrewriteaof": {}, + "bgsave": {}, + "client": {}, + "cluster": {}, + "config": {}, + "debug": {}, + "discard": {}, + "echo": {}, + "exec": {}, + "failover": {}, + "function": {}, + "hello": {}, + "latency": {}, + "lolwut": {}, + "module": {}, + "monitor": {}, + "multi": {}, + "pfselftest": {}, + "ping": {}, + "psubscribe": {}, + "psync": {}, + "publish": {}, + "pubsub": {}, + "punsubscribe": {}, + "quit": {}, + "readonly": {}, + "readwrite": {}, + "replconf": {}, + "replicaof": {}, + "role": {}, + "save": {}, + "script": {}, + "select": {}, + "shutdown": {}, + "slaveof": {}, + "slowlog": {}, + "subscribe": {}, + "swapdb": {}, + "sync": {}, + "unsubscribe": {}, + "unwatch": {}, +} + type Cmder interface { // command name. // e.g. "set k v ex 10" -> "set", "cluster info" -> "cluster". @@ -75,12 +124,22 @@ func writeCmd(wr *proto.Writer, cmd Cmder) error { return wr.WriteArgs(cmd.Args()) } +// cmdFirstKeyPos returns the position of the first key in the command's arguments. +// If the command does not have a key, it returns 0. +// TODO: Use the data in CommandInfo to determine the first key position. func cmdFirstKeyPos(cmd Cmder) int { if pos := cmd.firstKeyPos(); pos != 0 { return int(pos) } - switch cmd.Name() { + name := cmd.Name() + + // first check if the command is keyless + if _, ok := keylessCommands[name]; ok { + return 0 + } + + switch name { case "eval", "evalsha", "eval_ro", "evalsha_ro": if cmd.stringArg(2) != "0" { return 3 diff --git a/internal_test.go b/internal_test.go index 8f1f1f312..4a655cff0 100644 --- a/internal_test.go +++ b/internal_test.go @@ -364,15 +364,22 @@ var _ = Describe("ClusterClient", func() { It("select slot from args for GETKEYSINSLOT command", func() { cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200) - slot := client.cmdSlot(cmd) + slot := client.cmdSlot(cmd, -1) Expect(slot).To(Equal(100)) }) It("select slot from args for COUNTKEYSINSLOT command", func() { cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100) - slot := client.cmdSlot(cmd) + slot := client.cmdSlot(cmd, -1) Expect(slot).To(Equal(100)) }) + + It("follows preferred random slot", func() { + cmd := NewStatusCmd(ctx, "ping") + + slot := client.cmdSlot(cmd, 101) + Expect(slot).To(Equal(101)) + }) }) }) diff --git a/osscluster.go b/osscluster.go index 55017d8ba..0526022ba 100644 --- a/osscluster.go +++ b/osscluster.go @@ -998,7 +998,7 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error { } func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, -1) var node *clusterNode var moved bool var ask bool @@ -1344,9 +1344,13 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd return err } + preferredRandomSlot := -1 if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) { for _, cmd := range cmds { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } node, err := c.slotReadOnlyNode(state, slot) if err != nil { return err @@ -1357,7 +1361,10 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd } for _, cmd := range cmds { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } node, err := state.slotMasterNode(slot) if err != nil { return err @@ -1519,58 +1526,78 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err return err } - cmdsMap := c.mapCmdsBySlot(cmds) - // TxPipeline does not support cross slot transaction. - if len(cmdsMap) > 1 { + keyedCmdsBySlot := c.slottedKeyedCommands(cmds) + slot := -1 + switch len(keyedCmdsBySlot) { + case 0: + slot = hashtag.RandomSlot() + case 1: + for sl := range keyedCmdsBySlot { + slot = sl + break + } + default: + // TxPipeline does not support cross slot transaction. setCmdsErr(cmds, ErrCrossSlot) return ErrCrossSlot } - for slot, cmds := range cmdsMap { - node, err := state.slotMasterNode(slot) - if err != nil { - setCmdsErr(cmds, err) - continue - } + node, err := state.slotMasterNode(slot) + if err != nil { + setCmdsErr(cmds, err) + return err + } - cmdsMap := map[*clusterNode][]Cmder{node: cmds} - for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { - if attempt > 0 { - if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { - setCmdsErr(cmds, err) - return err - } + cmdsMap := map[*clusterNode][]Cmder{node: cmds} + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + setCmdsErr(cmds, err) + return err } + } - failedCmds := newCmdsMap() - var wg sync.WaitGroup + failedCmds := newCmdsMap() + var wg sync.WaitGroup - for node, cmds := range cmdsMap { - wg.Add(1) - go func(node *clusterNode, cmds []Cmder) { - defer wg.Done() - c.processTxPipelineNode(ctx, node, cmds, failedCmds) - }(node, cmds) - } + for node, cmds := range cmdsMap { + wg.Add(1) + go func(node *clusterNode, cmds []Cmder) { + defer wg.Done() + c.processTxPipelineNode(ctx, node, cmds, failedCmds) + }(node, cmds) + } - wg.Wait() - if len(failedCmds.m) == 0 { - break - } - cmdsMap = failedCmds.m + wg.Wait() + if len(failedCmds.m) == 0 { + break } + cmdsMap = failedCmds.m } return cmdsFirstErr(cmds) } -func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { - cmdsMap := make(map[int][]Cmder) +// slottedKeyedCommands returns a map of slot to commands taking into account +// only commands that have keys. +func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder { + cmdsSlots := map[int][]Cmder{} + + preferredRandomSlot := -1 for _, cmd := range cmds { - slot := c.cmdSlot(cmd) - cmdsMap[slot] = append(cmdsMap[slot], cmd) + if cmdFirstKeyPos(cmd) == 0 { + continue + } + + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } + + cmdsSlots[slot] = append(cmdsSlots[slot], cmd) } - return cmdsMap + + return cmdsSlots } func (c *ClusterClient) processTxPipelineNode( @@ -1885,17 +1912,20 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo { return info } -func (c *ClusterClient) cmdSlot(cmd Cmder) int { +func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int { args := cmd.Args() if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") { return args[2].(int) } - return cmdSlot(cmd, cmdFirstKeyPos(cmd)) + return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot) } -func cmdSlot(cmd Cmder, pos int) int { +func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int { if pos == 0 { + if preferredRandomSlot != -1 { + return preferredRandomSlot + } return hashtag.RandomSlot() } firstKey := cmd.stringArg(pos) diff --git a/osscluster_test.go b/osscluster_test.go index 10023218d..2c7f40a5f 100644 --- a/osscluster_test.go +++ b/osscluster_test.go @@ -603,6 +603,15 @@ var _ = Describe("ClusterClient", func() { Expect(err).To(MatchError(redis.ErrCrossSlot)) }) + It("works normally with keyless commands and no CrossSlot error", func() { + pipe.Set(ctx, "A{s}", "A_value", 0) + pipe.Ping(ctx) + pipe.Set(ctx, "B{s}", "B_value", 0) + pipe.Ping(ctx) + _, err := pipe.Exec(ctx) + Expect(err).To(Not(HaveOccurred())) + }) + // doesn't fail when no commands are queued It("returns no error when there are no commands", func() { _, err := pipe.Exec(ctx) diff --git a/ring_test.go b/ring_test.go index ef95e9805..5fd7d9823 100644 --- a/ring_test.go +++ b/ring_test.go @@ -304,7 +304,7 @@ var _ = Describe("Redis Ring", func() { ring = redis.NewRing(opt) }) It("supports Process hook", func() { - err := ring.Ping(ctx).Err() + err := ring.Set(ctx, "key", "test", 0).Err() Expect(err).NotTo(HaveOccurred()) var stack []string @@ -312,12 +312,12 @@ var _ = Describe("Redis Ring", func() { ring.AddHook(&hook{ processHook: func(hook redis.ProcessHook) redis.ProcessHook { return func(ctx context.Context, cmd redis.Cmder) error { - Expect(cmd.String()).To(Equal("ping: ")) + Expect(cmd.String()).To(Equal("get key: ")) stack = append(stack, "ring.BeforeProcess") err := hook(ctx, cmd) - Expect(cmd.String()).To(Equal("ping: PONG")) + Expect(cmd.String()).To(Equal("get key: test")) stack = append(stack, "ring.AfterProcess") return err @@ -329,12 +329,12 @@ var _ = Describe("Redis Ring", func() { shard.AddHook(&hook{ processHook: func(hook redis.ProcessHook) redis.ProcessHook { return func(ctx context.Context, cmd redis.Cmder) error { - Expect(cmd.String()).To(Equal("ping: ")) + Expect(cmd.String()).To(Equal("get key: ")) stack = append(stack, "shard.BeforeProcess") err := hook(ctx, cmd) - Expect(cmd.String()).To(Equal("ping: PONG")) + Expect(cmd.String()).To(Equal("get key: test")) stack = append(stack, "shard.AfterProcess") return err @@ -344,7 +344,7 @@ var _ = Describe("Redis Ring", func() { return nil }) - err = ring.Ping(ctx).Err() + err = ring.Get(ctx, "key").Err() Expect(err).NotTo(HaveOccurred()) Expect(stack).To(Equal([]string{ "ring.BeforeProcess",