Skip to content

Commit

Permalink
Merge pull request #83 from yowcow/fix-nsstorage-get
Browse files Browse the repository at this point in the history
fix RLock in bolt NSStorage `GetNS()`
  • Loading branch information
yowcow authored Jul 24, 2018
2 parents bfdbf61 + b8a576e commit e94317a
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cmd/heavy-load/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
/main
/stress-*
/_watcher
/_storage
12 changes: 8 additions & 4 deletions cmd/heavy-load/Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
all: _watcher _storage main
ARTIFACTS = stress-bolt stress-bolt-ns

all: _watcher _storage $(ARTIFACTS)

_%:
mkdir -p $@

main:
go build -o $@
stress-%:
go build -o $@ ./$*

clean:
rm -rf main
rm -rf $(ARTIFACTS)

realclean: clean
rm -rf _watcher _storage

.PHONY: all clean realclean
160 changes: 160 additions & 0 deletions cmd/heavy-load/bolt-ns/bolt-ns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package main

import (
"bytes"
"context"
"flag"
"io"
"log"
"os"
"sync"
"time"

"github.com/yowcow/goromdb/handler/simplehandler"
"github.com/yowcow/goromdb/loader"
"github.com/yowcow/goromdb/storage/boltstorage"
"github.com/yowcow/goromdb/watcher"
)

var (
concurrency int
duration int
help bool
logger *log.Logger

bucket = "goromdb"
watcherFile = "_watcher/data.db"
storagePath = "_storage"

sourceDataFile = "../../data/store/sample-boltdb.db"
sourceMD5File = "../../data/store/sample-boltdb.db.md5"
)

func init() {
flag.IntVar(&concurrency, "c", 1, "concurrency")
flag.IntVar(&duration, "d", 1, "duration in seconds")
flag.BoolVar(&help, "h", false, "show help")
flag.Parse()

if help {
flag.Usage()
os.Exit(0)
}
}

func init() {
logger = log.New(os.Stdout, "", log.Ldate|log.Ltime)
}

func init() {
if _, err := os.Stat(sourceDataFile); os.IsNotExist(err) {
logger.Println(sourceDataFile, "is not found")
os.Exit(1)
}
if _, err := os.Stat(sourceMD5File); os.IsNotExist(err) {
logger.Println(sourceMD5File, "is not found")
os.Exit(2)
}
}

func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(duration)*time.Second+3*time.Second)
defer cancel()

// create watcher
wcr := watcher.New(watcherFile, 1, logger)
filein := wcr.Start(ctx)

// create storage
//stg := boltstorage.New(bucket)
stg := boltstorage.NewNS()

// create loader
ldr, err := loader.New(storagePath, "data.db")
if err != nil {
panic(err)
}

// create handler
hdr := simplehandler.NewNS(stg, logger)

var wg sync.WaitGroup

// start goromdb handler
wg.Add(1)
go func(w *sync.WaitGroup) {
defer w.Done()
done := hdr.Start(filein, ldr)
<-done
}(&wg)

// start infinite file loading
wg.Add(1)
go func(w *sync.WaitGroup) {
defer w.Done()
tc := time.NewTicker(500 * time.Millisecond)
for {
// copy data.db
if _, err := os.Stat(watcherFile); os.IsNotExist(err) {
if r, err := os.Open(sourceDataFile); err == nil {
if w, err := os.OpenFile(watcherFile+".tmp", os.O_WRONLY|os.O_CREATE, 0644); err == nil {
io.Copy(w, r)
w.Close()
}
r.Close()
}
os.Rename(watcherFile+".tmp", watcherFile)
}
// copy data.db.md5
if _, err := os.Stat(watcherFile + ".md5"); os.IsNotExist(err) {
if r, err := os.Open(sourceMD5File); err == nil {
if w, err := os.OpenFile(watcherFile+".md5.tmp", os.O_WRONLY|os.O_CREATE, 0644); err == nil {
io.Copy(w, r)
w.Close()
}
r.Close()
}
os.Rename(watcherFile+".md5.tmp", watcherFile+".md5")
}

select {
case <-tc.C:
case <-ctx.Done():
tc.Stop()
return
}
}
}(&wg)

time.Sleep(3 * time.Second) // wait 3 secs

// start infinite `Get` calls
f := func(id int, w *sync.WaitGroup, c context.Context, l *log.Logger) {
defer w.Done()
for {
_, err := hdr.GetNS([]byte(bucket), []byte("hoge"))
if err != nil {
l.Println("worker", id, "got error:", err)
}

select {
case <-c.Done():
return
default:
}
}
}

logbuf := new(bytes.Buffer)
l := log.New(logbuf, "", log.Ldate|log.Ltime)
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go f(i+1, &wg, ctx, l)
}

// wait for everybody
wg.Wait()

logger.Println("===== errors during `GetNS()` calls =====")
io.WriteString(os.Stdout, logbuf.String())
}
File renamed without changes.
6 changes: 3 additions & 3 deletions storage/boltstorage/nsstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ func (s *NSStorage) GetNS(ns, key []byte) ([]byte, error) {
return nil, storage.InternalError("please specify bucket")
}

s.mux.RLock()
defer s.mux.RUnlock()

db := s.getDB()
if db == nil {
return nil, storage.InternalError("couldn't load db")
}

s.mux.RLock()
defer s.mux.RUnlock()

return getFromBucket(db, ns, key)
}

0 comments on commit e94317a

Please sign in to comment.