Skip to content

Commit

Permalink
add: Introducing the redis ae loop
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Jun 7, 2024
1 parent fa68440 commit 4977439
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 126 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

实现特性:

1. 基于 single epoll server 的网络 IO 框架([1m-go-tcp-server](https://github.com/smallnest/1m-go-tcp-server)
1. 基于 unix 编程实现的基于 epoll 的无锁 AeLoop 事件循环机制([参考godis](https://github.com/archeryue/godis)
2. 兼容 Redis RESP 协议,你可以使用任何 redis 客户端连接 rotom
3. DB hashmap 基于 [GigaCache](https://github.com/xgzlucario/GigaCache)
4. AOF 支持
Expand Down
39 changes: 12 additions & 27 deletions ae.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package main

import (
"log"
"net"
"reflect"
"time"

"golang.org/x/sys/unix"
Expand Down Expand Up @@ -85,16 +82,16 @@ func (loop *AeLoop) AddFileEvent(fd int, mask FeType, proc FileProc, extra inter
ev |= fe2ep[mask]
err := unix.EpollCtl(loop.fileEventFd, op, fd, &unix.EpollEvent{Fd: int32(fd), Events: ev})
if err != nil {
log.Printf("epoll ctr err: %v\n", err)
logger.Error().Msgf("epoll ctl error: %v", err)
return
}
// ae ctl
var fe AeFileEvent
fe.fd = fd
fe.mask = mask
fe.proc = proc
fe.extra = extra
loop.FileEvents[getFeKey(fd, mask)] = &fe
loop.FileEvents[getFeKey(fd, mask)] = &AeFileEvent{
fd: fd,
mask: mask,
proc: proc,
extra: extra,
}
}

func (loop *AeLoop) RemoveFileEvent(fd int, mask FeType) {
Expand All @@ -107,7 +104,7 @@ func (loop *AeLoop) RemoveFileEvent(fd int, mask FeType) {
}
err := unix.EpollCtl(loop.fileEventFd, op, fd, &unix.EpollEvent{Fd: int32(fd), Events: ev})
if err != nil {
log.Printf("epoll del err: %v\n", err)
logger.Error().Msgf("epoll del error: %v", err)
}
// ae ctl
loop.FileEvents[getFeKey(fd, mask)] = nil
Expand Down Expand Up @@ -183,7 +180,7 @@ func (loop *AeLoop) AeWait() (tes []*AeTimeEvent, fes []*AeFileEvent) {
var events [128]unix.EpollEvent
n, err := unix.EpollWait(loop.fileEventFd, events[:], int(timeout))
if err != nil {
log.Printf("epoll wait warnning: %v\n", err)
logger.Error().Msgf("epoll wait error: %v", err)
}
// collect file events
for i := 0; i < n; i++ {
Expand Down Expand Up @@ -221,25 +218,13 @@ func (loop *AeLoop) AeProcess(tes []*AeTimeEvent, fes []*AeFileEvent) {
te.when = GetMsTime() + te.interval
}
}
if len(fes) > 0 {
// log.Println("ae is processing file events")
for _, fe := range fes {
fe.proc(loop, fe.fd, fe.extra)
}
for _, fe := range fes {
fe.proc(loop, fe.fd, fe.extra)
}
}

func (loop *AeLoop) AeMain() {
for {
tes, fes := loop.AeWait()
loop.AeProcess(tes, fes)
loop.AeProcess(loop.AeWait())
}
}

func socketFD(conn net.Conn) int {
tcpConn := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn")
fdVal := tcpConn.FieldByName("fd")
pfdVal := reflect.Indirect(fdVal).FieldByName("pfd")

return int(pfdVal.FieldByName("Sysfd").Int())
}
4 changes: 2 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"encoding/json"
"log"
"os"
)

Expand All @@ -17,7 +16,8 @@ func LoadConfig(path string) (config *Config, err error) {
if err != nil {
return
}
log.Printf("read config file: %s", jsonStr)

logger.Debug().Msgf("read config file: %s", jsonStr)

config = &Config{}
if err = json.Unmarshal(jsonStr, config); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/RoaringBitmap/roaring v1.9.4
github.com/deckarep/golang-set/v2 v2.6.0
github.com/redis/go-redis/v9 v9.5.2
github.com/rs/zerolog v1.33.0
github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980
github.com/stretchr/testify v1.9.0
github.com/tidwall/mmap v0.3.0
Expand All @@ -21,6 +22,8 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 // indirect
Expand Down
15 changes: 15 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand All @@ -19,18 +20,29 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.5.2 h1:L0L3fcSNReTRGyZ6AqAEN0K56wYeYAwapBIhkvh0f3E=
github.com/redis/go-redis/v9 v9.5.2/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980 h1:t5uAkycj8WepkboiZvJzHB+FvkNj+P6Z2dEN4pFajU4=
github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980/go.mod h1:zwEumjdcK6Q/ky/gFPqMviw1p7ZUb+B3pU4ybgOHvns=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand All @@ -46,6 +58,9 @@ github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b/go.mod h1:1Zg
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 h1:LoYXNGAShUG3m/ehNk4iFctuhGX/+R1ZpfJ4/ia80JM=
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
22 changes: 16 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@ package main

import (
"flag"
"log"
"net/http"
_ "net/http/pprof"
"os"
"time"

"github.com/rs/zerolog"
)

var logger = zerolog.
New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.DateTime}).
Level(zerolog.TraceLevel).
With().
Timestamp().
Logger()

func runDebug() {
go http.ListenAndServe(":6060", nil)
}
Expand All @@ -19,23 +29,23 @@ func main() {
flag.BoolVar(&debug, "debug", false, "run with debug mode.")
flag.Parse()

log.Printf("cmd arguments: config=%s, debug=%v", path, debug)
logger.Debug().Str("config", path).Bool("debug", debug).Msg("read cmd arguments")

config, err := LoadConfig(path)
if err != nil {
log.Panicf("load config error: %v\n", err)
logger.Error().Msgf("load config error: %v", err)
}
if err = initServer(config); err != nil {
log.Panicf("init server error: %v\n", err)
logger.Error().Msgf("init server error: %v", err)
}
if err = InitDB(config); err != nil {
log.Panicf("init db error: %v\n", err)
logger.Error().Msgf("init db error: %v", err)
}
if debug {
runDebug()
}
server.aeLoop.AddFileEvent(server.fd, AE_READABLE, AcceptHandler, nil)
// server.aeLoop.AddTimeEvent(AE_NORMAL, 100, ServerCron, nil)
log.Println("rotom server is up.")
logger.Debug().Msg("rotom server is ready to accept.")
server.aeLoop.AeMain()
}
41 changes: 5 additions & 36 deletions net.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,16 @@
package main

import (
"log"

"golang.org/x/sys/unix"
)

const BACKLOG int = 64

func Accept(fd int) (int, error) {
nfd, _, err := unix.Accept(fd)
// ignore client addr for now
return nfd, err
}

func Connect(host [4]byte, port int) (int, error) {
s, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, 0)
if err != nil {
log.Printf("init socket err: %v\n", err)
return -1, err
}
var addr unix.SockaddrInet4
addr.Addr = host
addr.Port = port
err = unix.Connect(s, &addr)
if err != nil {
log.Printf("connect err: %v\n", err)
return -1, err
}
return s, nil
}

func Read(fd int, buf []byte) (int, error) {
return unix.Read(fd, buf)
}
Expand All @@ -46,30 +26,19 @@ func Close(fd int) {
func TcpServer(port int) (int, error) {
s, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, 0)
if err != nil {
log.Printf("init socket err: %v\n", err)
return -1, nil
return -1, err
}
err = unix.SetsockoptInt(s, unix.SOL_SOCKET, unix.SO_REUSEPORT, port)
if err != nil {
log.Printf("set SO_REUSEPORT err: %v\n", err)
unix.Close(s)
return -1, nil
return -1, err
}
var addr unix.SockaddrInet4
// golang.syscall will handle htons
addr.Port = port
// golang will set addr.Addr = any(0)
err = unix.Bind(s, &addr)
err = unix.Bind(s, &unix.SockaddrInet4{Port: port})
if err != nil {
log.Printf("bind addr err: %v\n", err)
unix.Close(s)
return -1, nil
return -1, err
}
err = unix.Listen(s, BACKLOG)
if err != nil {
log.Printf("listen socket err: %v\n", err)
unix.Close(s)
return -1, nil
return -1, err
}
return s, nil
}
Loading

0 comments on commit 4977439

Please sign in to comment.