Skip to content

Commit

Permalink
add: AeLoop add readQueryFromClient
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Jun 7, 2024
1 parent 4977439 commit f579396
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 302 deletions.
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,4 @@ $ redis-benchmark -t hset -p 6969
99.97% <= 1.4 milliseconds
100.00% <= 1.4 milliseconds
100300.91 requests per second
```

火焰图

![img](graph.jpg)
```
27 changes: 14 additions & 13 deletions ae.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,15 @@ func GetMsTime() int64 {
func (loop *AeLoop) AddTimeEvent(mask TeType, interval int64, proc TimeProc, extra interface{}) int {
id := loop.timeEventNextId
loop.timeEventNextId++
var te AeTimeEvent
te.id = id
te.mask = mask
te.interval = interval
te.when = GetMsTime() + interval
te.proc = proc
te.extra = extra
te.next = loop.TimeEvents
te := AeTimeEvent{
id: id,
mask: mask,
interval: interval,
when: GetMsTime() + interval,
proc: proc,
extra: extra,
next: loop.TimeEvents,
}
loop.TimeEvents = &te
return id
}
Expand Down Expand Up @@ -183,15 +184,15 @@ func (loop *AeLoop) AeWait() (tes []*AeTimeEvent, fes []*AeFileEvent) {
logger.Error().Msgf("epoll wait error: %v", err)
}
// collect file events
for i := 0; i < n; i++ {
if events[i].Events&unix.EPOLLIN != 0 {
fe := loop.FileEvents[getFeKey(int(events[i].Fd), AE_READABLE)]
for _, ev := range events[:n] {
if ev.Events&unix.EPOLLIN != 0 {
fe := loop.FileEvents[getFeKey(int(ev.Fd), AE_READABLE)]
if fe != nil {
fes = append(fes, fe)
}
}
if events[i].Events&unix.EPOLLOUT != 0 {
fe := loop.FileEvents[getFeKey(int(events[i].Fd), AE_WRITABLE)]
if ev.Events&unix.EPOLLOUT != 0 {
fe := loop.FileEvents[getFeKey(int(ev.Fd), AE_WRITABLE)]
if fe != nil {
fes = append(fes, fe)
}
Expand Down
58 changes: 13 additions & 45 deletions aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ import (
"bytes"
"io"
"os"
"sync"
"time"

"github.com/tidwall/mmap"
)

const (
Expand All @@ -20,73 +16,45 @@ type Aof struct {
filePath string
file *os.File
buf *bytes.Buffer
mu sync.Mutex
}

func NewAof(path string) (*Aof, error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, err
}

aof := &Aof{
return &Aof{
file: fd,
filePath: path,
buf: bytes.NewBuffer(make([]byte, 0, MB)),
}

go func() {
ticker := time.NewTicker(time.Second)
for range ticker.C {
aof.mu.Lock()
// flush buffer to disk
aof.buf.WriteTo(aof.file)
aof.file.Sync()
aof.mu.Unlock()
}
}()

return aof, nil
buf: bytes.NewBuffer(make([]byte, 0, KB)),
}, nil
}

func (aof *Aof) Close() error {
aof.mu.Lock()
defer aof.mu.Unlock()
return aof.file.Close()
}

func (aof *Aof) Write(buf []byte) error {
aof.mu.Lock()
_, err := aof.buf.Write(buf)
aof.mu.Unlock()
return err
func (aof *Aof) Write(buf []byte) (int, error) {
return aof.buf.Write(buf)
}

func (aof *Aof) Read(fn func(value Value)) error {
aof.mu.Lock()
defer aof.mu.Unlock()

// Read file data by mmap.
data, err := mmap.Open(aof.filePath, false)
if len(data) == 0 {
return nil
}
if err != nil {
return err
}
func (aof *Aof) Flush() error {
aof.buf.WriteTo(aof.file)
return aof.file.Sync()
}

func (aof *Aof) Read(fn func(value Value)) error {
// Iterate over the records in the file, applying the function to each.
reader := NewResp(data)
var input Value
reader := NewResp(aof.file)
for {
err := reader.Read(&input)
value, err := reader.Read()
if err != nil {
if err == io.EOF {
break
}
return err
}
fn(input)
fn(value)
}

return nil
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/rs/zerolog v1.33.0
github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980
github.com/stretchr/testify v1.9.0
github.com/tidwall/mmap v0.3.0
github.com/xgzlucario/GigaCache v0.0.0-20240605031700-e88a04a9dd84
github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b
golang.org/x/sys v0.21.0
Expand All @@ -20,7 +19,6 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand Down
5 changes: 0 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80N
github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
Expand Down Expand Up @@ -49,15 +47,12 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/mmap v0.3.0 h1:XXt1YsiXCF5/UAu3pLbu6g7iulJ9jsbs6vt7UpiV0sY=
github.com/tidwall/mmap v0.3.0/go.mod h1:2/dNzF5zA+te/JVHfrqNLcRkb8LjdH3c80vYHFQEZRk=
github.com/xgzlucario/GigaCache v0.0.0-20240605031700-e88a04a9dd84 h1:YZQ7pvAASgoW0FsOF4pXkzgdWJQSS7j4JsOaU8Oc724=
github.com/xgzlucario/GigaCache v0.0.0-20240605031700-e88a04a9dd84/go.mod h1:sPwGPAuvd9WdiONTmusXGNocqcY5L/J7+st1upAMlX8=
github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b h1:C/+nN/kFJ6yrmEhIu+5Ra2jx/W8w+Ayu8pTiZfuU5Xc=
github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b/go.mod h1:1ZgyZNk91XIllYdOPpwP+9L2RCw6QGSy6alTYF+Z0iU=
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 h1:LoYXNGAShUG3m/ehNk4iFctuhGX/+R1ZpfJ4/ia80JM=
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
Binary file removed graph.jpg
Binary file not shown.
50 changes: 14 additions & 36 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package main

import (
"fmt"
"strconv"
"time"

"github.com/xgzlucario/rotom/structx"
)
Expand All @@ -15,31 +13,12 @@ func pingCommand(_ []Value) Value {
func setCommand(args []Value) Value {
key := args[0].bulk
value := args[1].bulk
exargs := args[2:]
var duration time.Duration

for i, arg := range exargs {
switch b2s(arg.bulk) {
case "NX", "nx":
case "PX", "px":
case "EX", "ex":
if len(exargs) > i+1 {
seconds, err := strconv.Atoi(b2s(exargs[i+1].bulk))
if err != nil {
return newErrValue(ErrParseInteger)
}
duration = time.Duration(seconds)
} else {
return newErrValue(ErrWrongNumberArgs("set"))
}
}
}
db.strs.SetEx(b2s(key), value, duration)
db.strs.Set(string(key), value)
return ValueOK
}

func getCommand(args []Value) Value {
key := b2s(args[0].bulk)
key := string(args[0].bulk)

value, _, ok := db.strs.Get(key)
if ok {
Expand All @@ -54,7 +33,7 @@ func getCommand(args []Value) Value {
}

func hsetCommand(args []Value) Value {
hash := args[0].bulk
hash := string(args[0].bulk)
args = args[1:]

// check arguments number
Expand All @@ -71,30 +50,30 @@ func hsetCommand(args []Value) Value {
for i := 0; i < len(args); i += 2 {
key := args[i].bulk
value := args[i+1].bulk
if hmap.Set(b2s(key), value) {
if hmap.Set(string(key), value) {
newFields++
}
}
return newIntegerValue(newFields)
}

func hgetCommand(args []Value) Value {
hash := args[0].bulk
key := args[1].bulk
hash := string(args[0].bulk)
key := string(args[1].bulk)

hmap, err := fetchMap(hash)
if err != nil {
return newErrValue(ErrWrongType)
}
value, _, ok := hmap.Get(b2s(key))
value, _, ok := hmap.Get(key)
if !ok {
return ValueNull
}
return newBulkValue(value)
}

func hdelCommand(args []Value) Value {
hash := args[0].bulk
hash := string(args[0].bulk)
keys := args[1:]

hmap, err := fetchMap(hash)
Expand All @@ -103,15 +82,15 @@ func hdelCommand(args []Value) Value {
}
var success int
for _, v := range keys {
if hmap.Remove(b2s(v.bulk)) {
if hmap.Remove(string(v.bulk)) {
success++
}
}
return newIntegerValue(success)
}

func hgetallCommand(args []Value) Value {
hash := args[0].bulk
hash := string(args[0].bulk)

hmap, err := fetchMap(hash)
if err != nil {
Expand All @@ -126,12 +105,12 @@ func hgetallCommand(args []Value) Value {
return newArrayValue(res)
}

func fetchMap(key []byte, setnx ...bool) (Map, error) {
func fetchMap(key string, setnx ...bool) (Map, error) {
return fetch(key, func() Map { return structx.NewMap() }, setnx...)
}

func fetch[T any](key []byte, new func() T, setnx ...bool) (v T, err error) {
item, ok := db.extras[b2s(key)]
func fetch[T any](key string, new func() T, setnx ...bool) (v T, err error) {
item, ok := db.extras[key]
if ok {
v, ok := item.(T)
if ok {
Expand All @@ -142,8 +121,7 @@ func fetch[T any](key []byte, new func() T, setnx ...bool) (v T, err error) {

v = new()
if len(setnx) > 0 && setnx[0] {
// here NEED to use copy of key
db.extras[string(key)] = v
db.extras[key] = v
}
return v, nil
}
7 changes: 2 additions & 5 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,8 @@ func TestHandler(t *testing.T) {
res, _ = rdb.Get(ctx, "foo").Result()
assert.Equal(res, "bar")

// expire
rdb.Set(ctx, "foo", "bar1", time.Second).Result()
time.Sleep(time.Second * 2)

res, _ = rdb.Get(ctx, "foo").Result()
res, err := rdb.Get(ctx, "none").Result()
assert.Equal(err, redis.Nil)
assert.Equal(res, "")
})

Expand Down
6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ func main() {
if debug {
runDebug()
}

// register main aeLoop event
server.aeLoop.AddFileEvent(server.fd, AE_READABLE, AcceptHandler, nil)
// server.aeLoop.AddTimeEvent(AE_NORMAL, 100, ServerCron, nil)
if server.config.AppendOnly {
server.aeLoop.AddTimeEvent(AE_NORMAL, 1000, ServerCronFlush, nil)
}
logger.Debug().Msg("rotom server is ready to accept.")
server.aeLoop.AeMain()
}
Loading

0 comments on commit f579396

Please sign in to comment.