Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] Pruning - add a test case to verify compaction can reclaim disk space #6603

Merged
merged 14 commits into from
Jan 13, 2025
27 changes: 27 additions & 0 deletions storage/operation/badgerimpl/dbstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package badgerimpl

import (
"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
)

// ToDB wraps a Badger database handle in the backend-agnostic storage.DB
// interface.
func ToDB(db *badger.DB) storage.DB {
	return &dbStore{db: db}
}

// dbStore adapts a *badger.DB to the storage.DB interface.
type dbStore struct {
	db *badger.DB
}

// Reader returns a storage.Reader backed by the underlying Badger database.
func (b *dbStore) Reader() storage.Reader {
	return dbReader{db: b.db}
}

// WithReaderBatchWriter runs fn with a combined reader and batch writer,
// delegating to the package-level WithReaderBatchWriter helper.
func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error {
	return WithReaderBatchWriter(b.db, fn)
}

// NewBatch returns a fresh write batch; the caller is responsible for
// committing it.
func (b *dbStore) NewBatch() storage.Batch {
	return NewReaderBatchWriter(b.db)
}
1 change: 1 addition & 0 deletions storage/operation/badgerimpl/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type ReaderBatchWriter struct {
}

var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)
var _ storage.Batch = (*ReaderBatchWriter)(nil)

// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
// This reader will not read un-committed writes written to ReaderBatchWriter.Writer until the write batch is committed.
Expand Down
126 changes: 95 additions & 31 deletions storage/operation/dbtest/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v2"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
Expand All @@ -16,45 +17,108 @@ import (
type WithWriter func(func(storage.Writer) error) error

// RunWithStorages runs fn against both supported storage backends
// (Badger and Pebble), each as its own subtest.
func RunWithStorages(t *testing.T, fn func(*testing.T, storage.Reader, WithWriter)) {
	RunWithBadger(t, fn)
	RunWithPebble(t, fn)
}

// RunWithDB runs fn against both database backends (Badger and Pebble), each
// wrapped in the storage.DB abstraction and run as its own subtest.
//
// Fix: the previous text contained leftover lines from an old revision — a
// `withWriter` closure that was no longer defined in this scope and calls of
// fn with the wrong arity — which could not compile. Only the storage.DB
// hand-off remains.
func RunWithDB(t *testing.T, fn func(*testing.T, storage.DB)) {
	t.Run("BadgerStorage", func(t *testing.T) {
		unittest.RunWithBadgerDB(t, func(db *badger.DB) {
			fn(t, badgerimpl.ToDB(db))
		})
	})

	t.Run("PebbleStorage", func(t *testing.T) {
		unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
			fn(t, pebbleimpl.ToDB(db))
		})
	})
}

// RunWithBadger runs fn as a subtest against a temporary Badger database,
// exposing it through the backend-agnostic Reader / WithWriter pair.
func RunWithBadger(t *testing.T, fn func(*testing.T, storage.Reader, WithWriter)) {
	t.Run("BadgerStorage", func(t *testing.T) {
		// Bind the subtest's *testing.T into the backend-neutral callback.
		run := runWithBadger(func(r storage.Reader, wr WithWriter) {
			fn(t, r, wr)
		})
		unittest.RunWithBadgerDB(t, run)
	})
}

// RunWithPebble runs fn as a subtest against a temporary Pebble database,
// exposing it through the backend-agnostic Reader / WithWriter pair.
func RunWithPebble(t *testing.T, fn func(*testing.T, storage.Reader, WithWriter)) {
	t.Run("PebbleStorage", func(t *testing.T) {
		// Bind the subtest's *testing.T into the backend-neutral callback.
		run := runWithPebble(func(r storage.Reader, wr WithWriter) {
			fn(t, r, wr)
		})
		unittest.RunWithPebbleDB(t, run)
	})
}

