Skip to content

Commit

Permalink
Revert badgerDB to levelDB (#113)
Browse files Browse the repository at this point in the history
* Revert "shed: Replace levelDB with badgerDB (#75)"
  • Loading branch information
acud authored Apr 21, 2020
1 parent f3a6d2e commit 7459b90
Show file tree
Hide file tree
Showing 28 changed files with 573 additions and 1,003 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/dist
/.idea
/.vscode
vendor/*

# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
Expand Down
39 changes: 21 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,32 @@ require (
github.com/btcsuite/btcd v0.20.1-beta
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/coreos/go-semver v0.3.0
github.com/dgraph-io/badger/v2 v2.0.3
github.com/ethersphere/bmt v0.1.0
github.com/gogo/protobuf v1.3.1
github.com/gorilla/handlers v1.4.2
github.com/gorilla/mux v1.7.4
github.com/libp2p/go-libp2p v0.7.4
github.com/gorilla/mux v1.7.3
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/libp2p/go-libp2p v0.5.1
github.com/libp2p/go-libp2p-autonat-svc v0.1.0
github.com/libp2p/go-libp2p-core v0.5.1
github.com/libp2p/go-libp2p-peerstore v0.2.3
github.com/libp2p/go-libp2p-quic-transport v0.3.3
github.com/libp2p/go-tcp-transport v0.2.0
github.com/libp2p/go-ws-transport v0.3.0
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multistream v0.1.1
github.com/libp2p/go-libp2p-core v0.3.0
github.com/libp2p/go-libp2p-peerstore v0.1.4
github.com/libp2p/go-libp2p-quic-transport v0.2.2
github.com/libp2p/go-tcp-transport v0.1.1
github.com/libp2p/go-ws-transport v0.2.0
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/multiformats/go-multiaddr v0.2.0
github.com/multiformats/go-multistream v0.1.0
github.com/opentracing/opentracing-go v1.1.0
github.com/prometheus/client_golang v1.5.1
github.com/sirupsen/logrus v1.5.0
github.com/spf13/cobra v0.0.7
github.com/spf13/viper v1.6.3
github.com/syndtr/goleveldb v1.0.0
github.com/prometheus/client_golang v1.3.0
github.com/sirupsen/logrus v1.4.2
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cobra v0.0.5
github.com/spf13/viper v1.6.2
github.com/syndtr/goleveldb v1.0.1-0.20190923125748-758128399b1d
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
golang.org/x/crypto v0.0.0-20200406173513-056763e48d71
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
resenje.org/web v0.4.3
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
resenje.org/web v0.4.0
)
425 changes: 120 additions & 305 deletions go.sum

Large diffs are not rendered by default.

27 changes: 11 additions & 16 deletions pkg/localstore/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"errors"
"time"

"github.com/dgraph-io/badger/v2"
"github.com/ethersphere/bee/pkg/shed"
"github.com/syndtr/goleveldb/leveldb"
)

var (
Expand Down Expand Up @@ -85,7 +85,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
}
}()

batch := db.shed.GetBatch(true)
batch := new(leveldb.Batch)
target := db.gcTarget()

// protect database from changing idexes and gcSize
Expand Down Expand Up @@ -146,10 +146,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
}
db.metrics.GCCollectedCounter.Inc()

if err := db.gcSize.PutInBatch(batch, gcSize-collectedCount); err != nil {
return 0, false, err
}

db.gcSize.PutInBatch(batch, gcSize-collectedCount)
err = db.shed.WriteBatch(batch)
if err != nil {
db.metrics.GCExcludeWriteBatchError.Inc()
Expand All @@ -168,7 +165,7 @@ func (db *DB) removeChunksInExcludeIndexFromGC() (err error) {
}
}()

batch := db.shed.GetBatch(true)
batch := new(leveldb.Batch)
excludedCount := 0
var gcSizeChange int64
err = db.gcExcludeIndex.Iterate(func(item shed.Item) (stop bool, err error) {
Expand Down Expand Up @@ -247,18 +244,18 @@ func (db *DB) triggerGarbageCollection() {
// incGCSizeInBatch changes gcSize field value
// by change which can be negative. This function
// must be called under batchMu lock.
func (db *DB) incGCSizeInBatch(batch *badger.Txn, change int64) (err error) {
func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
if change == 0 {
return nil
}
gcSize, err := db.gcSize.Get()
if err != nil && !errors.Is(err, shed.ErrNotFound) {
if err != nil && !errors.Is(err, leveldb.ErrNotFound) {
return err
}

var new uint64
var newSize uint64
if change > 0 {
new = gcSize + uint64(change)
newSize = gcSize + uint64(change)
} else {
// 'change' is an int64 and is negative
// a conversion is needed with correct sign
Expand All @@ -267,14 +264,12 @@ func (db *DB) incGCSizeInBatch(batch *badger.Txn, change int64) (err error) {
// protect uint64 undeflow
return nil
}
new = gcSize - c
}
if err := db.gcSize.PutInBatch(batch, new); err != nil {
return err
newSize = gcSize - c
}
db.gcSize.PutInBatch(batch, newSize)

// trigger garbage collection if we reached the capacity
if new >= db.capacity {
if newSize >= db.capacity {
db.triggerGarbageCollection()
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/localstore/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

// TestDB_collectGarbageWorker tests garbage collection runs
Expand Down Expand Up @@ -237,7 +238,7 @@ func TestPinGC(t *testing.T) {
t.Run("first chunks after pinned chunks should be removed", func(t *testing.T) {
for i := pinChunksCount; i < (int(dbCapacity) - int(gcTarget)); i++ {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
if err != storage.ErrNotFound {
if err != leveldb.ErrNotFound {
t.Fatal(err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/prometheus/client_golang/prometheus"
"github.com/syndtr/goleveldb/leveldb"
)

var _ storage.Storer = &DB{}
Expand Down Expand Up @@ -191,7 +192,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
return nil, err
}
schemaName, err := db.schemaName.Get()
if err != nil && !errors.Is(err, shed.ErrNotFound) {
if err != nil && !errors.Is(err, leveldb.ErrNotFound) {
return nil, err
}
if schemaName == "" {
Expand Down
3 changes: 2 additions & 1 deletion pkg/localstore/localstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethersphere/bee/pkg/storage"
chunktesting "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

func init() {
Expand Down Expand Up @@ -245,7 +246,7 @@ func newRetrieveIndexesTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTim
validateItem(t, item, chunk.Address().Bytes(), chunk.Data(), storeTimestamp, 0)

// access index should not be set
wantErr := shed.ErrNotFound
wantErr := leveldb.ErrNotFound
_, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
if err != wantErr {
t.Errorf("got error %v, want %v", err, wantErr)
Expand Down
7 changes: 4 additions & 3 deletions pkg/localstore/mode_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

// Get returns a chunk from the database. If the chunk is
Expand All @@ -42,7 +43,7 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address)

out, err := db.get(mode, addr)
if err != nil {
if err == shed.ErrNotFound {
if err == leveldb.ErrNotFound {
return nil, storage.ErrNotFound
}
return nil, err
Expand Down Expand Up @@ -123,15 +124,15 @@ func (db *DB) updateGC(item shed.Item) (err error) {
db.batchMu.Lock()
defer db.batchMu.Unlock()

batch := db.shed.GetBatch(true)
batch := new(leveldb.Batch)

// update accessTimeStamp in retrieve, gc

i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
item.AccessTimestamp = i.AccessTimestamp
case shed.ErrNotFound:
case leveldb.ErrNotFound:
// no chunk accesses
default:
return err
Expand Down
3 changes: 2 additions & 1 deletion pkg/localstore/mode_get_multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

// GetMulti returns chunks from the database. If one of the chunks is not found
Expand All @@ -41,7 +42,7 @@ func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm

out, err := db.getMulti(mode, addrs...)
if err != nil {
if err == shed.ErrNotFound {
if err == leveldb.ErrNotFound {
return nil, storage.ErrNotFound
}
return nil, err
Expand Down
20 changes: 9 additions & 11 deletions pkg/localstore/mode_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"context"
"time"

"github.com/dgraph-io/badger/v2"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

// Put stores Chunks to database and depending
Expand Down Expand Up @@ -55,7 +55,7 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
db.batchMu.Lock()
defer db.batchMu.Unlock()

batch := db.shed.GetBatch(true)
batch := new(leveldb.Batch)

// variables that provide information for operations
// to be done after write batch function successfully executes
Expand Down Expand Up @@ -130,9 +130,7 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
}

for po, id := range binIDs {
if err := db.binIDs.PutInBatch(batch, uint64(po), id); err != nil {
return nil, err
}
db.binIDs.PutInBatch(batch, uint64(po), id)
}

err = db.incGCSizeInBatch(batch, gcSizeChange)
Expand All @@ -159,14 +157,14 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
// - it does not enter the syncpool
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putRequest(batch *badger.Txn, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
i, err := db.retrievalDataIndex.Get(item)
switch err {
case nil:
exists = true
item.StoreTimestamp = i.StoreTimestamp
item.BinID = i.BinID
case shed.ErrNotFound:
case leveldb.ErrNotFound:
// no chunk accesses
exists = false
default:
Expand Down Expand Up @@ -199,7 +197,7 @@ func (db *DB) putRequest(batch *badger.Txn, binIDs map[uint8]uint64, item shed.I
// - put to indexes: retrieve, push, pull
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putUpload(batch *badger.Txn, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
exists, err = db.retrievalDataIndex.Has(item)
if err != nil {
return false, 0, err
Expand Down Expand Up @@ -261,7 +259,7 @@ func (db *DB) putUpload(batch *badger.Txn, binIDs map[uint8]uint64, item shed.It
// - put to indexes: retrieve, pull
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putSync(batch *badger.Txn, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
exists, err = db.retrievalDataIndex.Has(item)
if err != nil {
return false, 0, err
Expand Down Expand Up @@ -311,7 +309,7 @@ func (db *DB) putSync(batch *badger.Txn, binIDs map[uint8]uint64, item shed.Item
// a chunk is added to a node's localstore and given that the chunk is
// already within that node's NN (thus, it can be added to the gc index
// safely)
func (db *DB) setGC(batch *badger.Txn, item shed.Item) (gcSizeChange int64, err error) {
func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) {
if item.BinID == 0 {
i, err := db.retrievalDataIndex.Get(item)
if err != nil {
Expand All @@ -328,7 +326,7 @@ func (db *DB) setGC(batch *badger.Txn, item shed.Item) (gcSizeChange int64, err
return 0, err
}
gcSizeChange--
case shed.ErrNotFound:
case leveldb.ErrNotFound:
// the chunk is not accessed before
default:
return 0, err
Expand Down
6 changes: 3 additions & 3 deletions pkg/localstore/mode_put_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"testing"
"time"

"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

// TestModePutRequest validates ModePutRequest index values on the provided DB.
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestModePut_addToGc(t *testing.T) {
binIDs[po]++
var wantErr error
if !m.putToGc {
wantErr = shed.ErrNotFound
wantErr = leveldb.ErrNotFound
}
newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp)
newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po], wantErr)(t)
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestModePut_addToGcExisting(t *testing.T) {
binIDs[po]++
var wantErr error
if !m.putToGc {
wantErr = shed.ErrNotFound
wantErr = leveldb.ErrNotFound
}

newRetrieveIndexesTestWithAccess(db, ch, wantStoreTimestamp, wantAccessTimestamp)
Expand Down
Loading

0 comments on commit 7459b90

Please sign in to comment.