Various small tweaks #2

Closed · wants to merge 6 commits
badger/db.go: 61 additions, 20 deletions

@@ -17,16 +17,18 @@
package badger

import (
"encoding/binary"
"fmt"
"path/filepath"
"sync"

"github.com/dgraph-io/badger/memtable"
"github.com/dgraph-io/badger/mem"
"github.com/dgraph-io/badger/value"
"github.com/dgraph-io/badger/y"
)

- // DBOptions are params for creating DB object.
- type DBOptions struct {
+ // Options are params for creating DB object.
+ type Options struct {
Dir string
NumLevelZeroTables int // Maximum number of Level 0 tables before we start compacting.
NumLevelZeroTablesStall int // If we hit this number of Level 0 tables, we will stall until level 0 is compacted away.
@@ -37,9 +39,10 @@ type DBOptions struct {
LevelSizeMultiplier int
ValueThreshold int // If value size >= this threshold, we store offsets in value log.
Verbose bool
+ DoNotCompact bool
}

- var DefaultDBOptions = DBOptions{
+ var DefaultOptions = Options{
Dir: "/tmp",
NumLevelZeroTables: 5,
NumLevelZeroTablesStall: 10,
@@ -50,40 +53,56 @@ var DefaultDBOptions = DBOptions{
LevelSizeMultiplier: 10,
ValueThreshold: 20,
Verbose: true,
+ DoNotCompact: false, // Only for testing.
}

type DB struct {
sync.RWMutex // Guards imm, mem.

- imm *memtable.Memtable // Immutable, memtable being flushed.
- mem *memtable.Memtable
- immWg sync.WaitGroup // Nonempty when flushing immutable memtable.
- opt DBOptions
- lc *levelsController
- vlog value.Log
+ imm *mem.Table // Immutable, memtable being flushed.
+ mt *mem.Table
+ immWg sync.WaitGroup // Nonempty when flushing immutable memtable.
+ opt Options
+ lc *levelsController
+ vlog value.Log
+ voffset uint64
}

// NewDB returns a new DB object. Compact levels are created as well.
- func NewDB(opt DBOptions) *DB {
+ func NewDB(opt Options) *DB {
y.AssertTrue(len(opt.Dir) > 0)
out := &DB{
- mem: memtable.NewMemtable(),
+ mt: mem.NewTable(),
opt: opt, // Make a copy.
lc: newLevelsController(opt),
}
vlogPath := filepath.Join(opt.Dir, "vlog")
out.vlog.Open(vlogPath)

+ val := out.Get(Head)
+ var voffset uint64
+ if len(val) == 0 {
+ voffset = 0
+ } else {
+ voffset = binary.BigEndian.Uint64(val)
+ }
+
+ fn := func(k, v []byte, meta byte) {
+ out.mt.Put(k, v, meta)
+ }
+ out.vlog.Replay(voffset, fn)

return out
}

// Close closes a DB.
func (s *DB) Close() {
}

- func (s *DB) getMemTables() (*memtable.Memtable, *memtable.Memtable) {
+ func (s *DB) getMemTables() (*mem.Table, *mem.Table) {
s.RLock()
defer s.RUnlock()
- return s.mem, s.imm
+ return s.mt, s.imm
}

func decodeValue(val []byte, vlog *value.Log) []byte {
@@ -132,11 +151,22 @@ func (s *DB) Get(key []byte) []byte {
return decodeValue(val, &s.vlog)
}

+ func (s *DB) updateOffset(ptrs []value.Pointer) {
+ ptr := ptrs[len(ptrs)-1]
+
+ s.Lock()
+ defer s.Unlock()
+ if s.voffset < ptr.Offset {
+ s.voffset = ptr.Offset + uint64(ptr.Len)
+ }
+ }

// Write applies a list of value.Entry to our memtable.
func (s *DB) Write(entries []value.Entry) error {
ptrs, err := s.vlog.Write(entries)
y.Check(err)
y.AssertTrue(len(ptrs) == len(entries))
+ s.updateOffset(ptrs)

if err := s.makeRoomForWrite(); err != nil {
return err
@@ -145,9 +175,9 @@ func (s *DB) Write(entries []value.Entry) error {
var offsetBuf [20]byte
for i, entry := range entries {
if len(entry.Value) < s.opt.ValueThreshold { // Will include deletion / tombstone case.
- s.mem.Put(entry.Key, entry.Value, entry.Meta)
+ s.mt.Put(entry.Key, entry.Value, entry.Meta)
} else {
- s.mem.Put(entry.Key, ptrs[i].Encode(offsetBuf[:]), entry.Meta|value.BitValuePointer)
+ s.mt.Put(entry.Key, ptrs[i].Encode(offsetBuf[:]), entry.Meta|value.BitValuePointer)
}
}
return nil
@@ -173,22 +203,33 @@ func (s *DB) Delete(key []byte) error {
})
}

+ var (
+ Head = []byte("_head_")
+ )

func (s *DB) makeRoomForWrite() error {
- if s.mem.MemUsage() < s.opt.MaxTableSize {
+ if s.mt.MemUsage() < s.opt.MaxTableSize {
// Nothing to do. We have enough space.
return nil
}
s.immWg.Wait() // Make sure we finish flushing immutable memtable.

s.Lock()
+ if s.voffset > 0 {
+ fmt.Printf("Storing offset: %v\n", s.voffset)
+ offset := make([]byte, 8)
+ binary.BigEndian.PutUint64(offset, s.voffset)
+ s.mt.Put(Head, offset, 0)
+ }

// Changing imm, mem requires lock.
- s.imm = s.mem
- s.mem = memtable.NewMemtable()
+ s.imm = s.mt
+ s.mt = mem.NewTable()

// Note: It is important to start the compaction within this lock.
// Otherwise, you might be compacting the wrong imm!
s.immWg.Add(1)
- go func(imm *memtable.Memtable) {
+ go func(imm *mem.Table) {
defer s.immWg.Done()

fileID, f := tempFile(s.opt.Dir)
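The core change in db.go wires the memtable to the value log for crash recovery: on every memtable rotation, makeRoomForWrite persists the current value-log offset big-endian under the reserved _head_ key, and NewDB reads that key back and replays only the log tail into a fresh memtable. Below is a minimal standalone sketch of that offset round-trip, assuming only the encoding/binary calls visible in the diff; decodeHead and the sample offset are illustrative names, not part of this PR.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeHead mirrors the branch in NewDB above: an empty value for the
// _head_ key means "replay the value log from the beginning".
func decodeHead(val []byte) uint64 {
	if len(val) == 0 {
		return 0
	}
	return binary.BigEndian.Uint64(val)
}

func main() {
	// makeRoomForWrite stores the current value-log offset big-endian
	// under the reserved _head_ key whenever a memtable is rotated out.
	head := make([]byte, 8)
	binary.BigEndian.PutUint64(head, 4096)

	fmt.Println(decodeHead(head)) // 4096: replay only the tail of the log
	fmt.Println(decodeHead(nil))  // 0: no head stored yet, replay everything
}
```

Replaying from the stored offset rather than from zero keeps reopening cheap: only entries written after the last flushed memtable need to be re-applied.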
badger/db_test.go: 62 additions, 4 deletions

@@ -17,7 +17,10 @@
package badger

import (
"encoding/binary"
"fmt"
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/require"
@@ -26,7 +29,7 @@ import (
)

func TestDBWrite(t *testing.T) {
- db := NewDB(DefaultDBOptions)
+ db := NewDB(DefaultOptions)
var entries []value.Entry
for i := 0; i < 100; i++ {
entries = append(entries, value.Entry{
@@ -38,7 +41,7 @@ func TestDBWrite(t *testing.T) {
}

func TestDBGet(t *testing.T) {
- db := NewDB(DefaultDBOptions)
+ db := NewDB(DefaultOptions)
require.NoError(t, db.Put([]byte("key1"), []byte("val1")))
require.EqualValues(t, "val1", db.Get([]byte("key1")))

@@ -59,7 +62,7 @@ func TestDBGet(t *testing.T) {
// Put a lot of data to move some data to disk.
// WARNING: This test might take a while but it should pass!
func TestDBGetMore(t *testing.T) {
- db := NewDB(DefaultDBOptions)
+ db := NewDB(DefaultOptions)
// n := 500000
n := 100000
for i := 0; i < n; i++ {
@@ -115,7 +118,7 @@ func TestDBGetMore(t *testing.T) {

// Put a lot of data to move some data to disk. Then iterate.
func TestDBIterateBasic(t *testing.T) {
- db := NewDB(DefaultDBOptions)
+ db := NewDB(DefaultOptions)
defer db.Close()

// n := 500000
@@ -140,3 +143,58 @@ func TestDBIterateBasic(t *testing.T) {
}
require.EqualValues(t, n, count)
}

+ func TestCrash(t *testing.T) {
+ dir, err := ioutil.TempDir("", "")
+ require.Nil(t, err)
+ defer os.RemoveAll(dir)
+
+ opt := DefaultOptions
+ opt.MaxTableSize = 1 << 20
+ opt.Dir = dir
+ opt.DoNotCompact = true
+
+ db := NewDB(opt)
+ var keys [][]byte
+ for i := 0; i < 150000; i++ {
+ k := []byte(fmt.Sprintf("%09d", i))
+ keys = append(keys, k)
+ }
+
+ entries := make([]value.Entry, 0, 10)
+ for _, k := range keys {
+ e := value.Entry{
+ Key: k,
+ Value: k,
+ }
+ entries = append(entries, e)
+
+ if len(entries) == 100 {
+ err := db.Write(entries)
+ require.Nil(t, err)
+ entries = entries[:0]
+ }
+ }
+
+ for _, k := range keys {
+ require.Equal(t, k, db.Get(k))
+ }
+
+ db2 := NewDB(opt)
+ for _, k := range keys {
+ require.Equal(t, k, db2.Get(k), "Key: %s", k)
+ }
+
+ db.lc.tryCompact(1)
+ db.lc.tryCompact(1)
+ val := db.Get(Head)
+ // val := db.lc.levels[1].get(Head)
+ require.True(t, len(val) > 0)
+ voffset := binary.BigEndian.Uint64(val)
+ fmt.Printf("level 1 val: %v\n", voffset)
+
+ db3 := NewDB(opt)
+ for _, k := range keys {
+ require.Equal(t, k, db3.Get(k), "Key: %s", k)
+ }
+ }
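TestCrash exercises the new replay path end to end: it writes 150,000 keys in batches of 100 with compaction disabled, then opens fresh DB instances on the same directory and expects every key back via value-log replay. A hedged sketch of the batching idiom it relies on, written as a self-contained helper; writeAll, Entry, and batchSize are illustrative stand-ins, not part of this PR.

```go
package main

import "fmt"

// Entry stands in for badger's value.Entry from the diff above.
type Entry struct {
	Key, Value []byte
}

// writeAll flushes every batchSize entries and reuses the slice's backing
// array. Reuse is safe with a synchronous write function such as DB.Write,
// which is finished with the slice before it returns.
func writeAll(keys [][]byte, batchSize int, write func([]Entry) error) error {
	batch := make([]Entry, 0, batchSize)
	for _, k := range keys {
		batch = append(batch, Entry{Key: k, Value: k})
		if len(batch) == batchSize {
			if err := write(batch); err != nil {
				return err
			}
			batch = batch[:0] // reuse, don't reallocate
		}
	}
	// Note: like TestCrash, this drops a trailing partial batch; with
	// 150000 keys and a batch size of 100 there never is one.
	return nil
}

func main() {
	var keys [][]byte
	for i := 0; i < 1000; i++ {
		keys = append(keys, []byte(fmt.Sprintf("%09d", i)))
	}
	n := 0
	err := writeAll(keys, 100, func(es []Entry) error { n += len(es); return nil })
	fmt.Println(n, err) // 1000 <nil>
}
```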
badger/levels.go: 11 additions, 6 deletions

@@ -41,7 +41,7 @@ type levelHandler struct {
// The following are initialized once and const.
level int
maxTotalSize int64
- opt DBOptions
+ opt Options
}

type levelsController struct {
@@ -52,7 +52,7 @@ type levelsController struct {

// The following are initialized once and const.
levels []*levelHandler
- opt DBOptions
+ opt Options
}

var (
@@ -164,14 +164,14 @@ func (s *levelHandler) overlappingTables(begin, end []byte) (int, int) {
return left, right
}

- func newLevelHandler(opt DBOptions, level int) *levelHandler {
+ func newLevelHandler(opt Options, level int) *levelHandler {
return &levelHandler{
level: level,
opt: opt,
}
}

- func newLevelsController(opt DBOptions) *levelsController {
+ func newLevelsController(opt Options) *levelsController {
y.AssertTrue(opt.NumLevelZeroTablesStall > opt.NumLevelZeroTables)
s := &levelsController{
opt: opt,
@@ -190,12 +190,17 @@ func newLevelsController(opt DBOptions) *levelsController {
}
}
for i := 0; i < s.opt.NumCompactWorkers; i++ {
- go s.compact(i)
+ go s.runWorker(i)
}
return s
}

- func (s *levelsController) compact(workerID int) {
+ func (s *levelsController) runWorker(workerID int) {
+ if s.opt.DoNotCompact {
+ fmt.Println("NOT running any compactions due to DB options.")
+ return
+ }

time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
timeChan := time.Tick(10 * time.Millisecond)
for {
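runWorker (the renamed compact) now exits immediately when Options.DoNotCompact is set, which is what lets TestCrash inspect un-compacted state. A minimal sketch of the surrounding worker pattern follows: a jittered start so the NumCompactWorkers goroutines don't fire in lockstep, then a fixed-interval ticker. The done channel and the printed placeholder work are illustrative additions, since the real loop runs for the life of the process.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// runWorker sketches the loop in levels.go: bail out if compactions are
// disabled, sleep a random amount to spread workers out, then poll a ticker.
func runWorker(id int, doNotCompact bool, done <-chan struct{}) {
	if doNotCompact {
		fmt.Println("NOT running any compactions due to DB options.")
		return
	}
	time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
	tick := time.Tick(10 * time.Millisecond)
	for {
		select {
		case <-tick:
			fmt.Printf("worker %d: would call tryCompact here\n", id)
		case <-done: // illustrative; the real worker has no shutdown path
			return
		}
	}
}

func main() {
	done := make(chan struct{})
	for i := 0; i < 3; i++ {
		go runWorker(i, false, done)
	}
	time.Sleep(1200 * time.Millisecond)
	close(done)
}
```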
badger/levels_test.go: 1 addition, 1 deletion

@@ -59,7 +59,7 @@ func extractTable(table *tableHandler) [][]string {
// TestDoCompact tests the merging logic which is done in internal doCompact function.
// We might remove this internal test eventually.
func TestDoCompact(t *testing.T) {
- c := newLevelsController(DefaultDBOptions)
+ c := newLevelsController(DefaultOptions)
t0 := buildTable(t, [][]string{
{"k2", "z2"},
{"k22", "z22"},