Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: sharded key-value store for fix-length blobs #2685

Merged
merged 22 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/sirupsen/logrus v1.6.0
github.com/smartystreets/assertions v1.1.1 // indirect
github.com/spf13/afero v1.3.1 // indirect
github.com/spf13/afero v1.6.0
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/cobra v1.0.0
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand All @@ -69,6 +69,8 @@ require (
resenje.org/web v0.4.3
)

require github.com/libp2p/go-libp2p-yamux v0.6.0

require (
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible // indirect
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
Expand Down Expand Up @@ -119,7 +121,6 @@ require (
github.com/libp2p/go-libp2p-testing v0.5.0 // indirect
github.com/libp2p/go-libp2p-tls v0.3.1 // indirect
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0 // indirect
github.com/libp2p/go-libp2p-yamux v0.6.0 // indirect
github.com/libp2p/go-maddr-filter v0.1.0 // indirect
github.com/libp2p/go-mplex v0.3.0 // indirect
github.com/libp2p/go-msgio v0.1.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1047,8 +1047,8 @@ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.3.1 h1:GPTpEAuNr98px18yNQ66JllNil98wfRZ/5Ukny8FeQA=
github.com/spf13/afero v1.3.1/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY=
github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/soc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage"
mockpost "github.com/ethersphere/bee/pkg/postage/mock"
"github.com/ethersphere/bee/pkg/soc"
testingsoc "github.com/ethersphere/bee/pkg/soc/testing"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
)

Expand Down Expand Up @@ -81,7 +81,7 @@ func TestSOC(t *testing.T) {
s := testingsoc.GenerateMockSOC(t, testData)

// modify the sign
sig := make([]byte, soc.SignatureSize)
sig := make([]byte, swarm.SocSignatureSize)
copy(sig, s.Signature)
sig[12] = 0x98
sig[10] = 0x12
Expand Down
95 changes: 95 additions & 0 deletions pkg/localstore/disaster_recovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package localstore

import (
"encoding/binary"
"fmt"

"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/shed"
)

const headerSize = 16 + postage.StampSize

type locOrErr struct {
err error
loc sharky.Location
}

// recovery tries to recover a dirty database.
func recovery(db *DB) (chan locOrErr, error) {
// - go through all retrieval data index entries
// - find all used locations in sharky
// - return them so that sharky can be initialized with them

// first define the index instance
retrievalDataIndex, err := db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Location", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.Address = key
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
b := make([]byte, headerSize)
binary.BigEndian.PutUint64(b[:8], fields.BinID)
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
stamp, err := postage.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary()
if err != nil {
return nil, err
}
copy(b[16:], stamp)
value = append(b, fields.Location...)
return value, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
e.BinID = binary.BigEndian.Uint64(value[:8])
stamp := new(postage.Stamp)
if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil {
return e, err
}
e.BatchID = stamp.BatchID()
e.Index = stamp.Index()
e.Timestamp = stamp.Timestamp()
e.Sig = stamp.Sig()
e.Location = value[headerSize:]
return e, nil
},
})

if err != nil {
return nil, err
}

usedLocations := make(chan locOrErr)

go func() {
defer close(usedLocations)

err := retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) {
loc, err := sharky.LocationFromBinary(item.Location)
if err != nil {
return true, fmt.Errorf("location from binary: %w", err)
}

usedLocations <- locOrErr{
loc: loc,
}

return false, nil
}, nil)
if err != nil {
usedLocations <- locOrErr{
err: fmt.Errorf("iterate index: %w", err),
}
}
}()

return usedLocations, nil
}
47 changes: 47 additions & 0 deletions pkg/localstore/disaster_recovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package localstore

import (
"context"
"testing"

"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)

func TestRecovery(t *testing.T) {
chunkCount := 150

db := newTestDB(t, &Options{
Capacity: 100,
ReserveCapacity: 200,
})

loc, _ := recovery(db)

for range loc {
t.Fatal("not expecting any locations, found at least one")
}

for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(5, 3, 2, false)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
}

loc, _ = recovery(db)

var locationCount int
for range loc {
locationCount++
}

if locationCount != chunkCount {
t.Fatalf("want %d chunks, got %d", chunkCount, locationCount)
}
}
38 changes: 25 additions & 13 deletions pkg/localstore/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"

"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
Expand Down Expand Up @@ -58,30 +59,41 @@ func (db *DB) Export(w io.Writer) (count int64, err error) {

err = db.retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) {

loc, err := sharky.LocationFromBinary(item.Location)
if err != nil {
return false, err
}

data := make([]byte, loc.Length)
err = db.sharky.Read(context.TODO(), loc, data)
if err != nil {
return false, err
}

hdr := &tar.Header{
Name: hex.EncodeToString(item.Address),
Mode: 0644,
Size: int64(postage.StampSize + len(item.Data)),
Size: int64(postage.StampSize + len(data)),
}

if err := tw.WriteHeader(hdr); err != nil {
return false, err
}
if _, err := tw.Write(item.BatchID); err != nil {
return false, err
}
if _, err := tw.Write(item.Index); err != nil {
return false, err
}
if _, err := tw.Write(item.Timestamp); err != nil {
return false, err
}
if _, err := tw.Write(item.Sig); err != nil {
return false, err
write := func(buf []byte) {
if err != nil {
return
}
_, err = tw.Write(buf)
}
if _, err := tw.Write(item.Data); err != nil {
write(item.BatchID)
write(item.Index)
write(item.Timestamp)
write(item.Sig)
write(data)
if err != nil {
return false, err
}

count++
return false, nil
}, nil)
Expand Down
20 changes: 20 additions & 0 deletions pkg/localstore/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"time"

"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
Expand Down Expand Up @@ -163,6 +164,7 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) {
}

var totalChunksEvicted uint64
locations := make([]sharky.Location, 0, len(candidates))

// get rid of dirty entries
for _, item := range candidates {
Expand All @@ -185,6 +187,12 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) {

totalChunksEvicted++

i, err := db.retrievalDataIndex.Get(item)
if err != nil {
return 0, false, err
}
item.Location = i.Location

db.metrics.GCStoreTimeStamps.Set(float64(item.StoreTimestamp))
db.metrics.GCStoreAccessTimeStamps.Set(float64(item.AccessTimestamp))

Expand Down Expand Up @@ -217,6 +225,11 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) {
if err != nil {
return 0, false, err
}
loc, err := sharky.LocationFromBinary(item.Location)
if err != nil {
return 0, false, err
}
locations = append(locations, loc)
}

db.metrics.GCCommittedCounter.Add(float64(totalChunksEvicted))
Expand All @@ -228,6 +241,13 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) {
return 0, false, err
}

for _, loc := range locations {
err = db.sharky.Release(db.ctx, loc)
if err != nil {
db.logger.Warningf("failed releasing sharky location %+v", loc)
}
}

return totalChunksEvicted, done, nil
}

Expand Down
Loading