Write-ahead log (#29)
Replaces the unstructured data file for storing key-value pairs with a write-ahead log.

- In the event of a crash or a power loss the database is automatically recovered (#27).
- Fixes disk space overhead when storing small keys and values (#24).
- Optional background compaction allows reclaiming disk space occupied by overwritten or deleted keys (#28).

See docs/design.md for more details.
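As a quick illustration (an editor's sketch, not part of the commit), compaction can also be triggered on demand through the `DB.Compact()` method added in this change; the `Open` call and database path follow the pattern used in the project's README:

```go
package main

import (
	"log"

	"github.com/akrylysov/pogreb"
)

func main() {
	// Opening the database replays the write-ahead log, so recovery after
	// a crash or power loss happens automatically here.
	db, err := pogreb.Open("test.db", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Reclaim space held by overwritten or deleted keys on demand.
	cr, err := db.Compact()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("compacted %d segment(s), reclaimed %d records / %d bytes",
		cr.CompactedSegments, cr.ReclaimedRecords, cr.ReclaimedBytes)
}
```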
akrylysov authored Mar 8, 2020
1 parent 4b58804 commit eeec217
Showing 37 changed files with 2,366 additions and 1,132 deletions.
2 changes: 0 additions & 2 deletions .gitignore
@@ -1,3 +1 @@
/test.db
/test.db.index
/test.db.lock
3 changes: 2 additions & 1 deletion .travis.yml
@@ -1,7 +1,8 @@
language: go

go:
- 1.x
- "1.11.x"
- "1.x"

os:
- linux
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,14 @@
# Changelog

## [0.9.0] - 2020-03-08
### Changed
- Replace the unstructured data file for storing key-value pairs with a write-ahead log.
### Added
- In the event of a crash or a power loss the database is automatically recovered.
- Optional background compaction allows reclaiming disk space occupied by overwritten or deleted keys.
### Fixed
- Fix disk space overhead when storing small keys and values.

## [0.8.3] - 2019-11-03
### Fixed
- Fix slice bounds out of range error mapping files on Windows.
18 changes: 9 additions & 9 deletions README.md
@@ -45,7 +45,7 @@ func main() {

### Writing to a database

Use the `DB.Put()` function to insert a new key/value pair:
Use the `DB.Put()` function to insert a new key-value pair:

```go
err := db.Put([]byte("testKey"), []byte("testValue"))
@@ -56,7 +56,7 @@ if err != nil {

### Reading from a database

Use the `DB.Get()` function to retrieve the inserted value:
To retrieve the inserted value, use the `DB.Get()` function:

```go
val, err := db.Get([]byte("testKey"))
@@ -68,17 +68,17 @@ log.Printf("%s", val)

### Iterating over items

Use the `DB.Items()` function which returns a new instance of `ItemIterator`:
To iterate over items, use `ItemIterator` returned by `DB.Items()`:

```go
it := db.Items()
for {
key, val, err := it.Next()
if err != nil {
if err != pogreb.ErrIterationDone {
log.Fatal(err)
}
break
if err == pogreb.ErrIterationDone {
break
}
if err != nil {
log.Fatal(err)
}
log.Printf("%s %s", key, val)
}
@@ -95,4 +95,4 @@ on DigitalOcean 8 CPUs / 16 GB RAM / 160 GB SSD + Ubuntu 16.04.3 (higher is bett

## Internals

[Pogreb - how it works](https://artem.krylysov.com/blog/2018/03/24/pogreb-key-value-store/).
[Design document](/docs/design.md).
48 changes: 27 additions & 21 deletions bucket.go
@@ -6,59 +6,62 @@ import (
"github.com/akrylysov/pogreb/fs"
)

const (
slotsPerBucket = 31
bucketSize = 512
)

// slot corresponds to a single item in the hash table.
type slot struct {
hash uint32
segmentID uint16
keySize uint16
valueSize uint32
kvOffset int64
offset uint32 // Segment offset.
}

func (sl slot) kvSize() uint32 {
return uint32(sl.keySize) + sl.valueSize
}

// bucket is an array of slots.
type bucket struct {
slots [slotsPerBucket]slot
next int64
next int64 // Offset of overflow bucket.
}

// bucketHandle is a bucket, plus its offset and the file it's written to.
type bucketHandle struct {
bucket
file fs.MmapFile
offset int64
}

const (
bucketSize uint32 = 512
)

func align512(n uint32) uint32 {
return (n + 511) &^ 511
}

func (b bucket) MarshalBinary() ([]byte, error) {
buf := make([]byte, bucketSize)
data := buf
for i := 0; i < slotsPerBucket; i++ {
sl := b.slots[i]
binary.LittleEndian.PutUint32(buf[:4], sl.hash)
binary.LittleEndian.PutUint16(buf[4:6], sl.keySize)
binary.LittleEndian.PutUint32(buf[6:10], sl.valueSize)
binary.LittleEndian.PutUint64(buf[10:18], uint64(sl.kvOffset))
buf = buf[18:]
binary.LittleEndian.PutUint16(buf[4:6], sl.segmentID)
binary.LittleEndian.PutUint16(buf[6:8], sl.keySize)
binary.LittleEndian.PutUint32(buf[8:12], sl.valueSize)
binary.LittleEndian.PutUint32(buf[12:16], sl.offset)
buf = buf[16:]
}
binary.LittleEndian.PutUint64(buf[:8], uint64(b.next))
return data, nil
}

func (b *bucket) UnmarshalBinary(data []byte) error {
for i := 0; i < slotsPerBucket; i++ {
_ = data[18] // bounds check hint to compiler; see golang.org/issue/14808
_ = data[16] // bounds check hint to compiler; see golang.org/issue/14808
b.slots[i].hash = binary.LittleEndian.Uint32(data[:4])
b.slots[i].keySize = binary.LittleEndian.Uint16(data[4:6])
b.slots[i].valueSize = binary.LittleEndian.Uint32(data[6:10])
b.slots[i].kvOffset = int64(binary.LittleEndian.Uint64(data[10:18]))
data = data[18:]
b.slots[i].segmentID = binary.LittleEndian.Uint16(data[4:6])
b.slots[i].keySize = binary.LittleEndian.Uint16(data[6:8])
b.slots[i].valueSize = binary.LittleEndian.Uint32(data[8:12])
b.slots[i].offset = binary.LittleEndian.Uint32(data[12:16])
data = data[16:]
}
b.next = int64(binary.LittleEndian.Uint64(data[:8]))
return nil
@@ -89,15 +92,17 @@ func (b *bucketHandle) write() error {
return err
}

// slotWriter inserts and writes slots into a bucket.
type slotWriter struct {
bucket *bucketHandle
slotIdx int
prevBuckets []*bucketHandle
}

func (sw *slotWriter) insert(sl slot, db *DB) error {
func (sw *slotWriter) insert(sl slot, idx *index) error {
if sw.slotIdx == slotsPerBucket {
nextBucket, err := db.createOverflowBucket()
// Bucket is full, create a new overflow bucket.
nextBucket, err := idx.createOverflowBucket()
if err != nil {
return err
}
@@ -112,6 +117,7 @@ func (sw *slotWriter) insert(sl slot, db *DB) error {
}

func (sw *slotWriter) write() error {
// Write previous buckets first.
for i := len(sw.prevBuckets) - 1; i >= 0; i-- {
if err := sw.prevBuckets[i].write(); err != nil {
return err
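A short editor's sketch (not part of the commit) of the arithmetic behind the 512-byte bucket: the new `MarshalBinary` writes 16 bytes per slot (4-byte hash, 2-byte segment ID, 2-byte key size, 4-byte value size, 4-byte offset), so 31 slots plus the 8-byte overflow offset occupy 504 bytes, which `align512` rounds up to `bucketSize`:

```go
package main

import "fmt"

// Field widths mirror the slot encoding in bucket.MarshalBinary above.
const (
	slotSize       = 4 + 2 + 2 + 4 + 4 // hash + segmentID + keySize + valueSize + offset
	slotsPerBucket = 31
	nextOffsetSize = 8 // overflow bucket offset
)

func align512(n uint32) uint32 {
	return (n + 511) &^ 511
}

func main() {
	raw := uint32(slotsPerBucket*slotSize + nextOffsetSize)
	fmt.Println(raw, align512(raw)) // 504 512 -- fits exactly one 512-byte bucket
}
```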
148 changes: 148 additions & 0 deletions compaction.go
@@ -0,0 +1,148 @@
package pogreb

import (
"sync/atomic"
)

func (db *DB) moveRecord(rec record) (bool, error) {
hash := db.hash(rec.key)
reclaimed := true
err := db.index.forEachBucket(db.index.bucketIndex(hash), func(b bucketHandle) (bool, error) {
for i, sl := range b.slots {
if sl.offset == 0 {
return b.next == 0, nil
}

// Slot points to a different record.
if hash != sl.hash || rec.offset != sl.offset || rec.segmentID != sl.segmentID {
continue
}

segmentID, offset, err := db.datalog.writeRecord(rec.data, rec.rtype) // TODO: batch writes
if err != nil {
return true, err
}
// Update index.
b.slots[i].segmentID = segmentID
b.slots[i].offset = offset
reclaimed = false
return true, b.write()
}
return false, nil
})
return reclaimed, err
}

// CompactionResult holds the compaction result.
type CompactionResult struct {
CompactedSegments int
ReclaimedRecords int
ReclaimedBytes int
}

func (db *DB) compact(f *segment) (CompactionResult, error) {
cr := CompactionResult{}

db.mu.Lock()
f.meta.Full = true // Prevent writes to the compacted file.
db.mu.Unlock()

it, err := newSegmentIterator(f)
if err != nil {
return cr, err
}
// Move records from f to the current segment.
for {
err := func() error {
db.mu.Lock()
defer db.mu.Unlock()
rec, err := it.next()
if err != nil {
return err
}
if rec.rtype == recordTypeDelete {
cr.ReclaimedRecords++
cr.ReclaimedBytes += len(rec.data)
return nil
}
reclaimed, err := db.moveRecord(rec)
if reclaimed {
cr.ReclaimedRecords++
cr.ReclaimedBytes += len(rec.data)
}
return err
}()
if err == ErrIterationDone {
break
}
if err != nil {
return cr, err
}
}

db.mu.Lock()
defer db.mu.Unlock()
err = db.datalog.removeSegment(f)
return cr, err
}

func (db *DB) pickForCompaction() ([]*segment, error) {
segments, err := db.datalog.segmentsByModification()
if err != nil {
return nil, err
}
var picked []*segment
for i := len(segments) - 1; i >= 0; i-- {
seg := segments[i]

if uint32(seg.size) < db.opts.compactionMinSegmentSize {
continue
}

fragmentation := float32(seg.meta.DeletedBytes) / float32(seg.size)
if fragmentation < db.opts.compactionMinFragmentation {
continue
}

if seg.meta.DeleteRecords > 0 {
// Delete records can be discarded only when older files contain no put records for the corresponding keys.
// All files older than the file eligible for compaction have to be compacted.
return append(segments[:i+1], picked...), nil
}

picked = append([]*segment{seg}, picked...)
}
return picked, nil
}

// Compact compacts the DB. Deleted and overwritten items are discarded.
func (db *DB) Compact() (CompactionResult, error) {
cr := CompactionResult{}

// Run only a single compaction at a time.
if !atomic.CompareAndSwapInt32(&db.compactionRunning, 0, 1) {
return cr, errBusy
}
defer func() {
atomic.StoreInt32(&db.compactionRunning, 0)
}()

db.mu.RLock()
segments, err := db.pickForCompaction()
db.mu.RUnlock()
if err != nil {
return cr, err
}

for _, f := range segments {
fcr, err := db.compact(f)
if err != nil {
return cr, err
}
cr.CompactedSegments++
cr.ReclaimedRecords += fcr.ReclaimedRecords
cr.ReclaimedBytes += fcr.ReclaimedBytes
}

return cr, nil
}
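For context, a minimal editor's sketch of the eligibility check in `pickForCompaction`: a segment is considered only if it reaches `compactionMinSegmentSize` and its fragmentation (deleted bytes relative to segment size) reaches `compactionMinFragmentation`. The threshold values below are assumptions, not the library's defaults:

```go
package main

import "fmt"

func main() {
	// Assumed thresholds for illustration only.
	const (
		compactionMinSegmentSize   uint32  = 1 << 20 // 1 MiB
		compactionMinFragmentation float32 = 0.25
	)

	segmentSize := uint32(4 << 20)  // 4 MiB segment
	deletedBytes := uint32(2 << 20) // 2 MiB of overwritten or deleted records

	// Same formula as pickForCompaction: deleted bytes over segment size.
	fragmentation := float32(deletedBytes) / float32(segmentSize)
	eligible := segmentSize >= compactionMinSegmentSize &&
		fragmentation >= compactionMinFragmentation
	fmt.Println(fragmentation, eligible) // 0.5 true
}
```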