|
| 1 | +package rediscluster |
| 2 | + |
| 3 | +import ( |
| 4 | + "github.com/fzzbt/radix/redis" |
| 5 | + "github.com/joaojeronimo/go-crc16" |
| 6 | + "strconv" |
| 7 | + "strings" |
| 8 | +) |
| 9 | + |
| 10 | +func clusterSlot(key string) int { |
| 11 | + return (int(crc16.Crc16(key)) % 4096) |
| 12 | +} |
| 13 | + |
| 14 | +type SlotInterval struct { |
| 15 | + LowerLimit int |
| 16 | + UpperLimit int |
| 17 | +} |
| 18 | + |
| 19 | +func ParseNode(unparsedNode string) (node Node) { |
| 20 | + parts := strings.Split(unparsedNode, " ") |
| 21 | + node.Name = parts[0] |
| 22 | + node.Address = parts[1] |
| 23 | + node.Flags = parts[2] |
| 24 | + //node.LastPingSent, _ = strconv.Atoi(parts[4]) |
| 25 | + //node.LastPongReceived, _ = strconv.Atoi(parts[5]) |
| 26 | + node.State = parts[6] |
| 27 | + |
| 28 | + slots := strings.Split(parts[7], "-") |
| 29 | + var slotInterval SlotInterval |
| 30 | + slotInterval.LowerLimit, _ = strconv.Atoi(slots[0]) |
| 31 | + slotInterval.UpperLimit, _ = strconv.Atoi(slots[1]) |
| 32 | + node.Slots = slotInterval |
| 33 | + return |
| 34 | +} |
| 35 | + |
| 36 | +type Node struct { |
| 37 | + Name string |
| 38 | + Address string |
| 39 | + Client *redis.Client |
| 40 | + Flags string |
| 41 | + LastPingSent int |
| 42 | + LastPongReceived int |
| 43 | + State string |
| 44 | + Slots SlotInterval |
| 45 | +} |
| 46 | + |
| 47 | +type Cluster struct { |
| 48 | + nodes []Node |
| 49 | +} |
| 50 | + |
| 51 | +func (n *Node) Connect() { |
| 52 | + conf := redis.DefaultConfig() |
| 53 | + conf.Address = n.Address |
| 54 | + n.Client = redis.NewClient(conf) |
| 55 | +} |
| 56 | + |
| 57 | +func discoverNodes(firstLink string) (cluster Cluster) { |
| 58 | + conf := redis.DefaultConfig() |
| 59 | + conf.Address = firstLink |
| 60 | + c := redis.NewClient(conf) |
| 61 | + s, err := c.Cluster("nodes").Str() |
| 62 | + if err != nil { |
| 63 | + return |
| 64 | + } |
| 65 | + unparsedNodes := strings.Split(s, "\n") |
| 66 | + var parsedNodes []Node |
| 67 | + for i := 0; i < len(unparsedNodes)-1; i++ { // -1 because the last line is empty |
| 68 | + parsedNode := ParseNode(unparsedNodes[i]) |
| 69 | + if parsedNode.Address == ":0" { |
| 70 | + parsedNode.Address = firstLink |
| 71 | + parsedNode.Client = c |
| 72 | + } else { |
| 73 | + parsedNode.Connect() |
| 74 | + } |
| 75 | + parsedNodes = append(parsedNodes, parsedNode) |
| 76 | + } |
| 77 | + cluster.nodes = parsedNodes |
| 78 | + return |
| 79 | +} |
| 80 | + |
| 81 | +func NewCluster(firstLink string) (cc Cluster) { |
| 82 | + return discoverNodes(firstLink) |
| 83 | +} |
| 84 | + |
| 85 | +func (cc *Cluster) Call(command string, args ...interface{}) (reply *redis.Reply) { |
| 86 | + slot := clusterSlot(args[0].(string)) |
| 87 | + var slots SlotInterval |
| 88 | + for i := 0; i < len(cc.nodes); i++ { |
| 89 | + slots = cc.nodes[i].Slots |
| 90 | + if slot > slots.LowerLimit && slot < slots.UpperLimit { |
| 91 | + reply = cc.nodes[i].Client.Call(command, args...) |
| 92 | + return |
| 93 | + } |
| 94 | + } |
| 95 | + return |
| 96 | +} |
| 97 | + |
| 98 | +func (cc *Cluster) AsyncCall(command string, args ...interface{}) (future redis.Future) { |
| 99 | + slot := clusterSlot(args[0].(string)) |
| 100 | + var slots SlotInterval |
| 101 | + for i := 0; i < len(cc.nodes); i++ { |
| 102 | + slots = cc.nodes[i].Slots |
| 103 | + if slot > slots.LowerLimit && slot < slots.UpperLimit { |
| 104 | + future = cc.nodes[i].Client.AsyncCall(command, args...) |
| 105 | + return |
| 106 | + } |
| 107 | + } |
| 108 | + return |
| 109 | +} |
| 110 | + |
| 111 | +func (cc *Cluster) Close() bool { |
| 112 | + for i := 0; i < len(cc.nodes); i++ { |
| 113 | + cc.nodes[i].Client.Close() |
| 114 | + } |
| 115 | + return true |
| 116 | +} |
0 commit comments