Skip to content

Commit

Permalink
build request from byte args and encode it (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
stlava committed Apr 25, 2024
1 parent 2880a2b commit 36128af
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 14 deletions.
71 changes: 68 additions & 3 deletions redis/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,70 @@ import "fmt"

// Req - convenient wrapper to create Request.
func Req(cmd string, args ...interface{}) Request {
return Request{cmd, args}
return Request{cmd, nil, args, nil}
}

// ReqFromByteArgs creates a Request from a command, list of args in byte form, and
// uses the provided buf to store the Request.EncodedBytes
func ReqFromByteArgs(cmd string, args [][]byte, buf []byte) Request {
space := -1
for i, c := range cmd {
if c == ' ' {
space = i
break
}
}

if space == -1 {
buf = appendHead(buf, '*', len(args)+1)
buf = appendHead(buf, '$', len(cmd))
buf = append(buf, cmd...)
buf = append(buf, '\r', '\n')
} else {
buf = appendHead(buf, '*', len(args)+2)
buf = appendHead(buf, '$', space)
buf = append(buf, cmd[:space]...)
buf = append(buf, '\r', '\n')
buf = appendHead(buf, '$', len(cmd)-space-1)
buf = append(buf, cmd[space+1:]...)
buf = append(buf, '\r', '\n')
}

for _, arg := range args {
buf = appendHead(buf, '$', len(arg))
buf = append(buf, arg...)
buf = append(buf, '\r', '\n')
}

if cmd == "RANDOMKEY" {
return Request{Cmd: cmd, Args: nil, EncodedBytes: buf}
}

var n int
switch cmd {
case "FCALL", "FCALL_RO":
n = 2
case "EVAL", "EVALSHA":
n = 2
case "BITOP":
n = 1
default:
n = 0
}

return Request{Cmd: cmd, KeyBytes: args[n], Args: nil, EncodedBytes: buf}
}

// Request represents request to be passed to redis.
type Request struct {
// Cmd is a redis command to be sent.
// It could contain single space, then it will be split, and last part will be serialized as an argument.
Cmd string
Args []interface{}
Cmd string
KeyBytes []byte

// Either specify Args to encode or pre-encoded request via EncodedBytes
Args []interface{}
EncodedBytes []byte
}

func (r Request) String() string {
Expand All @@ -39,6 +94,11 @@ func (r Request) Key() (string, bool) {
if r.Cmd == "RANDOMKEY" {
return "RANDOMKEY", false
}

if r.KeyBytes != nil {
return string(r.KeyBytes), true
}

var n int
switch r.Cmd {
case "FCALL", "FCALL_RO":
Expand All @@ -63,6 +123,11 @@ func (r Request) KeyByte() ([]byte, bool) {
if r.Cmd == "RANDOMKEY" {
return rKey, false
}

if r.KeyBytes != nil {
return r.KeyBytes, true
}

var n int
switch r.Cmd {
case "FCALL", "FCALL_RO":
Expand Down
4 changes: 4 additions & 0 deletions redis/request_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
//
// Note: command could contain single space. In that case, it will be split and last part will be prepended to arguments.
func AppendRequest(buf []byte, req Request) ([]byte, error) {
if len(req.EncodedBytes) != 0 {
return append(buf, req.EncodedBytes...), nil
}

oldSize := len(buf)
space := -1
for i, c := range []byte(req.Cmd) {
Expand Down
2 changes: 1 addition & 1 deletion redis/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s ScanOpts) Request(it []byte) Request {
if s.Count > 0 {
args = append(args, "COUNT", s.Count)
}
return Request{s.Cmd, args}
return Request{s.Cmd, nil, args, nil}
}

// ScannerBase is internal "parent" object for scanner implementations
Expand Down
2 changes: 1 addition & 1 deletion redis/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Sync struct {
// Do is convenient method to construct and send request.
// Returns value that could be either result or error.
func (s Sync) Do(cmd string, args ...interface{}) interface{} {
return s.Send(Request{cmd, args})
return s.Send(Request{cmd, nil, args, nil})
}

// Send sends request to redis.
Expand Down
2 changes: 1 addition & 1 deletion redis/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type SyncCtx struct {
// Returns value that could be either result or error.
// When context is cancelled, Do returns ErrRequestCancelled error.
func (s SyncCtx) Do(ctx context.Context, cmd string, args ...interface{}) interface{} {
return s.Send(ctx, Request{cmd, args})
return s.Send(ctx, Request{cmd, nil, args, nil})
}

// Send sends request to redis.
Expand Down
4 changes: 2 additions & 2 deletions rediscluster/slotrange.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,9 @@ func (cfg *clusterConfig) setConnRoles() {
}
for _, conn := range node.conns {
if i == 0 {
conn.Send(Request{"READWRITE", nil}, nil, 0)
conn.Send(Request{"READWRITE", nil, nil, nil}, nil, 0)
} else {
conn.SendBatch([]Request{{"READONLY", nil}, {"INFO", nil}},
conn.SendBatch([]Request{{"READONLY", nil, nil, nil}, {"INFO", nil, nil, nil}},
redis.FuncFuture(sh.setReplicaInfo), uint64(i*2))
}
}
Expand Down
8 changes: 4 additions & 4 deletions redisconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (conn *Connection) doSend(req Request, cb Future, n uint64, asking bool) *e
futures := conn.futures
if asking {
// send ASKING request before actual
futures = append(futures, future{&dumb, 0, 0, Request{"ASKING", nil}})
futures = append(futures, future{&dumb, 0, 0, Request{"ASKING", nil, nil, nil}})
}
futures = append(futures, future{cb, n, nownano(), req})

Expand Down Expand Up @@ -492,11 +492,11 @@ func (conn *Connection) doSendBatch(requests []Request, cb Future, start uint64,
futures := conn.futures
if flags&DoAsking != 0 {
// send ASKING request before actual
futures = append(futures, future{&dumb, 0, 0, Request{"ASKING", nil}})
futures = append(futures, future{&dumb, 0, 0, Request{"ASKING", nil, nil, nil}})
}
if flags&DoTransaction != 0 {
// send MULTI request for transaction start
futures = append(futures, future{&dumb, 0, 0, Request{"MULTI", nil}})
futures = append(futures, future{&dumb, 0, 0, Request{"MULTI", nil, nil, nil}})
}

now := nownano()
Expand All @@ -507,7 +507,7 @@ func (conn *Connection) doSendBatch(requests []Request, cb Future, start uint64,

if flags&DoTransaction != 0 {
// send EXEC request for transaction end
futures = append(futures, future{cb, start + uint64(len(requests)), now, Request{"EXEC", nil}})
futures = append(futures, future{cb, start + uint64(len(requests)), now, Request{"EXEC", nil, nil, nil}})
}

// should notify writer about this shard having queries
Expand Down
4 changes: 2 additions & 2 deletions redisdumb/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *Conn) Do(cmd string, args ...interface{}) interface{} {
c.Do("ASKING")
}
c.C.SetDeadline(time.Now().Add(timeout))
req, err = redis.AppendRequest(nil, redis.Request{cmd, args})
req, err = redis.AppendRequest(nil, redis.Request{cmd, nil, args, nil})
if err == nil {
if _, err = c.C.Write(req); err == nil {
res := redis.ReadResponse(c.R)
Expand Down Expand Up @@ -194,7 +194,7 @@ func Do(addr string, cmd string, args ...interface{}) interface{} {
}
defer conn.Close()
conn.SetDeadline(time.Now().Add(DefaultTimeout))
req, rerr := redis.AppendRequest(nil, redis.Request{cmd, args})
req, rerr := redis.AppendRequest(nil, redis.Request{cmd, nil, args, nil})
if rerr != nil {
return rerr
}
Expand Down

0 comments on commit 36128af

Please sign in to comment.