Skip to content

Commit

Permalink
Response byte reuse (#13)
Browse files Browse the repository at this point in the history
* build request from byte args and encode it

* fetch slice from pool

* return ByteResponse

* wip

* update cluster tests

* update example test

* update conn test
  • Loading branch information
stlava authored Apr 30, 2024
1 parent 36128af commit 232af0a
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ test: testcluster testconn testredis
mv redis-$(REDIS_VERSION)/src/redis-server /tmp/redis-server
rm redis-$(REDIS_VERSION) -rf

testredis: /tmp/redis-server/redis-server
testredis:
PATH=/tmp/redis-server/:${PATH} go test ./redis

testconn: /tmp/redis-server/redis-server
Expand Down
80 changes: 80 additions & 0 deletions redis/byteslice/byteslice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) 2021 The Gnet Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package byteslice

import (
"math"
"math/bits"
"reflect"
"runtime"
"sync"
"unsafe"
)

var builtinPool Pool

// Pool consists of 32 sync.Pool, representing byte slices of length from 0 to 32 in powers of 2.
type Pool struct {
pools [32]sync.Pool
}

// Get returns a byte slice with given length from the built-in pool.
func Get(size int) []byte {
return builtinPool.Get(size)
}

// Put returns the byte slice to the built-in pool.
func Put(buf []byte) {
builtinPool.Put(buf)
}

// Get retrieves a byte slice of the length requested by the caller from pool or allocates a new one.
func (p *Pool) Get(size int) (buf []byte) {
if size <= 0 {
return nil
}
if size > math.MaxInt32 {
return make([]byte, size)
}
idx := index(uint32(size))
ptr, _ := p.pools[idx].Get().(unsafe.Pointer)
if ptr == nil {
return make([]byte, 1<<idx)[:size]
}
sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
sh.Data = uintptr(ptr)
sh.Len = size
sh.Cap = 1 << idx
runtime.KeepAlive(ptr)
return
}

// Put returns the byte slice to the pool.
func (p *Pool) Put(buf []byte) {
size := cap(buf)
if size == 0 || size > math.MaxInt32 {
return
}
idx := index(uint32(size))
if size != 1<<idx { // this byte slice is not from Pool.Get(), put it into the previous interval of idx
idx--
}
// array pointer
p.pools[idx].Put(unsafe.Pointer(&buf[:1][0]))
}