// RunWithPebbleDB runs fn against a Pebble database opened with the given
// options in a temporary directory. The directory path and the raw *pebble.DB
// handle are passed through so tests can inspect on-disk state (e.g. disk
// usage after compaction).
//
// Fix: removed two stale lines from an old revision (`reader := …` and a
// 3-argument call of the 5-argument fn) that referenced an undefined
// `withWriter` and could not compile.
func RunWithPebbleDB(t *testing.T, opts *pebble.Options, fn func(*testing.T, storage.Reader, WithWriter, string, *pebble.DB)) {
	t.Run("PebbleStorage", func(t *testing.T) {
		unittest.RunWithTempDir(t, func(dir string) {
			db, err := pebble.Open(dir, opts)
			require.NoError(t, err)
			defer func() {
				require.NoError(t, db.Close())
			}()

			runWithPebble(func(r storage.Reader, w WithWriter) {
				fn(t, r, w, dir, db)
			})(db)
		})
	})
}

// BenchWithStorages runs fn against both storage backends as sub-benchmarks.
func BenchWithStorages(t *testing.B, fn func(*testing.B, storage.Reader, WithWriter)) {
	t.Run("BadgerStorage", func(b *testing.B) {
		unittest.RunWithBadgerDB(b, runWithBadger(func(r storage.Reader, wr WithWriter) {
			fn(b, r, wr)
		}))
	})

	t.Run("PebbleStorage", func(b *testing.B) {
		unittest.RunWithPebbleDB(b, runWithPebble(func(r storage.Reader, wr WithWriter) {
			fn(b, r, wr)
		}))
	})
}

// runWithBadger builds a *badger.DB consumer that presents the database to fn
// through the backend-agnostic storage.Reader / WithWriter pair.
func runWithBadger(fn func(storage.Reader, WithWriter)) func(*badger.DB) {
	return func(db *badger.DB) {
		// withWriter collects writes into a fresh batch and commits it only
		// after the writing callback succeeds.
		withWriter := func(writing func(storage.Writer) error) error {
			writer := badgerimpl.NewReaderBatchWriter(db)
			if err := writing(writer); err != nil {
				return err
			}
			return writer.Commit()
		}

		fn(badgerimpl.ToReader(db), withWriter)
	}
}

// runWithPebble builds a *pebble.DB consumer that presents the database to fn
// through the backend-agnostic storage.Reader / WithWriter pair.
func runWithPebble(fn func(storage.Reader, WithWriter)) func(*pebble.DB) {
	return func(db *pebble.DB) {
		// withWriter collects writes into a fresh batch and commits it only
		// after the writing callback succeeds.
		withWriter := func(writing func(storage.Writer) error) error {
			writer := pebbleimpl.NewReaderBatchWriter(db)
			if err := writing(writer); err != nil {
				return err
			}
			return writer.Commit()
		}

		fn(pebbleimpl.ToReader(db), withWriter)
	}
}
27 changes: 27 additions & 0 deletions storage/operation/pebbleimpl/dbstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package pebbleimpl

import (
"github.com/cockroachdb/pebble"

"github.com/onflow/flow-go/storage"
)

// ToDB wraps a Pebble database handle in the backend-agnostic storage.DB
// interface.
func ToDB(db *pebble.DB) storage.DB {
	return &dbStore{db: db}
}

// dbStore adapts a *pebble.DB to the storage.DB interface.
type dbStore struct {
	db *pebble.DB
}

// Reader returns a storage.Reader backed by the underlying Pebble database.
func (b *dbStore) Reader() storage.Reader {
	return dbReader{db: b.db}
}

// WithReaderBatchWriter runs fn with a combined reader and batch writer,
// delegating to the package-level WithReaderBatchWriter helper.
func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error {
	return WithReaderBatchWriter(b.db, fn)
}

// NewBatch returns a fresh write batch; the caller is responsible for
// committing it.
func (b *dbStore) NewBatch() storage.Batch {
	return NewReaderBatchWriter(b.db)
}
1 change: 1 addition & 0 deletions storage/operation/pebbleimpl/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type ReaderBatchWriter struct {
}

var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)
var _ storage.Batch = (*ReaderBatchWriter)(nil)

// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
Expand Down
23 changes: 22 additions & 1 deletion storage/operation/reads.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
// CheckFunc is a function that checks if the value should be read and decoded.
// return (true, nil) to read the value and pass it to the CreateFunc and HandleFunc for decoding
// return (false, nil) to skip reading the value
// return (false, err) if running into any exception, the iteration should be stopped.
// when making a CheckFunc to be used in the IterationFunc to iterate over the keys, a sentinel error
// can be defined and checked to stop the iteration early, such as finding the first key that matches
// a certain condition.
// Note: the returned bool is to decide whether to read the value or not, rather than whether to stop
// the iteration or not.
type CheckFunc func(key []byte) (bool, error)

// CreateFunc returns a pointer to an initialized entity that we can potentially
Expand Down Expand Up @@ -129,6 +131,25 @@ func TraverseByPrefix(r storage.Reader, prefix []byte, iterFunc IterationFunc, o
return IterateKeys(r, prefix, prefix, iterFunc, opt)
}

// KeyOnlyIterateFunc returns an IterationFunc that invokes fn on every key
// and never reads or decodes the associated value.
func KeyOnlyIterateFunc(fn func(key []byte) error) IterationFunc {
	return func() (CheckFunc, CreateFunc, HandleFunc) {
		// Returning false skips the value read; any error from fn stops
		// the iteration.
		check := func(key []byte) (bool, error) {
			return false, fn(key)
		}
		// No value is ever read, so create and handle are no-ops.
		create := func() interface{} { return nil }
		handle := func() error { return nil }
		return check, create, handle
	}
}

// KeyExists returns true if a key exists in the database.
// When this returned function is executed (and only then), it will write into the `keyExists` whether
// the key exists.
Expand Down
63 changes: 63 additions & 0 deletions storage/operation/reads_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package operation_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation"
"github.com/onflow/flow-go/storage/operation/dbtest"
)

// BenchmarkRetrieve measures reading back a single stored entity.
func BenchmarkRetrieve(t *testing.B) {
	dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) {
		stored := Entity{ID: 1337}
		require.NoError(t, withWriter(operation.Upsert(stored.Key(), stored)))

		t.ResetTimer() // exclude the setup write from the measurement

		for i := 0; i < t.N; i++ {
			var got Entity
			require.NoError(t, operation.Retrieve(stored.Key(), &got)(r))
		}
	})
}

// BenchmarkNonExist measures existence checks for a key that was never written.
func BenchmarkNonExist(t *testing.B) {
	dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) {
		// Populate the database so the lookup has realistic content to miss.
		for i := 0; i < t.N; i++ {
			e := Entity{ID: uint64(i)}
			require.NoError(t, withWriter(operation.Upsert(e.Key(), e)))
		}

		t.ResetTimer()
		missing := Entity{ID: uint64(t.N + 1)} // IDs 0..N-1 were written above
		for i := 0; i < t.N; i++ {
			var exists bool
			require.NoError(t, operation.Exists(missing.Key(), &exists)(r))
		}
	})
}

// BenchmarkIterate measures a key-only forward iteration over keys spanning
// two prefixes.
func BenchmarkIterate(t *testing.B) {
	dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) {
		prefix1 := []byte("prefix-1")
		prefix2 := []byte("prefix-2")
		for i := 0; i < t.N; i++ {
			e := Entity{ID: uint64(i)}
			// Build each key on a fresh backing array. Appending directly to
			// prefix1/prefix2 would reuse their backing storage whenever spare
			// capacity exists, letting a later append clobber an earlier key.
			key1 := append(append([]byte(nil), prefix1...), e.Key()...)
			key2 := append(append([]byte(nil), prefix2...), e.Key()...)

			require.NoError(t, withWriter(operation.Upsert(key1, e)))
			require.NoError(t, withWriter(operation.Upsert(key2, e)))
		}

		t.ResetTimer()
		var found [][]byte
		require.NoError(t, operation.Iterate(prefix1, prefix2, func(key []byte) error {
			found = append(found, key)
			return nil
		})(r), "should iterate forward without error")
	})
}
Loading
Loading