Skip to content

Commit

Permalink
stop compaction after flushing the memtable and don't close the chann…
Browse files Browse the repository at this point in the history
…el while closing the doWrite (#918)

* close channel while closing the db

* stop compaction after flushing the memtable
Signed-off-by: பாலாஜி ஜின்னா <balaji@dgraph.io>
  • Loading branch information
poonai authored Jul 12, 2019
1 parent 1496af9 commit 2b39009
Showing 1 changed file with 72 additions and 34 deletions.
106 changes: 72 additions & 34 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"expvar"
"io"
"math"
Expand Down Expand Up @@ -375,6 +374,9 @@ func (db *DB) close() (err error) {
// Stop writes next.
db.closers.writes.SignalAndWait()

// Don't accept any more write.
close(db.writeCh)

db.closers.pub.SignalAndWait()

// Now close the value log.
Expand Down Expand Up @@ -413,6 +415,7 @@ func (db *DB) close() (err error) {
time.Sleep(10 * time.Millisecond)
}
}
db.stopMemoryFlush()
db.stopCompactions()

// Force Compact L0
Expand Down Expand Up @@ -741,15 +744,19 @@ func (db *DB) doWrites(lc *y.Closer) {
}

closedCase:
close(db.writeCh)
for r := range db.writeCh { // Flush the channel.
reqs = append(reqs, r)
// All the pending request are drained.
// Don't close the writeCh, because it has be used in several places.
for {
select {
case r = <-db.writeCh:
reqs = append(reqs, r)
default:
pendingCh <- struct{}{} // Push to pending before doing a write.
writeRequests(reqs)
return
}
}

pendingCh <- struct{}{} // Push to pending before doing a write.
writeRequests(reqs)
return

writeCase:
go writeRequests(reqs)
reqs = make([]*request, 0, 10)
Expand Down Expand Up @@ -1210,12 +1217,15 @@ func (db *DB) MaxBatchSize() int64 {
return db.opt.maxBatchSize
}

func (db *DB) stopCompactions() {
func (db *DB) stopMemoryFlush() {
// Stop memtable flushes.
if db.closers.memtable != nil {
close(db.flushChan)
db.closers.memtable.SignalAndWait()
}
}

func (db *DB) stopCompactions() {
// Stop compactions.
if db.closers.compactors != nil {
db.closers.compactors.SignalAndWait()
Expand All @@ -1228,6 +1238,10 @@ func (db *DB) startCompactions() {
db.closers.compactors = y.NewCloser(1)
db.lc.startCompact(db.closers.compactors)
}
}

func (db *DB) startMemoryFlush() {
// Start memory fluhser.
if db.closers.memtable != nil {
db.flushChan = make(chan flushTask, db.opt.NumMemtables)
db.closers.memtable = y.NewCloser(1)
Expand Down Expand Up @@ -1308,29 +1322,47 @@ func (db *DB) Flatten(workers int) error {
}
}

func (db *DB) prepareToDrop() func() {
if db.opt.ReadOnly {
panic("Attempting to drop data in read-only mode.")
}
func (db *DB) blockWrite() {
// Stop accepting new writes.
atomic.StoreInt32(&db.blockWrites, 1)

// Make all pending writes finish. The following will also close writeCh.
db.closers.writes.SignalAndWait()
db.opt.Infof("Writes flushed. Stopping compactions now...")
}

// Stop all compactions.
db.stopCompactions()
return func() {
db.opt.Infof("Resuming writes")
db.startCompactions()
func (db *DB) unblockWrite() {
db.closers.writes = y.NewCloser(1)
go db.doWrites(db.closers.writes)

db.writeCh = make(chan *request, kvWriteChCapacity)
db.closers.writes = y.NewCloser(1)
go db.doWrites(db.closers.writes)
// Resume writes.
atomic.StoreInt32(&db.blockWrites, 0)
}

// Resume writes.
atomic.StoreInt32(&db.blockWrites, 0)
func (db *DB) prepareToDrop() func() {
if db.opt.ReadOnly {
panic("Attempting to drop data in read-only mode.")
}
// In order prepare for drop, we need to block the incoming writes and
// write it to db. Then, flush all the pending flushtask. So that, we
// don't miss any entries.
db.blockWrite()
reqs := make([]*request, 0, 10)
for {
select {
case r := <-db.writeCh:
reqs = append(reqs, r)
default:
if err := db.writeRequests(reqs); err != nil {
db.opt.Errorf("writeRequests: %v", err)
}
db.stopMemoryFlush()
return func() {
db.opt.Infof("Resuming writes")
db.startMemoryFlush()
db.unblockWrite()
}
}
}
}

Expand All @@ -1347,20 +1379,24 @@ func (db *DB) prepareToDrop() func() {
// writes are paused before running DropAll, and resumed after it is finished.
func (db *DB) DropAll() error {
f, err := db.dropAll()
defer f()
if err != nil {
return err
}
if f == nil {
panic("both error and returned function cannot be nil in DropAll")
}
f()
return nil
}

func (db *DB) dropAll() (func(), error) {
db.opt.Infof("DropAll called. Blocking writes...")
f := db.prepareToDrop()

// prepareToDrop will stop all the incomming write and flushes any pending flush tasks.
// Before we drop, we'll stop the compaction because anyways all the datas are going to
// be deleted.
db.stopCompactions()
resume := func() {
db.startCompactions()
f()
}
// Block all foreign interactions with memory tables.
db.Lock()
defer db.Unlock()
Expand All @@ -1375,34 +1411,34 @@ func (db *DB) dropAll() (func(), error) {

num, err := db.lc.dropTree()
if err != nil {
return nil, err
return resume, err
}
db.opt.Infof("Deleted %d SSTables. Now deleting value logs...\n", num)

num, err = db.vlog.dropAll()
if err != nil {
return nil, err
return resume, err
}
db.vhead = valuePointer{} // Zero it out.
db.lc.nextFileID = 1
db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
return f, nil
return resume, nil
}

// DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
// - Stop accepting new writes.
// - Stop memtable flushes and compactions.
// - Stop memtable flushes before aquiring lock. Because we're acquring lock here
// and memtable flush stalls for lock, which leads to deadlock
// - Flush out all memtables, skipping over keys with the given prefix, Kp.
// - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
// back after a restart.
// - Stop compaction.
// - Compact L0->L1, skipping over Kp.
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
// - Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefix(prefix []byte) error {
db.opt.Infof("DropPrefix called on %s. Blocking writes...", hex.Dump(prefix))
f := db.prepareToDrop()
defer f()

// Block all foreign interactions with memory tables.
db.Lock()
defer db.Unlock()
Expand All @@ -1426,6 +1462,8 @@ func (db *DB) DropPrefix(prefix []byte) error {
}
memtable.DecrRef()
}
db.stopCompactions()
defer db.startCompactions()
db.imm = db.imm[:0]
db.mt = skl.NewSkiplist(arenaSize(db.opt))

Expand Down

0 comments on commit 2b39009

Please sign in to comment.