Skip to content

Commit

Permalink
Move Closer from y to z (#1498)
Browse files Browse the repository at this point in the history
The `Closer` type was moved by badger/y package to ristretto/z via dgraph-io/ristretto#191
  • Loading branch information
Ibrahim Jarif committed Oct 2, 2020
1 parent ee0a9e2 commit 6b8322e
Show file tree
Hide file tree
Showing 17 changed files with 65 additions and 147 deletions.
9 changes: 5 additions & 4 deletions badger/cmd/read_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
)

var readBenchCmd = &cobra.Command{
Expand Down Expand Up @@ -84,7 +85,7 @@ func fullScanDB(db *badger.DB) {

startTime = time.Now()
// Print the stats
c := y.NewCloser(0)
c := z.NewCloser(0)
c.AddRunning(1)
go printStats(c)

Expand Down Expand Up @@ -142,7 +143,7 @@ func readBench(cmd *cobra.Command, args []string) error {
fmt.Println("DB is empty, hence returning")
return nil
}
c := y.NewCloser(0)
c := z.NewCloser(0)
startTime = time.Now()
for i := 0; i < numGoroutines; i++ {
c.AddRunning(1)
Expand All @@ -159,7 +160,7 @@ func readBench(cmd *cobra.Command, args []string) error {
return nil
}

func printStats(c *y.Closer) {
func printStats(c *z.Closer) {
defer c.Done()

t := time.NewTicker(time.Second)
Expand All @@ -181,7 +182,7 @@ func printStats(c *y.Closer) {
}
}

func readKeys(db *badger.DB, c *y.Closer, keys [][]byte) {
func readKeys(db *badger.DB, c *z.Closer, keys [][]byte) {
defer c.Done()
r := rand.New(rand.NewSource(time.Now().Unix()))
for {
Expand Down
11 changes: 6 additions & 5 deletions badger/cmd/write_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
)

var writeBenchCmd = &cobra.Command{
Expand Down Expand Up @@ -272,7 +273,7 @@ func writeBench(cmd *cobra.Command, args []string) error {

startTime = time.Now()
num := uint64(numKeys * mil)
c := y.NewCloser(4)
c := z.NewCloser(4)
go reportStats(c, db)
go dropAll(c, db)
go dropPrefix(c, db)
Expand Down Expand Up @@ -324,7 +325,7 @@ func showKeysStats(db *badger.DB) {
moveKeyCount, internalKeyCount)
}

func reportStats(c *y.Closer, db *badger.DB) {
func reportStats(c *z.Closer, db *badger.DB) {
defer c.Done()

t := time.NewTicker(time.Second)
Expand Down Expand Up @@ -374,7 +375,7 @@ func reportStats(c *y.Closer, db *badger.DB) {
}
}

func runGC(c *y.Closer, db *badger.DB) {
func runGC(c *z.Closer, db *badger.DB) {
defer c.Done()
period, err := time.ParseDuration(gcPeriod)
y.Check(err)
Expand All @@ -398,7 +399,7 @@ func runGC(c *y.Closer, db *badger.DB) {
}
}

func dropAll(c *y.Closer, db *badger.DB) {
func dropAll(c *z.Closer, db *badger.DB) {
defer c.Done()
dropPeriod, err := time.ParseDuration(dropAllPeriod)
y.Check(err)
Expand Down Expand Up @@ -429,7 +430,7 @@ func dropAll(c *y.Closer, db *badger.DB) {
}
}

func dropPrefix(c *y.Closer, db *badger.DB) {
func dropPrefix(c *z.Closer, db *badger.DB) {
defer c.Done()
dropPeriod, err := time.ParseDuration(dropPrefixPeriod)
y.Check(err)
Expand Down
41 changes: 21 additions & 20 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/dgraph-io/badger/v2/table"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto"
"github.com/dgraph-io/ristretto/z"
humanize "github.com/dustin/go-humanize"
"github.com/pkg/errors"
)
Expand All @@ -50,12 +51,12 @@ var (
)

type closers struct {
updateSize *y.Closer
compactors *y.Closer
memtable *y.Closer
writes *y.Closer
valueGC *y.Closer
pub *y.Closer
updateSize *z.Closer
compactors *z.Closer
memtable *z.Closer
writes *z.Closer
valueGC *z.Closer
pub *z.Closer
}

// DB provides the various functions required to interact with Badger.
Expand Down Expand Up @@ -358,7 +359,7 @@ func Open(opt Options) (db *DB, err error) {
return db, err
}
db.calculateSize()
db.closers.updateSize = y.NewCloser(1)
db.closers.updateSize = z.NewCloser(1)
go db.updateSize(db.closers.updateSize)
db.mt = skl.NewSkiplist(arenaSize(opt))

Expand All @@ -371,10 +372,10 @@ func Open(opt Options) (db *DB, err error) {
db.vlog.init(db)

if !opt.ReadOnly {
db.closers.compactors = y.NewCloser(1)
db.closers.compactors = z.NewCloser(1)
db.lc.startCompact(db.closers.compactors)

db.closers.memtable = y.NewCloser(1)
db.closers.memtable = z.NewCloser(1)
go func() {
_ = db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
}()
Expand All @@ -392,7 +393,7 @@ func Open(opt Options) (db *DB, err error) {
vptr.Decode(vs.Value)
}

replayCloser := y.NewCloser(1)
replayCloser := z.NewCloser(1)
go db.doWrites(replayCloser)

if err = db.vlog.open(db, vptr, db.replayFunction()); err != nil {
Expand All @@ -409,15 +410,15 @@ func Open(opt Options) (db *DB, err error) {
db.orc.readMark.Done(db.orc.nextTxnTs)
db.orc.incrementNextTs()

db.closers.writes = y.NewCloser(1)
db.closers.writes = z.NewCloser(1)
go db.doWrites(db.closers.writes)

if !db.opt.InMemory {
db.closers.valueGC = y.NewCloser(1)
db.closers.valueGC = z.NewCloser(1)
go db.vlog.waitOnGC(db.closers.valueGC)
}

db.closers.pub = y.NewCloser(1)
db.closers.pub = z.NewCloser(1)
go db.pub.listenForUpdates(db.closers.pub)

valueDirLockGuard = nil
Expand Down Expand Up @@ -843,7 +844,7 @@ func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
return req, nil
}

func (db *DB) doWrites(lc *y.Closer) {
func (db *DB) doWrites(lc *z.Closer) {
defer lc.Done()
pendingCh := make(chan struct{}, 1)

Expand Down Expand Up @@ -1104,7 +1105,7 @@ func (db *DB) handleFlushTask(ft flushTask) error {

// flushMemtable must keep running until we send it an empty flushTask. If there
// are errors during handling the flush task, we'll retry indefinitely.
func (db *DB) flushMemtable(lc *y.Closer) error {
func (db *DB) flushMemtable(lc *z.Closer) error {
defer lc.Done()

for ft := range db.flushChan {
Expand Down Expand Up @@ -1190,7 +1191,7 @@ func (db *DB) calculateSize() {
y.VlogSize.Set(db.opt.ValueDir, newInt(vlogSize))
}

func (db *DB) updateSize(lc *y.Closer) {
func (db *DB) updateSize(lc *z.Closer) {
defer lc.Done()
if db.opt.InMemory {
return
Expand Down Expand Up @@ -1449,7 +1450,7 @@ func (db *DB) stopCompactions() {
func (db *DB) startCompactions() {
// Resume compactions.
if db.closers.compactors != nil {
db.closers.compactors = y.NewCloser(1)
db.closers.compactors = z.NewCloser(1)
db.lc.startCompact(db.closers.compactors)
}
}
Expand All @@ -1458,7 +1459,7 @@ func (db *DB) startMemoryFlush() {
// Start memory fluhser.
if db.closers.memtable != nil {
db.flushChan = make(chan flushTask, db.opt.NumMemtables)
db.closers.memtable = y.NewCloser(1)
db.closers.memtable = z.NewCloser(1)
go func() {
_ = db.flushMemtable(db.closers.memtable)
}()
Expand Down Expand Up @@ -1549,7 +1550,7 @@ func (db *DB) blockWrite() error {
}

func (db *DB) unblockWrite() {
db.closers.writes = y.NewCloser(1)
db.closers.writes = z.NewCloser(1)
go db.doWrites(db.closers.writes)

// Resume writes.
Expand Down Expand Up @@ -1717,7 +1718,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, prefixes
return ErrNilCallback
}

c := y.NewCloser(1)
c := z.NewCloser(1)
recvCh, id := db.pub.newSubscriber(c, prefixes...)
slurp := func(batch *pb.KVList) error {
for {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.4-0.20200820164438-623d8ef1614b
github.com/dgraph-io/ristretto v0.0.4-0.20200906165740-41ebdbffecfd
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.4-0.20200820164438-623d8ef1614b h1:/g8jOqvD1UzHTOwENtkqcLmMLzTcN18P3ut8aSUZ45g=
github.com/dgraph-io/ristretto v0.0.4-0.20200820164438-623d8ef1614b/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.0.4-0.20200906165740-41ebdbffecfd h1:KoJOtZf+6wpQaDTuOWGuo61GxcPBIfhwRxRTaTWGCTc=
github.com/dgraph-io/ristretto v0.0.4-0.20200906165740-41ebdbffecfd/go.mod h1:YylP9MpCYGVZQrly/j/diqcdUetCRRePeBB0c2VGXsA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down
3 changes: 2 additions & 1 deletion integration/testgc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
)

var maxValue int64 = 10000000
Expand Down Expand Up @@ -115,7 +116,7 @@ func main() {
_ = http.ListenAndServe("localhost:8080", nil)
}()

closer := y.NewCloser(11)
closer := z.NewCloser(11)
go func() {
// Run value log GC.
defer closer.Done()
Expand Down
5 changes: 3 additions & 2 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/table"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -362,7 +363,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
return nil
}

func (s *levelsController) startCompact(lc *y.Closer) {
func (s *levelsController) startCompact(lc *z.Closer) {
n := s.kv.opt.NumCompactors
lc.AddRunning(n - 1)
for i := 0; i < n; i++ {
Expand All @@ -372,7 +373,7 @@ func (s *levelsController) startCompact(lc *y.Closer) {
}
}

func (s *levelsController) runCompactor(id int, lc *y.Closer) {
func (s *levelsController) runCompactor(id int, lc *z.Closer) {
defer lc.Done()

randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
Expand Down
5 changes: 3 additions & 2 deletions managed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -320,7 +321,7 @@ func TestDropAllRace(t *testing.T) {

N := 10000
// Start a goroutine to keep trying to write to DB while DropAll happens.
closer := y.NewCloser(1)
closer := z.NewCloser(1)
go func() {
defer closer.Done()
ticker := time.NewTicker(time.Millisecond)
Expand Down Expand Up @@ -540,7 +541,7 @@ func TestDropPrefixRace(t *testing.T) {

N := 10000
// Start a goroutine to keep trying to write to DB while DropPrefix happens.
closer := y.NewCloser(1)
closer := z.NewCloser(1)
go func() {
defer closer.Done()
ticker := time.NewTicker(time.Millisecond)
Expand Down
5 changes: 3 additions & 2 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
"github.com/pkg/errors"
)

Expand All @@ -30,7 +31,7 @@ type MergeOperator struct {
f MergeFunc
db *DB
key []byte
closer *y.Closer
closer *z.Closer
}

// MergeFunc accepts two byte slices, one representing an existing value, and
Expand All @@ -49,7 +50,7 @@ func (db *DB) GetMergeOperator(key []byte,
f: f,
db: db,
key: key,
closer: y.NewCloser(1),
closer: z.NewCloser(1),
}

go op.runCompactions(dur)
Expand Down
7 changes: 4 additions & 3 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/trie"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"
)

type subscriber struct {
prefixes [][]byte
sendCh chan<- *pb.KVList
subCloser *y.Closer
subCloser *z.Closer
}

type publisher struct {
Expand All @@ -47,7 +48,7 @@ func newPublisher() *publisher {
}
}

func (p *publisher) listenForUpdates(c *y.Closer) {
func (p *publisher) listenForUpdates(c *z.Closer) {
defer func() {
p.cleanSubscribers()
c.Done()
Expand Down Expand Up @@ -108,7 +109,7 @@ func (p *publisher) publishUpdates(reqs requests) {
}
}

func (p *publisher) newSubscriber(c *y.Closer, prefixes ...[]byte) (<-chan *pb.KVList, uint64) {
func (p *publisher) newSubscriber(c *z.Closer, prefixes ...[]byte) (<-chan *pb.KVList, uint64) {
p.Lock()
defer p.Unlock()
ch := make(chan *pb.KVList, 1000)
Expand Down
Loading

0 comments on commit 6b8322e

Please sign in to comment.