Skip to content

Commit

Permalink
opt: zero-copy RESP parser
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Jun 4, 2024
1 parent d210936 commit c38c305
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ test-cover:
go tool cover -html=coverage.txt -o coverage.html

pprof:
go tool pprof -http=:18081 "http://172.17.21.2:6060/debug/pprof/profile?seconds=30"
go tool pprof -http=:18081 "http://192.168.1.6:6060/debug/pprof/profile?seconds=30"

heap:
go tool pprof http://localhost:6060/debug/pprof/heap
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

实现特性:

1. 基于 single epoll server 的网络 IO 框架
2. 兼容 Redis RESP 通信协议,你可以使用任何 redis 客户端连接 rotom
1. 基于 single epoll server 的网络 IO 框架[1m-go-tcp-server](https://github.com/smallnest/1m-go-tcp-server)
2. 兼容 Redis RESP 协议,你可以使用任何 redis 客户端连接 rotom
3. DB hashmap 基于 [GigaCache](https://github.com/xgzlucario/GigaCache)
4. AOF 支持
5. 目前仅支持部分命令如 `ping`, `set`, `get`, `hset`, `hget`

目前的精力主要放在最有意思的框架设计上,短期内不会兼容更多的 RESP 命令
目前的精力主要放在框架设计与优化上,短期内不会兼容更多的 commands

## 使用

Expand Down
7 changes: 4 additions & 3 deletions aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,19 @@ func (aof *Aof) Read(fn func(value Value)) error {
if err != nil {
return err
}
reader := NewResp(data)

// Iterate over the records in the file, applying the function to each.
reader := NewResp(data)
var input Value
for {
value, err := reader.Read()
err := reader.Read(&input)
if err != nil {
if err == io.EOF {
break
}
return err
}
fn(value)
fn(input)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/xgzlucario/rotom
go 1.22

require (
github.com/RoaringBitmap/roaring v1.9.3
github.com/RoaringBitmap/roaring v1.9.4
github.com/deckarep/golang-set/v2 v2.6.0
github.com/redis/go-redis/v9 v9.5.2
github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/RoaringBitmap/roaring v1.9.3 h1:t4EbC5qQwnisr5PrP9nt0IRhRTb9gMUgQF4t4S2OByM=
github.com/RoaringBitmap/roaring v1.9.3/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ=
github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJRUA0wFAVE=
github.com/bits-and-blooms/bitset v1.13.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
Expand Down
59 changes: 35 additions & 24 deletions resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ type Value struct {
}

// Resp is a parser for RESP encoded data.
// It is a ZERO-COPY parser.
type Resp struct {
b []byte
}

// NewResp creates a new Resp object with a buffered reader.
func NewResp(b []byte) *Resp {
return &Resp{b: b}
// DO NOT EDIT the `input` param because it will be referenced during read.
func NewResp(input []byte) *Resp {
return &Resp{b: input}
}

func newErrValue(err error) Value {
Expand Down Expand Up @@ -95,65 +97,74 @@ func (r *Resp) readByte() (byte, error) {
}

// Read parses the next RESP value from the stream.
func (r *Resp) Read() (Value, error) {
func (r *Resp) Read(v *Value) error {
_type, err := r.readByte()
if err != nil {
return Value{}, err
return err
}

switch _type {
case ARRAY:
return r.readArray()
return r.readArray(v)
case BULK:
return r.readBulk()
return r.readBulk(v)
case INTEGER:
len, _, err := r.readInteger()
return Value{typ: INTEGER, num: int64(len)}, err
if err != nil {
return err
}
v.typ = INTEGER
v.num = int64(len)
return nil
default:
return Value{}, fmt.Errorf("%w: %c", ErrUnknownType, _type)
return fmt.Errorf("%w: %c", ErrUnknownType, _type)
}
}

// readArray reads an array prefixed with '*' from the stream.
func (r *Resp) readArray() (Value, error) {
v := Value{typ: ARRAY}

len, _, err := r.readInteger()
func (r *Resp) readArray(v *Value) error {
v.typ = ARRAY
length, _, err := r.readInteger()
if err != nil {
return v, err
return err
}

if len(v.array) < length {
v.array = make([]Value, length)
} else {
v.array = v.array[:length]
}

v.array = make([]Value, len)
for i := 0; i < len; i++ {
val, err := r.Read()
for i := range v.array {
err := r.Read(&v.array[i])
if err != nil {
return v, err
return err
}
v.array[i] = val
}

return v, nil
return nil
}

// readBulk reads a bulk string prefixed with '$' from the stream.
func (r *Resp) readBulk() (Value, error) {
v := Value{typ: BULK}
func (r *Resp) readBulk(v *Value) error {
v.typ = BULK

len, _, err := r.readInteger()
if err != nil {
return v, err
return err
}

if len == -1 { // RESP Bulk strings can be null, indicated by "$-1"
return Value{typ: NULL}, nil
v.typ = NULL
return nil
}

v.bulk = r.b[:len]
r.b = r.b[len:]

r.readLine() // Read the trailing CRLF

return v, nil
return nil
}

// Marshal converts a Value object into its corresponding RESP bytes.
Expand Down
55 changes: 36 additions & 19 deletions resp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func TestValue(t *testing.T) {
data := value.Marshal()
assert.Equal(string(data), "+OK\r\n")

_, err := NewResp(data).Read()
var value2 Value
err := NewResp(data).Read(&value2)
assert.NotNil(err)
})

Expand All @@ -27,44 +28,52 @@ func TestValue(t *testing.T) {
data := value.Marshal()
assert.Equal(string(data), "-err message\r\n")

_, err := NewResp(data).Read()
var value2 Value
err := NewResp(data).Read(&value2)
assert.NotNil(err)
})

t.Run("bulk-value", func(t *testing.T) {
value := newBulkValue([]byte("hello"))
data := value.Marshal()
assert.Equal(string(data), "$5\r\nhello\r\n")

value2, err := NewResp(data).Read()
assert.Nil(err)
assert.Equal(value, value2)
{
var value2 Value
err := NewResp(data).Read(&value2)
assert.Nil(err)
assert.Equal(value, value2)
}

// empty bulk string
value = newBulkValue([]byte(""))
data = value.Marshal()
assert.Equal(string(data), "$0\r\n\r\n")

value2, err = NewResp(data).Read()
assert.Nil(err)
assert.Equal(value, value2)
{
var value2 Value
err := NewResp(data).Read(&value2)
assert.Nil(err)
assert.Equal(value, value2)
}

// nil bulk string
value = newBulkValue(nil)
data = value.Marshal()
assert.Equal(string(data), "$-1\r\n")

value2, err = NewResp(data).Read()
assert.Nil(err)
assert.Equal(value, value2)
{
var value2 Value
err := NewResp(data).Read(&value2)
assert.Nil(err)
assert.Equal(value, value2)
}
})

t.Run("integer-value", func(t *testing.T) {
value := newIntegerValue(1)
data := value.Marshal()
assert.Equal(string(data), ":1\r\n")

value2, err := NewResp(data).Read()
var value2 Value
err := NewResp(data).Read(&value2)
assert.Nil(err)
assert.Equal(value, value2)
})
Expand All @@ -80,24 +89,26 @@ func TestValue(t *testing.T) {
data := value.Marshal()
assert.Equal(string(data), "*5\r\n:1\r\n:2\r\n:3\r\n$5\r\nhello\r\n$5\r\nworld\r\n")

value2, err := NewResp(data).Read()
var value2 Value
err := NewResp(data).Read(&value2)
assert.Nil(err)
assert.Equal(value, value2)
})

t.Run("error-value", func(t *testing.T) {
// read nil
_, err := NewResp(nil).Read()
var value Value
err := NewResp(nil).Read(&value)
assert.NotNil(err)

for _, prefix := range []byte{BULK, INTEGER, ARRAY} {
data := append([]byte{prefix}, "an error message"...)
_, err := NewResp(data).Read()
err := NewResp(data).Read(&value)
assert.NotNil(err)
}

// marshal error type
value := Value{typ: 76}
value = Value{typ: 76}
data := value.Marshal()
assert.Equal(string(data), ErrUnknownType.Error())
})
Expand All @@ -110,6 +121,12 @@ func TestValue(t *testing.T) {
assert.Equal(lower, string(lower2))
}
})

t.Run("panic", func(t *testing.T) {
assert.Panics(func() {
NewResp([]byte("$2\r\nOK\r\n")).Read(nil)
})
})
}

func BenchmarkRESP(b *testing.B) {
Expand Down
11 changes: 6 additions & 5 deletions rotom.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type DB struct {
type Server struct {
config *Config
epoller *epoll
args Value // global reused arguments
}

type Command struct {
Expand Down Expand Up @@ -136,7 +137,6 @@ func (server *Server) RunServe() {

go func() {
var buf = make([]byte, 512)

for {
connections, err := epoller.Wait()
if err != nil {
Expand Down Expand Up @@ -177,28 +177,29 @@ func (server *Server) RunServe() {
func (server *Server) handleConnection(buf []byte, conn net.Conn) {
resp := NewResp(buf)
for {
value, err := resp.Read()
err := resp.Read(&server.args)
if err != nil {
if err != io.EOF {
log.Println("read resp error:", err)
}
return
}

if value.typ != ARRAY || len(value.array) == 0 {
if server.args.typ != ARRAY || len(server.args.array) == 0 {
log.Println("invalid request, expected non-empty array")
continue
}

command := value.array[0].bulk
command := server.args.array[0].bulk
args := server.args.array[1:]
var res Value

cmd, err := lookupCommand(command)
if err != nil {
res = newErrValue(err)

} else {
res = cmd.processCommand(value.array[1:])
res = cmd.processCommand(args)

if server.config.AppendOnly && cmd.persist && res.typ != ERROR {
db.aof.Write(buf)
Expand Down

0 comments on commit c38c305

Please sign in to comment.