[Storage] Pruning - add a test case to verify compaction can reclaim disk space #6603

Open
wants to merge 14 commits into base: leo/db-ops
3 changes: 2 additions & 1 deletion cmd/bootstrap/utils/md5.go
@@ -2,7 +2,8 @@ package utils

// The google storage API only provides md5 and crc32 hence overriding the linter flag for md5
import (
"crypto/md5" //nolint:gosec
// #nosec
"crypto/md5"
"io"
"os"
)
8 changes: 7 additions & 1 deletion storage/batch.go
@@ -1,11 +1,17 @@
package storage

import "github.com/dgraph-io/badger/v2"
import (
"github.com/dgraph-io/badger/v2"
)

// Deprecated: use Writer instead
type Transaction interface {
Set(key, val []byte) error
}

// BatchStorage serves as an abstraction over batch storage, adding the ability to add extra
// callbacks which fire after the batch is successfully flushed.
//
// Deprecated: use ReaderBatchWriter instead
type BatchStorage interface {
27 changes: 27 additions & 0 deletions storage/operation/badgerimpl/dbstore.go
@@ -0,0 +1,27 @@
package badgerimpl

import (
"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
)

func ToDB(db *badger.DB) storage.DB {
return &dbStore{db: db}
}

type dbStore struct {
db *badger.DB
}

func (b *dbStore) Reader() storage.Reader {
return dbReader{db: b.db}
}

func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error {
return WithReaderBatchWriter(b.db, fn)
}

func (b *dbStore) NewBatch() storage.Batch {
return NewReaderBatchWriter(b.db)
}
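
For context, a minimal usage sketch (an assumption, not part of this diff) of how the new dbStore could be wired up from a caller's side: it relies only on the methods shown above (ToDB, Reader, WithReaderBatchWriter) plus the Reader.Get and Writer.Set signatures from the files below; the path, key, and value are illustrative.

// assumed imports: "fmt", "github.com/dgraph-io/badger/v2",
// "github.com/onflow/flow-go/storage", "github.com/onflow/flow-go/storage/operation/badgerimpl"
func exampleDBStore() error {
	bdb, err := badger.Open(badger.DefaultOptions("/tmp/example-db")) // illustrative path
	if err != nil {
		return err
	}
	defer bdb.Close()

	db := badgerimpl.ToDB(bdb)

	// batched write: the batch is committed when the function returns nil
	err = db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
		return rw.Writer().Set([]byte("example-key"), []byte("example-value"))
	})
	if err != nil {
		return err
	}

	// read the value back through the store's reader
	val, closer, err := db.Reader().Get([]byte("example-key"))
	if err != nil {
		return err
	}
	defer closer.Close()
	fmt.Printf("stored value: %s\n", val)
	return nil
}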
65 changes: 65 additions & 0 deletions storage/operation/badgerimpl/iterator.go
@@ -0,0 +1,65 @@
package badgerimpl

import (
"bytes"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
)

type badgerIterator struct {
iter *badger.Iterator
// the read transaction backing iter; discarded when the iterator is closed
tx *badger.Txn
lowerBound []byte
upperBound []byte
}

var _ storage.Iterator = (*badgerIterator)(nil)

func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage.IteratorOption) *badgerIterator {
options := badger.DefaultIteratorOptions
if ops.IterateKeyOnly {
options.PrefetchValues = false
}

tx := db.NewTransaction(false)
iter := tx.NewIterator(options)

lowerBound, upperBound := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix)

return &badgerIterator{
iter: iter,
tx: tx,
lowerBound: lowerBound,
upperBound: upperBound,
}
}

// SeekGE positions the iterator at the first key that is greater than or
// equal to the iterator's lower bound.
func (i *badgerIterator) SeekGE() {
i.iter.Seek(i.lowerBound)
}

func (i *badgerIterator) Valid() bool {
// the underlying iterator must still be valid
if !i.iter.Valid() {
return false
}
// and the current key must not be beyond the upper bound;
// "< 0" means the upperBound is exclusive
key := i.iter.Item().Key()
valid := bytes.Compare(key, i.upperBound) < 0
return valid
}

func (i *badgerIterator) Next() {
i.iter.Next()
}

func (i *badgerIterator) IterItem() storage.IterItem {
return i.iter.Item()
}

var _ storage.IterItem = (*badger.Item)(nil)

// Close closes the iterator and discards its read transaction.
// Close must be called when the iterator is no longer needed, otherwise the
// open read transaction keeps Badger from reclaiming old versions.
func (i *badgerIterator) Close() error {
i.iter.Close()
i.tx.Discard()
return nil
}
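
A minimal iteration sketch (an assumption, not part of this diff) showing the intended SeekGE/Valid/Next loop; it assumes storage.IterItem exposes badger.Item's Key method and uses the IterateKeyOnly option referenced above.

// assumed imports: "fmt", "github.com/onflow/flow-go/storage"
func printKeysInRange(r storage.Reader, startPrefix, endPrefix []byte) error {
	it, err := r.NewIter(startPrefix, endPrefix, storage.IteratorOption{IterateKeyOnly: true})
	if err != nil {
		return err
	}
	defer it.Close()

	// visit every key in [startPrefix, endPrefix], in ascending order
	for it.SeekGE(); it.Valid(); it.Next() {
		fmt.Printf("key: %x\n", it.IterItem().Key())
	}
	return nil
}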
54 changes: 54 additions & 0 deletions storage/operation/badgerimpl/reader.go
@@ -0,0 +1,54 @@
package badgerimpl

import (
"errors"
"io"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/storage"
)

type dbReader struct {
db *badger.DB
}

type noopCloser struct{}

var _ io.Closer = (*noopCloser)(nil)

func (noopCloser) Close() error { return nil }

func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
tx := b.db.NewTransaction(false)
defer tx.Discard()

item, err := tx.Get(key)
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
return nil, nil, storage.ErrNotFound
}
return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err)
}

var value []byte
err = item.Value(func(val []byte) error {
value = append([]byte{}, val...)
return nil
})
if err != nil {
return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err)
}

return value, noopCloser{}, nil
}

func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) {
return newBadgerIterator(b.db, startPrefix, endPrefix, ops), nil
}

// ToReader is a helper function to convert a *badger.DB to a Reader
func ToReader(db *badger.DB) storage.Reader {
return dbReader{db}
}
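
A short read sketch (an assumption, not part of this diff) showing how a caller of dbReader.Get distinguishes a missing key from an exception and releases the returned closer.

// assumed imports: "errors", "github.com/onflow/flow-go/storage"
func tryGet(r storage.Reader, key []byte) ([]byte, bool, error) {
	val, closer, err := r.Get(key)
	if errors.Is(err, storage.ErrNotFound) {
		return nil, false, nil // key does not exist
	}
	if err != nil {
		return nil, false, err // exception wrapped by irrecoverable.NewExceptionf
	}
	defer closer.Close()
	return val, true, nil
}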
94 changes: 94 additions & 0 deletions storage/operation/badgerimpl/writer.go
@@ -0,0 +1,94 @@
package badgerimpl

import (
"fmt"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation"
op "github.com/onflow/flow-go/storage/operation"
)

type ReaderBatchWriter struct {
globalReader storage.Reader
batch *badger.WriteBatch

callbacks operation.Callbacks
}

var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)
var _ storage.Batch = (*ReaderBatchWriter)(nil)

func (b *ReaderBatchWriter) GlobalReader() storage.Reader {
return b.globalReader
}

func (b *ReaderBatchWriter) Writer() storage.Writer {
return b
}

func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch {
return b.batch
}

func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
b.callbacks.AddCallback(callback)
}

func (b *ReaderBatchWriter) Commit() error {
err := b.batch.Flush()

b.callbacks.NotifyCallbacks(err)

return err
}

func WithReaderBatchWriter(db *badger.DB, fn func(storage.ReaderBatchWriter) error) error {
batch := NewReaderBatchWriter(db)

err := fn(batch)
if err != nil {
// fn might use a lock to ensure concurrent safety while reading and writing
// data, and that lock is usually released by a callback.
// In other words, fn might still hold a lock that is only released by a
// callback, so we need to notify the callbacks before returning the error,
// otherwise the lock would never be released.
batch.callbacks.NotifyCallbacks(err)
return err
}

return batch.Commit()
}

func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter {
return &ReaderBatchWriter{
globalReader: ToReader(db),
batch: db.NewWriteBatch(),
}
}

var _ storage.Writer = (*ReaderBatchWriter)(nil)

func (b *ReaderBatchWriter) Set(key, value []byte) error {
return b.batch.Set(key, value)
}

func (b *ReaderBatchWriter) Delete(key []byte) error {
return b.batch.Delete(key)
}

// DeleteByRange removes all keys in the prefix range [startPrefix, endPrefix]
// by iterating over the given reader and adding each matching key to the delete batch.
func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error {
err := operation.IterateKeysInPrefixRange(startPrefix, endPrefix, func(key []byte) error {
err := b.batch.Delete(key)
if err != nil {
return fmt.Errorf("could not add key to delete batch (%v): %w", key, err)
}
return nil
})(globalReader)

if err != nil {
return fmt.Errorf("could not find keys by range to be deleted: %w", err)
}
return nil
}
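
A sketch (an assumption, not part of this diff) of the lock-release-via-callback pattern that the comment in WithReaderBatchWriter describes: the callback fires on both successful commit and error, so the mutex is always released. Written as if it lived in package badgerimpl; the function name and duplicate-key error are illustrative.

// assumed imports: "errors", "fmt", "sync", "github.com/dgraph-io/badger/v2",
// "github.com/onflow/flow-go/storage"
func insertIfNotExists(db *badger.DB, mu *sync.Mutex, key, val []byte) error {
	return WithReaderBatchWriter(db, func(rw storage.ReaderBatchWriter) error {
		mu.Lock()
		// released after commit (or on error) via the callback
		rw.AddCallback(func(error) { mu.Unlock() })

		// check-then-write under the lock, reading committed data via the global reader
		_, closer, err := rw.GlobalReader().Get(key)
		if err == nil {
			closer.Close()
			return fmt.Errorf("key %x already exists", key)
		}
		if !errors.Is(err, storage.ErrNotFound) {
			return err
		}
		return rw.Writer().Set(key, val)
	})
}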
24 changes: 24 additions & 0 deletions storage/operation/callbacks.go
@@ -0,0 +1,24 @@
package operation

import "sync"

type Callbacks struct {
sync.Mutex // protect callbacks
callbacks []func(error)
}

func (b *Callbacks) AddCallback(callback func(error)) {
b.Lock()
defer b.Unlock()

b.callbacks = append(b.callbacks, callback)
}

func (b *Callbacks) NotifyCallbacks(err error) {
b.Lock()
defer b.Unlock()

for _, callback := range b.callbacks {
callback(err)
}
}
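
For illustration, a tiny sketch (an assumption, not part of this diff) of the Callbacks contract: registration is mutex-protected and every registered callback receives the same error from NotifyCallbacks.

// assumed import: "fmt"
func callbacksExample() {
	var cbs Callbacks
	cbs.AddCallback(func(err error) { fmt.Println("first callback, err =", err) })
	cbs.AddCallback(func(err error) { fmt.Println("second callback, err =", err) })
	// both callbacks are invoked with the commit result (nil here)
	cbs.NotifyCallbacks(nil)
}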
34 changes: 34 additions & 0 deletions storage/operation/codec.go
@@ -0,0 +1,34 @@
package operation

import (
"encoding/binary"
"fmt"

"github.com/onflow/flow-go/model/flow"
)

// EncodeKeyPart encodes a value to be used as a part of a key to be stored in storage.
func EncodeKeyPart(v interface{}) []byte {
switch i := v.(type) {
case uint8:
return []byte{i}
case uint32:
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, i)
return b
case uint64:
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, i)
return b
case string:
return []byte(i)
case flow.Role:
return []byte{byte(i)}
case flow.Identifier:
return i[:]
case flow.ChainID:
return []byte(i)
default:
panic(fmt.Sprintf("unsupported type to convert (%T)", v))
}
}
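
A small usage sketch (an assumption, not part of this diff): composing a storage key from a one-byte code and an encoded key part, the pattern EncodeKeyPart is built for. The code value and helper name are illustrative only.

// hypothetical helper in package operation; flow is already imported above
func makeKey(code byte, blockID flow.Identifier) []byte {
	// key layout: <1-byte code> || <32-byte block ID>
	return append([]byte{code}, EncodeKeyPart(blockID)...)
}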