func index(n uint32) uint32 {
return uint32(bits.Len32(n - 1))
}
2 changes: 1 addition & 1 deletion redis/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func ExampleSync() {
// Output:
// OK
// OK
// ["1" "2"]
// [{"1"} {"2"}]
// redispipe.result: WRONGTYPE Operation against a key holding the wrong kind of value {request: Req("HSET", ["key1" "field1" "val1"]), address: 127.0.0.1:46231}
// <nil>
// ['\x02' '\x01' "2" "1"]
Expand Down
28 changes: 24 additions & 4 deletions redis/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,26 @@ package redis
import (
"bufio"
"bytes"
"github.com/joomcode/redispipe/redis/byteslice"
"io"
"strings"

"github.com/joomcode/errorx"
)

var readerPool byteslice.Pool

type ByteResponse struct {
Val []byte
}

// Release puts Val byte slice back to pool for reuse. You must not use Val after this.
func (br *ByteResponse) Release() {
readerPool.Put(br.Val)
}

// ReadResponse reads single RESP answer from bufio.Reader
func ReadResponse(b *bufio.Reader) interface{} {
func ReadResponse(b *bufio.Reader, wrapBytes bool) interface{} {
line, isPrefix, err := b.ReadLine()
if err != nil {
return ErrIO.WrapWithNoMessage(err)
Expand Down Expand Up @@ -72,14 +84,22 @@ func ReadResponse(b *bufio.Reader) interface{} {
if v < 0 {
return nil
}
buf := make([]byte, v+2, v+2)

buf := readerPool.Get(int(v + 2))
if _, err = io.ReadFull(b, buf); err != nil {
return ErrIO.WrapWithNoMessage(err)
}

if buf[v] != '\r' || buf[v+1] != '\n' {
readerPool.Put(buf)
return ErrNoFinalRN.NewWithNoMessage()
}
return buf[:v:v]

if wrapBytes {
return ByteResponse{Val: buf[:v]}
}

return buf[:v]
case '*':
var rerr *errorx.Error
if v, rerr = parseInt(line[1:]); rerr != nil {
Expand All @@ -90,7 +110,7 @@ func ReadResponse(b *bufio.Reader) interface{} {
}
result := make([]interface{}, v)
for i := int64(0); i < v; i++ {
result[i] = ReadResponse(b)
result[i] = ReadResponse(b, false)
if e, ok := result[i].(*errorx.Error); ok && !e.IsOfType(ErrResult) {
return e
}
Expand Down
14 changes: 5 additions & 9 deletions redis/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func lines2bufio(lines ...string) *bufio.Reader {
}

func readLines(lines ...string) interface{} {
return ReadResponse(lines2bufio(lines...))
return ReadResponse(lines2bufio(lines...), true)
}

func checkErrType(t *testing.T, res interface{}, kind *errorx.Type) bool {
Expand Down Expand Up @@ -156,21 +156,17 @@ func TestReadResponse_Correct(t *testing.T) {
assert.Equal(t, int64(-9223372036854775808), res)

res = readLines("$0\r\n", "\r\n")
assert.Equal(t, []byte(""), res)
assert.Equal(t, len(res.([]byte)), cap(res.([]byte)))
assert.Equal(t, ByteResponse{Val: []byte("")}, res)

res = readLines("$1\r\n", "a\r\n")
assert.Equal(t, []byte("a"), res)
assert.Equal(t, len(res.([]byte)), cap(res.([]byte)))
assert.Equal(t, ByteResponse{Val: []byte("a")}, res)

res = readLines("$4\r\n", "asdf\r\n")
assert.Equal(t, []byte("asdf"), res)
assert.Equal(t, len(res.([]byte)), cap(res.([]byte)))
assert.Equal(t, ByteResponse{Val: []byte("asdf")}, res)

big := strings.Repeat("a", 1024*1024)
res = readLines(fmt.Sprintf("$%d\r\n", len(big)), big, "\r\n")
assert.Equal(t, []byte(big), res)
assert.Equal(t, len(res.([]byte)), cap(res.([]byte)))
assert.Equal(t, ByteResponse{Val: []byte(big)}, res)

res = readLines("*0\r\n")
assert.Equal(t, []interface{}{}, res)
Expand Down
4 changes: 4 additions & 0 deletions rediscluster/bench/go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cockroachdb/circuitbreaker v0.0.0-20210826084326-2045d59d3b5d/go.mod h1:mN5a3LcljXtJdPkmDnkbSCjPEmImXBXR+jmS31mxVwA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA=
github.com/garyburd/redigo v1.6.2/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY=
github.com/joomcode/errorx v1.0.3 h1:3e1mi0u7/HTPNdg6d6DYyKGBhA5l9XpsfuVE29NxnWw=
github.com/joomcode/errorx v1.0.3/go.mod h1:eQzdtdlNyN7etw6YCS4W4+lu442waxZYw5yvz0ULrRo=
github.com/mediocregopher/radix.v2 v0.0.0-20181115013041-b67df6e626f9/go.mod h1:fLRUbhbSd5Px2yKUaGYYPltlyxi1guJz1vCmo1RQL50=
github.com/mediocregopher/radix/v3 v3.7.0 h1:SM9zJdme5pYGEVvh1HttjBjDmIaNBDKy+oDCv5w81Wo=
github.com/mediocregopher/radix/v3 v3.7.0/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea/go.mod h1:1VcHEd3ro4QMoHfiNl/j7Jkln9+KQuorp0PItHMJYNg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
42 changes: 25 additions & 17 deletions rediscluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *Suite) TestBasicOps() {
s.slotnode(i).DoSure("SET", slotkey("basic", key), key+"y")
}
for _, key := range s.keys {
s.Equal([]byte(key+"y"), scl.Do(s.ctx, "GET", slotkey("basic", key)))
s.Equal(redis.ByteResponse{Val: []byte(key + "y")}, scl.Do(s.ctx, "GET", slotkey("basic", key)))
}
}

Expand Down Expand Up @@ -256,7 +256,7 @@ func (s *Suite) TestSendMany() {
}
ress = scl.SendMany(s.ctx, reqs)
for i, res := range ress {
s.Equal([]byte(s.keys[i]+"y"), res)
s.Equal(redis.ByteResponse{Val: []byte(s.keys[i] + "y")}, res)
}
}

Expand Down Expand Up @@ -424,7 +424,7 @@ func (s *Suite) TestGetMoved() {

s.cl.MoveSlot(10999, 1, 2)

s.Equal([]byte(key), sconn.Do(s.ctx, "GET", key))
s.Equal(redis.ByteResponse{Val: []byte(key)}, sconn.Do(s.ctx, "GET", key))
s.Contains(DebugEvents(), "moved")

s.cl.MoveSlot(10999, 2, 1)
Expand Down Expand Up @@ -485,7 +485,7 @@ func (s *Suite) TestAsk() {

key := slotkey("ask", s.keys[10997])
s.r().Equal("OK", sconn.Do(s.ctx, "SET", key, key))
s.Equal([]byte(key), sconn.Do(s.ctx, "GET", key))
s.Equal(redis.ByteResponse{Val: []byte(key)}, sconn.Do(s.ctx, "GET", key))
s.Contains(DebugEvents(), "asking")

// recheck that redis responses with correct errors
Expand Down Expand Up @@ -531,7 +531,7 @@ func (s *Suite) TestAskTransaction() {
s.Equal([]interface{}{"OK", "OK"}, res)
s.Contains(DebugEvents(), "transaction asking")

s.Equal([]byte("1"), sconn.Do(s.ctx, "GET", key1))
s.Equal(redis.ByteResponse{Val: []byte("1")}, sconn.Do(s.ctx, "GET", key1))

DebugEventsReset()
// if some keys are absent in new shard, then redis returns TRYAGAIN error
Expand All @@ -558,8 +558,8 @@ func (s *Suite) TestAskTransaction() {
})
s.Nil(err)

s.Equal([]byte("1"), sconn.Do(s.ctx, "GET", key2))
s.Equal([]byte("2"), sconn.Do(s.ctx, "GET", key3))
s.Equal(redis.ByteResponse{Val: []byte("1")}, sconn.Do(s.ctx, "GET", key2))
s.Equal(redis.ByteResponse{Val: []byte("2")}, sconn.Do(s.ctx, "GET", key3))
s.Contains(DebugEvents(), "transaction asking")
s.Contains(DebugEvents(), "transaction tryagain")

Expand Down Expand Up @@ -589,8 +589,8 @@ func (s *Suite) TestMovedTransaction() {
s.Nil(err)
s.Equal([]interface{}{"OK", "OK"}, res)

s.Equal([]byte("2"), sconn.Do(s.ctx, "GET", key1))
s.Equal([]byte("3"), sconn.Do(s.ctx, "GET", key2))
s.Equal(redis.ByteResponse{Val: []byte("2")}, sconn.Do(s.ctx, "GET", key1))
s.Equal(redis.ByteResponse{Val: []byte("3")}, sconn.Do(s.ctx, "GET", key2))
s.Equal([]string{"transaction moved"}, DebugEvents())
}

Expand Down Expand Up @@ -626,7 +626,7 @@ func (s *Suite) TestAllReturns_Good() {
skey := s.keys[(i*N+j)*127%NumSlots]
key := slotkey("allgood", skey)
res := sconn.Do(s.ctx, "GET", key)
if !s.Equal([]byte(skey), res) {
if !s.Equal(redis.ByteResponse{Val: []byte(skey)}, res) {
return
}

Expand All @@ -648,7 +648,7 @@ func (s *Suite) TestAllReturns_Good() {
if !s.Equal("OK", ress[0]) {
return
}
if ress[1] != nil && !s.Equal([]byte(keya), ress[1]) {
if ress[1] != nil && !s.Equal(redis.ByteResponse{Val: []byte(keya)}, ress[1]) {
return
}
}
Expand Down Expand Up @@ -699,7 +699,7 @@ func (s *Suite) TestAllReturns_GoodMoving() {
skey := s.keys[(i*N+j)*127%NumSlots]
key := slotkey("allgoodmove", skey)
res := sconn.Do(ctx, "GET", key)
if !s.Equal([]byte(skey), res) {
if !s.Equal(redis.ByteResponse{Val: []byte(skey)}, res) {
log.Println("Res ", res)
atomic.AddUint32(&bad, 1)
}
Expand All @@ -723,7 +723,7 @@ func (s *Suite) TestAllReturns_GoodMoving() {
log.Println("Ress[0] ", ress[0])
atomic.AddUint32(&bad, 1)
}
if ress[1] != nil && !s.Equal([]byte(keya), ress[1]) {
if ress[1] != nil && !s.Equal(redis.ByteResponse{Val: []byte(keya)}, ress[1]) {
log.Println("Ress[1] ", ress[1])
atomic.AddUint32(&bad, 1)
}
Expand Down Expand Up @@ -818,11 +818,15 @@ func (s *Suite) TestAllReturns_Bad() {
ress, err = sconn.SendTransaction(s.ctx, reqs)
}
if check {
ok := s.Equal([]byte(skey), res)
ok := s.Equal(redis.ByteResponse{Val: []byte(skey)}, res)
ok = ok && err == nil
ok = ok && s.Equal("OK", ress[0])
if ress[1] != nil {
ok = ok && s.Equal([]byte(keya), ress[1])
if !transact {
ok = ok && s.Equal(redis.ByteResponse{Val: []byte(keya)}, ress[1])
} else {
ok = ok && s.Equal([]byte(keya), ress[1])
}
}
checks <- ok
}
Expand Down Expand Up @@ -1015,11 +1019,15 @@ func (s *Suite) TestAllReturns_Bad_Latency() {
ress, err = sconn.SendTransaction(s.ctx, reqs)
}
if check {
ok := s.Equal([]byte(skey), res)
ok := s.Equal(redis.ByteResponse{Val: []byte(skey)}, res)
ok = ok && err == nil
ok = ok && s.Equal("OK", ress[0])
if ress[1] != nil {
ok = ok && s.Equal([]byte(keya), ress[1])
if !transact {
ok = ok && s.Equal(redis.ByteResponse{Val: []byte(keya)}, ress[1])
} else {
ok = ok && s.Equal([]byte(keya), ress[1])
}
}
checks <- ok
}
Expand Down
4 changes: 4 additions & 0 deletions redisconn/bench/go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cockroachdb/circuitbreaker v0.0.0-20210826084326-2045d59d3b5d/go.mod h1:mN5a3LcljXtJdPkmDnkbSCjPEmImXBXR+jmS31mxVwA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA=
github.com/garyburd/redigo v1.6.2/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY=
github.com/gomodule/redigo v1.8.4 h1:Z5JUg94HMTR1XpwBaSH4vq3+PNSIykBLxMdglbw10gg=
github.com/gomodule/redigo v1.8.4/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
Expand All @@ -9,6 +12,7 @@ github.com/joomcode/errorx v1.0.3/go.mod h1:eQzdtdlNyN7etw6YCS4W4+lu442waxZYw5yv
github.com/mediocregopher/radix.v2 v0.0.0-20181115013041-b67df6e626f9/go.mod h1:fLRUbhbSd5Px2yKUaGYYPltlyxi1guJz1vCmo1RQL50=
github.com/mediocregopher/radix/v3 v3.7.0 h1:SM9zJdme5pYGEVvh1HttjBjDmIaNBDKy+oDCv5w81Wo=
github.com/mediocregopher/radix/v3 v3.7.0/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea/go.mod h1:1VcHEd3ro4QMoHfiNl/j7Jkln9+KQuorp0PItHMJYNg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
Loading

0 comments on commit 232af0a

Please sign in to comment.