Skip to content

Commit

Permalink
Codev (#17)
Browse files Browse the repository at this point in the history
* add codev

* add test coverage

* add test coverage for set

* coverage set and bitmap

* fix dead lock when Set union itself

---------

Co-authored-by: guangzhixu <guangzhixu@deepglint.com>
  • Loading branch information
xgzlucario and satoshi-099 authored Oct 21, 2023
1 parent e22850f commit c8b9037
Show file tree
Hide file tree
Showing 10 changed files with 393 additions and 93 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/rotom.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: Test and coverage

on: [push, pull_request]

jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: 'stable'
- name: Run coverage
run: go test -race -coverprofile=coverage.txt -covermode=atomic
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4-beta
with:
token: ${{ secrets.CODECOV_TOKEN }}
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
rotom.db
*.db
coverage.html
coverage.*
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ run-gc:
GODEBUG=gctrace=1 go run main.go

test-cover:
go test -coverprofile=c.out
go test -race -coverprofile=c.out -covermode=atomic
go tool cover -html=c.out -o coverage.html
rm c.out
rm *.db
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (c *Client) Set(key string, val []byte) ([]byte, error) {

// SetEx
func (c *Client) SetEx(key string, val []byte, ttl time.Duration) ([]byte, error) {
return c.SetTx(key, val, cache.GetUnixNano()+int64(ttl))
return c.SetTx(key, val, cache.GetClock()+int64(ttl))
}

// SetTx
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/sakeven/RbTree v0.0.0-20220710124251-94e35f9fed6c
github.com/sourcegraph/conc v0.3.0
github.com/stretchr/testify v1.8.4
github.com/xgzlucario/GigaCache v0.0.0-20231015082606-97872112f8fe
github.com/xgzlucario/GigaCache v0.0.0-20231020075600-61001d347387
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/xgzlucario/GigaCache v0.0.0-20231015082606-97872112f8fe h1:A3t9xRF/MOx4wP+Zp59ICSH1Xk0z2TJcWmp8Meh9Q5k=
github.com/xgzlucario/GigaCache v0.0.0-20231015082606-97872112f8fe/go.mod h1:n0gu6svrq5UYwUWv8RRYgt06u8e5E3AMNg5eqflP74Y=
github.com/xgzlucario/GigaCache v0.0.0-20231020075600-61001d347387 h1:1B7fhItxnyPHVI95u1uyXL7DkqT7F22RAN+LdirTl0M=
github.com/xgzlucario/GigaCache v0.0.0-20231020075600-61001d347387/go.mod h1:n0gu6svrq5UYwUWv8RRYgt06u8e5E3AMNg5eqflP74Y=
github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A=
github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
Expand Down
79 changes: 49 additions & 30 deletions rotom.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,13 @@ type (
var (
// Default config for db
DefaultConfig = &Config{
Path: "rotom.db",
ShardCount: 1024,
SyncPolicy: base.EveryInterval,
SyncInterval: time.Second,
ShrinkInterval: time.Minute,
Logger: slog.Default(),
Path: "rotom.db",
ShardCount: 1024,
SyncPolicy: base.EveryInterval,
SyncInterval: time.Second,
ShrinkInterval: time.Minute,
RunSkipLoadError: true,
Logger: slog.Default(),
}

// No persistent config
Expand All @@ -170,6 +171,8 @@ type Config struct {
SyncInterval time.Duration // Job for db sync to disk.
ShrinkInterval time.Duration // Job for shrink db file size.

RunSkipLoadError bool // Starts when loading database file error.

Logger *slog.Logger // Logger for db, set <nil> if you don't want to use it.
}

Expand Down Expand Up @@ -197,6 +200,7 @@ func Open(conf *Config) (*Engine, error) {
// load db from disk.
if err := e.load(); err != nil {
e.logError("db load error: %v", err)
return nil, err
}

// runtime monitor.
Expand Down Expand Up @@ -289,7 +293,7 @@ func (e *Engine) Set(key string, val []byte) {

// SetEx store key-value pair with ttl.
func (e *Engine) SetEx(key string, val []byte, ttl time.Duration) {
e.SetTx(key, val, cache.GetUnixNano()+int64(ttl))
e.SetTx(key, val, cache.GetClock()+int64(ttl))
}

// SetTx store key-value pair with deadline.
Expand Down Expand Up @@ -370,7 +374,7 @@ func (e *Engine) HLen(key string) (int, error) {

// HSet
func (e *Engine) HSet(key, field string, val []byte) error {
m, err := e.fetchMap(key)
m, err := e.fetchMap(key, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -403,7 +407,7 @@ func (e *Engine) HKeys(key string) ([]string, error) {

// SAdd
func (e *Engine) SAdd(key string, item string) error {
s, err := e.fetchSet(key)
s, err := e.fetchSet(key, true)
if err != nil {
return err
}
Expand All @@ -414,15 +418,14 @@ func (e *Engine) SAdd(key string, item string) error {
}

// SRemove
func (e *Engine) SRemove(key string, item string) error {
func (e *Engine) SRemove(key string, item string) (bool, error) {
s, err := e.fetchSet(key)
if err != nil {
return err
return false, err
}
e.encode(NewCodec(OpSRemove).Str(key).Str(item))
s.Remove(item)

return nil
return s.Remove(item), nil
}

// SHas
Expand All @@ -443,6 +446,15 @@ func (e *Engine) SCard(key string) (int, error) {
return s.Len(), nil
}

// SMembers
func (e *Engine) SMembers(key string) ([]string, error) {
s, err := e.fetchSet(key)
if err != nil {
return nil, err
}
return s.ToSlice(), nil
}

// SUnion
func (e *Engine) SUnion(key1, key2, dest string) error {
s1, err := e.fetchSet(key1)
Expand Down Expand Up @@ -514,7 +526,7 @@ func (e *Engine) SDiff(key1, key2, dest string) error {

// LPush
func (e *Engine) LPush(key, item string) error {
ls, err := e.fetchList(key)
ls, err := e.fetchList(key, true)
if err != nil {
return err
}
Expand All @@ -526,7 +538,7 @@ func (e *Engine) LPush(key, item string) error {

// RPush
func (e *Engine) RPush(key, item string) error {
ls, err := e.fetchList(key)
ls, err := e.fetchList(key, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -586,7 +598,7 @@ func (e *Engine) BitTest(key string, offset uint32) (bool, error) {

// BitSet
func (e *Engine) BitSet(key string, offset uint32, val bool) (bool, error) {
bm, err := e.fetchBitMap(key)
bm, err := e.fetchBitMap(key, true)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -771,6 +783,12 @@ func (e *Engine) load() error {
// <OP><argsNum><args...>
for len(line) > 2 {
op := Operation(line[0])

// if operation valid
if int(op) >= len(cmdTable) {
return base.ErrParseRecordLine
}

argsNum := cmdTable[op].ArgsNum
line = line[1:]

Expand All @@ -788,7 +806,7 @@ func (e *Engine) load() error {

case OpSetTx: // type, key, ts, val
ts := base.ParseInt[int64](args[2]) * timeCarry
if ts < cache.GetUnixNano() && ts != NoTTL {
if ts < cache.GetClock() && ts != NoTTL {
continue
}

Expand Down Expand Up @@ -1048,42 +1066,42 @@ func parseLine(line []byte, argsNum int) ([][]byte, []byte, error) {
}

// fetchMap
func (e *Engine) fetchMap(key string) (m Map, err error) {
func (e *Engine) fetchMap(key string, setWhenNotExist ...bool) (m Map, err error) {
return fetch(e, key, func() Map {
return structx.NewSyncMap[string, []byte]()
})
}, setWhenNotExist...)
}

// fetchSet
func (e *Engine) fetchSet(key string) (s Set, err error) {
func (e *Engine) fetchSet(key string, setWhenNotExist ...bool) (s Set, err error) {
return fetch(e, key, func() Set {
return structx.NewSet[string]()
})
}, setWhenNotExist...)
}

// fetchList
func (e *Engine) fetchList(key string) (m List, err error) {
func (e *Engine) fetchList(key string, setWhenNotExist ...bool) (m List, err error) {
return fetch(e, key, func() List {
return structx.NewList[string]()
})
}, setWhenNotExist...)
}

// fetchBitMap
func (e *Engine) fetchBitMap(key string) (bm BitMap, err error) {
func (e *Engine) fetchBitMap(key string, setWhenNotExist ...bool) (bm BitMap, err error) {
return fetch(e, key, func() BitMap {
return structx.NewBitmap()
})
}, setWhenNotExist...)
}

// fetchZSet
func (e *Engine) fetchZSet(key string) (z ZSet, err error) {
func (e *Engine) fetchZSet(key string, setWhenNotExist ...bool) (z ZSet, err error) {
return fetch(e, key, func() ZSet {
return structx.NewZSet[string, float64, []byte]()
})
}, setWhenNotExist...)
}

// fetch
func fetch[T any](e *Engine, key string, new func() T) (T, error) {
func fetch[T any](e *Engine, key string, new func() T, setWhenNotExist ...bool) (T, error) {
m, _, ok := e.m.Get(key)
if ok {
m, ok := m.(T)
Expand All @@ -1093,9 +1111,10 @@ func fetch[T any](e *Engine, key string, new func() T) (T, error) {
var v T
return v, base.ErrWrongType
}

vptr := new()
e.m.Set(key, vptr)
if len(setWhenNotExist) > 0 && setWhenNotExist[0] {
e.m.Set(key, vptr)
}

return vptr, nil
}
Expand Down
Loading

0 comments on commit c8b9037

Please sign in to comment.