
Commit cb32bef
Merge pull request #6603 from onflow/leo/db-ops-dbstore
[Storage] Pruning - add a test case to verify compaction can reclaim disk space
zhangchiqing authored Jan 13, 2025
2 parents 76aa569 + f5365d9 commit cb32bef
Showing 11 changed files with 577 additions and 39 deletions.
27 changes: 27 additions & 0 deletions storage/operation/badgerimpl/dbstore.go
@@ -0,0 +1,27 @@
package badgerimpl

import (
	"github.com/dgraph-io/badger/v2"

	"github.com/onflow/flow-go/storage"
)

// ToDB wraps an open Badger instance as a backend-agnostic storage.DB.
func ToDB(db *badger.DB) storage.DB {
	return &dbStore{db: db}
}

type dbStore struct {
	db *badger.DB
}

func (b *dbStore) Reader() storage.Reader {
	return dbReader{db: b.db}
}

func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error {
	return WithReaderBatchWriter(b.db, fn)
}

func (b *dbStore) NewBatch() storage.Batch {
	return NewReaderBatchWriter(b.db)
}
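For orientation, a minimal sketch of how a caller might use the new storage.DB abstraction. This assumes, as the helper's name suggests, that WithReaderBatchWriter commits the batch when the callback returns nil; the hypothetical writeAndReadBack, plus Entity and the operation helpers, come from the tests elsewhere in this diff:

func writeAndReadBack(bdb *badger.DB) error {
	db := badgerimpl.ToDB(bdb)

	e := Entity{ID: 1}
	// Stage the write in a batch; the helper commits it when the callback
	// returns nil.
	err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
		return operation.Upsert(e.Key(), e)(rw.Writer())
	})
	if err != nil {
		return err
	}

	// Read back through the committed-state reader.
	var out Entity
	return operation.Retrieve(e.Key(), &out)(db.Reader())
}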
1 change: 1 addition & 0 deletions storage/operation/badgerimpl/writer.go
@@ -18,6 +18,7 @@ type ReaderBatchWriter struct {
}

var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)
var _ storage.Batch = (*ReaderBatchWriter)(nil)

// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
// This reader will not read un-committed writes written to ReaderBatchWriter.Writer until the write batch is committed.
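A hedged sketch of the read-committed behavior the comment above describes — GlobalReader, Writer, and Commit are the APIs referenced in this diff, while the hypothetical demoReadCommitted and the Entity/operation helpers come from the tests below:

func demoReadCommitted(db *badger.DB) error {
	w := badgerimpl.NewReaderBatchWriter(db)

	e := Entity{ID: 3}
	if err := operation.Upsert(e.Key(), e)(w.Writer()); err != nil {
		return err
	}

	var out Entity
	// Before Commit, GlobalReader only reflects committed state, so the
	// batch's pending write should not be visible yet.
	if err := operation.Retrieve(e.Key(), &out)(w.GlobalReader()); err == nil {
		return errors.New("unexpected: pending write visible before commit")
	}

	if err := w.Commit(); err != nil {
		return err
	}

	// After Commit, the write is part of the committed global state.
	return operation.Retrieve(e.Key(), &out)(w.GlobalReader())
}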
126 changes: 95 additions & 31 deletions storage/operation/dbtest/helper.go
@@ -5,6 +5,7 @@ import (

	"github.com/cockroachdb/pebble"
	"github.com/dgraph-io/badger/v2"
	"github.com/stretchr/testify/require"

	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation/badgerimpl"
@@ -16,45 +17,108 @@
type WithWriter func(func(storage.Writer) error) error

func RunWithStorages(t *testing.T, fn func(*testing.T, storage.Reader, WithWriter)) {
	RunWithBadger(t, fn)
	RunWithPebble(t, fn)
}

func RunWithDB(t *testing.T, fn func(*testing.T, storage.DB)) {
	t.Run("BadgerStorage", func(t *testing.T) {
		unittest.RunWithBadgerDB(t, func(db *badger.DB) {
			fn(t, badgerimpl.ToDB(db))
		})
	})

	t.Run("PebbleStorage", func(t *testing.T) {
		unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
			fn(t, pebbleimpl.ToDB(db))
		})
	})
}

func RunWithBadger(t *testing.T, fn func(*testing.T, storage.Reader, WithWriter)) {
	t.Run("BadgerStorage", func(t *testing.T) {
		unittest.RunWithBadgerDB(t, runWithBadger(func(r storage.Reader, wr WithWriter) {
			fn(t, r, wr)
		}))
	})
}

func RunWithPebble(t *testing.T, fn func(*testing.T, storage.Reader, WithWriter)) {
	t.Run("PebbleStorage", func(t *testing.T) {
		unittest.RunWithPebbleDB(t, runWithPebble(func(r storage.Reader, wr WithWriter) {
			fn(t, r, wr)
		}))
	})
}

func RunWithPebbleDB(t *testing.T, opts *pebble.Options, fn func(*testing.T, storage.Reader, WithWriter, string, *pebble.DB)) {
	t.Run("PebbleStorage", func(t *testing.T) {
		unittest.RunWithTempDir(t, func(dir string) {
			db, err := pebble.Open(dir, opts)
			require.NoError(t, err)
			defer func() {
				require.NoError(t, db.Close())
			}()

			runWithPebble(func(r storage.Reader, w WithWriter) {
				fn(t, r, w, dir, db)
			})(db)
		})
	})
}

func BenchWithStorages(t *testing.B, fn func(*testing.B, storage.Reader, WithWriter)) {
	t.Run("BadgerStorage", func(t *testing.B) {
		unittest.RunWithBadgerDB(t, runWithBadger(func(r storage.Reader, wr WithWriter) {
			fn(t, r, wr)
		}))
	})

	t.Run("PebbleStorage", func(t *testing.B) {
		unittest.RunWithPebbleDB(t, runWithPebble(func(r storage.Reader, wr WithWriter) {
			fn(t, r, wr)
		}))
	})
}

func runWithBadger(fn func(storage.Reader, WithWriter)) func(*badger.DB) {
	return func(db *badger.DB) {
		withWriter := func(writing func(storage.Writer) error) error {
			writer := badgerimpl.NewReaderBatchWriter(db)
			err := writing(writer)
			if err != nil {
				return err
			}

			err = writer.Commit()
			if err != nil {
				return err
			}
			return nil
		}

		reader := badgerimpl.ToReader(db)
		fn(reader, withWriter)
	}
}

func runWithPebble(fn func(storage.Reader, WithWriter)) func(*pebble.DB) {
	return func(db *pebble.DB) {
		withWriter := func(writing func(storage.Writer) error) error {
			writer := pebbleimpl.NewReaderBatchWriter(db)
			err := writing(writer)
			if err != nil {
				return err
			}

			err = writer.Commit()
			if err != nil {
				return err
			}
			return nil
		}

		reader := pebbleimpl.ToReader(db)
		fn(reader, withWriter)
	}
}
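As a usage sketch, a test built on these helpers would look like the following; TestUpsertRoundTrip is hypothetical, while Entity and the operation helpers are the ones used by the benchmarks below. RunWithStorages runs the body once per backend:

func TestUpsertRoundTrip(t *testing.T) {
	dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) {
		e := Entity{ID: 1}
		// withWriter stages the write in a fresh batch and commits it.
		require.NoError(t, withWriter(operation.Upsert(e.Key(), e)))

		// The reader observes the committed write on both Badger and Pebble.
		var out Entity
		require.NoError(t, operation.Retrieve(e.Key(), &out)(r))
		require.Equal(t, e, out)
	})
}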
27 changes: 27 additions & 0 deletions storage/operation/pebbleimpl/dbstore.go
@@ -0,0 +1,27 @@
package pebbleimpl

import (
	"github.com/cockroachdb/pebble"

	"github.com/onflow/flow-go/storage"
)

// ToDB wraps an open Pebble instance as a backend-agnostic storage.DB.
func ToDB(db *pebble.DB) storage.DB {
	return &dbStore{db: db}
}

type dbStore struct {
	db *pebble.DB
}

func (b *dbStore) Reader() storage.Reader {
	return dbReader{db: b.db}
}

func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error {
	return WithReaderBatchWriter(b.db, fn)
}

func (b *dbStore) NewBatch() storage.Batch {
	return NewReaderBatchWriter(b.db)
}
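And the explicit-batch variant, hedged on the assumption that storage.Batch exposes the same Writer and Commit methods the ReaderBatchWriter implementations above provide (the Batch interface itself is not shown in this diff); writeWithBatch is hypothetical:

func writeWithBatch(pdb *pebble.DB) error {
	db := pebbleimpl.ToDB(pdb)

	batch := db.NewBatch()
	e := Entity{ID: 2}
	if err := operation.Upsert(e.Key(), e)(batch.Writer()); err != nil {
		return err
	}
	// Nothing is persisted until the batch is committed.
	return batch.Commit()
}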
1 change: 1 addition & 0 deletions storage/operation/pebbleimpl/writer.go
@@ -19,6 +19,7 @@ type ReaderBatchWriter struct {
}

var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)
var _ storage.Batch = (*ReaderBatchWriter)(nil)

// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
23 changes: 22 additions & 1 deletion storage/operation/reads.go
@@ -15,10 +15,12 @@ import (
// CheckFunc is a function that checks whether the value should be read and decoded.
// Return (true, nil) to read the value and pass it to the CreateFunc and HandleFunc for decoding.
// Return (false, nil) to skip reading the value.
// Return (false, err) when running into any exception; the iteration should be stopped.
// When building a CheckFunc for an IterationFunc that iterates over keys, a sentinel error
// can be defined and checked to stop the iteration early, for example upon finding the first
// key that matches a certain condition (see the sketch after this file's changes).
// Note: the returned bool decides whether to read the value, not whether to stop the iteration.
type CheckFunc func(key []byte) (bool, error)

// CreateFunc returns a pointer to an initialized entity that we can potentially
@@ -129,6 +131,25 @@ func TraverseByPrefix(r storage.Reader, prefix []byte, iterFunc IterationFunc, o
	return IterateKeys(r, prefix, prefix, iterFunc, opt)
}

// KeyOnlyIterateFunc returns an IterationFunc that iterates over keys only,
// without reading or decoding any values; fn is invoked once per visited key.
func KeyOnlyIterateFunc(fn func(key []byte) error) IterationFunc {
	return func() (CheckFunc, CreateFunc, HandleFunc) {
		checker := func(key []byte) (bool, error) {
			// Always skip the value read; any error from fn stops the iteration.
			return false, fn(key)
		}

		create := func() interface{} {
			return nil
		}

		handle := func() error {
			return nil
		}

		return checker, create, handle
	}
}

// KeyExists returns true if a key exists in the database.
// When the returned function is executed (and only then), it writes into `keyExists`
// whether the key exists.
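The sentinel-error pattern the CheckFunc comment points at, sketched with the new KeyOnlyIterateFunc. Here r, prefix, and opt are assumed to be in scope (the iterator-option type is truncated in the hunk header above), and errFound exists only for this example:

var errFound = errors.New("found")

var firstKey []byte
err := operation.TraverseByPrefix(r, prefix, operation.KeyOnlyIterateFunc(func(key []byte) error {
	// Copy the key in case the iterator reuses its buffer, then return the
	// sentinel to stop the traversal early.
	firstKey = append([]byte(nil), key...)
	return errFound
}), opt)
if errors.Is(err, errFound) {
	err = nil // the sentinel signals success: firstKey holds the first match
}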
63 changes: 63 additions & 0 deletions storage/operation/reads_bench_test.go
@@ -0,0 +1,63 @@
package operation_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation"
	"github.com/onflow/flow-go/storage/operation/dbtest"
)

func BenchmarkRetrieve(t *testing.B) {
	dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) {
		e := Entity{ID: 1337}
		require.NoError(t, withWriter(operation.Upsert(e.Key(), e)))

		t.ResetTimer()

		// Measure repeated point reads of a single committed key.
		for i := 0; i < t.N; i++ {
			var readBack Entity
			require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r))
		}
	})
}

func BenchmarkNonExist(t *testing.B) {
	dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) {
		for i := 0; i < t.N; i++ {
			e := Entity{ID: uint64(i)}
			require.NoError(t, withWriter(operation.Upsert(e.Key(), e)))
		}

		t.ResetTimer()
		// Measure existence checks for a key that was never written.
		nonExist := Entity{ID: uint64(t.N + 1)}
		for i := 0; i < t.N; i++ {
			var exists bool
			require.NoError(t, operation.Exists(nonExist.Key(), &exists)(r))
		}
	})
}

func BenchmarkIterate(t *testing.B) {
	dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) {
		prefix1 := []byte("prefix-1")
		prefix2 := []byte("prefix-2")
		for i := 0; i < t.N; i++ {
			e := Entity{ID: uint64(i)}
			key1 := append(prefix1, e.Key()...)
			key2 := append(prefix2, e.Key()...)

			require.NoError(t, withWriter(operation.Upsert(key1, e)))
			require.NoError(t, withWriter(operation.Upsert(key2, e)))
		}

		t.ResetTimer()
		// Measure a single forward iteration across the [prefix1, prefix2] range.
		var found [][]byte
		require.NoError(t, operation.Iterate(prefix1, prefix2, func(key []byte) error {
			found = append(found, key)
			return nil
		})(r), "should iterate forward without error")
	})
}