diff --git a/go.mod b/go.mod index f0c763e091b..42fea993cbf 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/prometheus/client_golang v1.11.0 github.com/sirupsen/logrus v1.6.0 github.com/smartystreets/assertions v1.1.1 // indirect - github.com/spf13/afero v1.3.1 // indirect + github.com/spf13/afero v1.6.0 github.com/spf13/cast v1.3.1 // indirect github.com/spf13/cobra v1.0.0 github.com/spf13/jwalterweatherman v1.1.0 // indirect @@ -69,6 +69,8 @@ require ( resenje.org/web v0.4.3 ) +require github.com/libp2p/go-libp2p-yamux v0.6.0 + require ( github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible // indirect github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect @@ -119,7 +121,6 @@ require ( github.com/libp2p/go-libp2p-testing v0.5.0 // indirect github.com/libp2p/go-libp2p-tls v0.3.1 // indirect github.com/libp2p/go-libp2p-transport-upgrader v0.5.0 // indirect - github.com/libp2p/go-libp2p-yamux v0.6.0 // indirect github.com/libp2p/go-maddr-filter v0.1.0 // indirect github.com/libp2p/go-mplex v0.3.0 // indirect github.com/libp2p/go-msgio v0.1.0 // indirect diff --git a/go.sum b/go.sum index 78e474410c7..915d788c52d 100644 --- a/go.sum +++ b/go.sum @@ -1047,8 +1047,8 @@ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= -github.com/spf13/afero v1.3.1 h1:GPTpEAuNr98px18yNQ66JllNil98wfRZ/5Ukny8FeQA= -github.com/spf13/afero v1.3.1/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4= +github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY= +github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= diff --git a/pkg/api/soc_test.go b/pkg/api/soc_test.go index 61d8ab741a3..42ee5034ed0 100644 --- a/pkg/api/soc_test.go +++ b/pkg/api/soc_test.go @@ -19,10 +19,10 @@ import ( "github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/postage" mockpost "github.com/ethersphere/bee/pkg/postage/mock" - "github.com/ethersphere/bee/pkg/soc" testingsoc "github.com/ethersphere/bee/pkg/soc/testing" statestore "github.com/ethersphere/bee/pkg/statestore/mock" "github.com/ethersphere/bee/pkg/storage/mock" + "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/tags" ) @@ -81,7 +81,7 @@ func TestSOC(t *testing.T) { s := testingsoc.GenerateMockSOC(t, testData) // modify the sign - sig := make([]byte, soc.SignatureSize) + sig := make([]byte, swarm.SocSignatureSize) copy(sig, s.Signature) sig[12] = 0x98 sig[10] = 0x12 diff --git a/pkg/localstore/disaster_recovery.go b/pkg/localstore/disaster_recovery.go new file mode 100644 index 00000000000..e21f7503fc0 --- /dev/null +++ b/pkg/localstore/disaster_recovery.go @@ -0,0 +1,95 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
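+
+// disaster_recovery.go rebuilds sharky's view of which blob slots are still in
+// use after an unclean shutdown: recovery walks the retrieval data index and
+// streams every stored sharky location over a channel so the caller can
+// reconstruct the per-shard free-slot maps before normal startup.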
+ +package localstore + +import ( + "encoding/binary" + "fmt" + + "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/sharky" + "github.com/ethersphere/bee/pkg/shed" +) + +const headerSize = 16 + postage.StampSize + +type locOrErr struct { + err error + loc sharky.Location +} + +// recovery tries to recover a dirty database. +func recovery(db *DB) (chan locOrErr, error) { + // - go through all retrieval data index entries + // - find all used locations in sharky + // - return them so that sharky can be initialized with them + + // first define the index instance + retrievalDataIndex, err := db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Location", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + b := make([]byte, headerSize) + binary.BigEndian.PutUint64(b[:8], fields.BinID) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + stamp, err := postage.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary() + if err != nil { + return nil, err + } + copy(b[16:], stamp) + value = append(b, fields.Location...) + return value, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.BinID = binary.BigEndian.Uint64(value[:8]) + stamp := new(postage.Stamp) + if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil { + return e, err + } + e.BatchID = stamp.BatchID() + e.Index = stamp.Index() + e.Timestamp = stamp.Timestamp() + e.Sig = stamp.Sig() + e.Location = value[headerSize:] + return e, nil + }, + }) + + if err != nil { + return nil, err + } + + usedLocations := make(chan locOrErr) + + go func() { + defer close(usedLocations) + + err := retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) { + loc, err := sharky.LocationFromBinary(item.Location) + if err != nil { + return true, fmt.Errorf("location from binary: %w", err) + } + + usedLocations <- locOrErr{ + loc: loc, + } + + return false, nil + }, nil) + if err != nil { + usedLocations <- locOrErr{ + err: fmt.Errorf("iterate index: %w", err), + } + } + }() + + return usedLocations, nil +} diff --git a/pkg/localstore/disaster_recovery_test.go b/pkg/localstore/disaster_recovery_test.go new file mode 100644 index 00000000000..885904b5ceb --- /dev/null +++ b/pkg/localstore/disaster_recovery_test.go @@ -0,0 +1,47 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
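+
+// TestRecovery exercises both ends of the recovery path: an empty store must
+// yield no locations, and after uploading chunkCount chunks the recovery
+// channel must report exactly one location per stored chunk.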
+ +package localstore + +import ( + "context" + "testing" + + "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/swarm" +) + +func TestRecovery(t *testing.T) { + chunkCount := 150 + + db := newTestDB(t, &Options{ + Capacity: 100, + ReserveCapacity: 200, + }) + + loc, _ := recovery(db) + + for range loc { + t.Fatal("not expecting any locations, found at least one") + } + + for i := 0; i < chunkCount; i++ { + ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(5, 3, 2, false) + _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + if err != nil { + t.Fatal(err) + } + } + + loc, _ = recovery(db) + + var locationCount int + for range loc { + locationCount++ + } + + if locationCount != chunkCount { + t.Fatalf("want %d chunks, got %d", chunkCount, locationCount) + } +} diff --git a/pkg/localstore/export.go b/pkg/localstore/export.go index 23b359d7d24..4c62634fd2b 100644 --- a/pkg/localstore/export.go +++ b/pkg/localstore/export.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" @@ -58,30 +59,41 @@ func (db *DB) Export(w io.Writer) (count int64, err error) { err = db.retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) { + loc, err := sharky.LocationFromBinary(item.Location) + if err != nil { + return false, err + } + + data := make([]byte, loc.Length) + err = db.sharky.Read(context.TODO(), loc, data) + if err != nil { + return false, err + } + hdr := &tar.Header{ Name: hex.EncodeToString(item.Address), Mode: 0644, - Size: int64(postage.StampSize + len(item.Data)), + Size: int64(postage.StampSize + len(data)), } if err := tw.WriteHeader(hdr); err != nil { return false, err } - if _, err := tw.Write(item.BatchID); err != nil { - return false, err - } - if _, err := tw.Write(item.Index); err != nil { - return false, err - } - if _, err := tw.Write(item.Timestamp); err != nil { - return false, err - } - if _, err := tw.Write(item.Sig); err != nil { - return false, err + write := func(buf []byte) { + if err != nil { + return + } + _, err = tw.Write(buf) } - if _, err := tw.Write(item.Data); err != nil { + write(item.BatchID) + write(item.Index) + write(item.Timestamp) + write(item.Sig) + write(data) + if err != nil { return false, err } + count++ return false, nil }, nil) diff --git a/pkg/localstore/gc.go b/pkg/localstore/gc.go index e04271d7a82..480c8b04931 100644 --- a/pkg/localstore/gc.go +++ b/pkg/localstore/gc.go @@ -20,6 +20,7 @@ import ( "errors" "time" + "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/swarm" "github.com/syndtr/goleveldb/leveldb" @@ -163,6 +164,7 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) { } var totalChunksEvicted uint64 + locations := make([]sharky.Location, 0, len(candidates)) // get rid of dirty entries for _, item := range candidates { @@ -185,6 +187,12 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) { totalChunksEvicted++ + i, err := db.retrievalDataIndex.Get(item) + if err != nil { + return 0, false, err + } + item.Location = i.Location + db.metrics.GCStoreTimeStamps.Set(float64(item.StoreTimestamp)) db.metrics.GCStoreAccessTimeStamps.Set(float64(item.AccessTimestamp)) @@ -217,6 +225,11 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) { if err != nil { return 0, false, 
err } + loc, err := sharky.LocationFromBinary(item.Location) + if err != nil { + return 0, false, err + } + locations = append(locations, loc) } db.metrics.GCCommittedCounter.Add(float64(totalChunksEvicted)) @@ -228,6 +241,13 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) { return 0, false, err } + for _, loc := range locations { + err = db.sharky.Release(db.ctx, loc) + if err != nil { + db.logger.Warningf("failed releasing sharky location %+v", loc) + } + } + return totalChunksEvicted, done, nil } diff --git a/pkg/localstore/localstore.go b/pkg/localstore/localstore.go index c52e997d842..b9416fa5783 100644 --- a/pkg/localstore/localstore.go +++ b/pkg/localstore/localstore.go @@ -17,9 +17,13 @@ package localstore import ( + "context" "encoding/binary" "errors" + "fmt" + "io/fs" "os" + "path/filepath" "runtime/pprof" "sync" "time" @@ -28,12 +32,14 @@ import ( "github.com/ethersphere/bee/pkg/pinning" "github.com/ethersphere/bee/pkg/postage" "github.com/ethersphere/bee/pkg/postage/batchstore" + "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/tags" "github.com/hashicorp/go-multierror" "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/afero" "github.com/syndtr/goleveldb/leveldb" ) @@ -58,10 +64,19 @@ var ( flipFlopWorstCaseDuration = 10 * time.Second ) +const ( + sharkyNoOfShards = 32 + sharkyDirtyFileName = ".DIRTY" +) + // DB is the local store implementation and holds // database related objects. type DB struct { shed *shed.DB + // sharky instance + sharky *sharky.Store + fdirtyCloser func() error + tags *tags.Tags // stateStore is needed to access the pinning Service.Pins() method. 
@@ -104,7 +119,7 @@ type DB struct { // postage index index postageIndexIndex shed.Index - // field that stores number of intems in gc index + // field that stores number of items in gc index gcSize shed.Uint64Field // field that stores the size of the reserve @@ -151,6 +166,11 @@ type DB struct { // to terminate other goroutines close chan struct{} + // context + ctx context.Context + // the cancelation function from the context + cancel context.CancelFunc + // protect Close method from exiting before // garbage collection and gc size write workers // are done @@ -195,6 +215,63 @@ type Options struct { Tags *tags.Tags } +type memFS struct { + afero.Fs +} + +func (m *memFS) Open(path string) (fs.File, error) { + return m.Fs.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) +} + +type dirFS struct { + basedir string +} + +func (d *dirFS) Open(path string) (fs.File, error) { + return os.OpenFile(filepath.Join(d.basedir, path), os.O_RDWR|os.O_CREATE, 0644) +} + +func safeInit(rootPath, sharkyBasePath string, db *DB) error { + // create if needed + path := filepath.Join(rootPath, sharkyDirtyFileName) + if _, err := os.Stat(path); errors.Is(err, fs.ErrNotExist) { + // missing lock file implies a clean exit then create the file and return + return os.WriteFile(path, []byte{}, 0644) + } + locOrErr, err := recovery(db) + if err != nil { + return err + } + + recoverySharky, err := sharky.NewRecovery(sharkyBasePath, sharkyNoOfShards, swarm.SocMaxChunkSize) + if err != nil { + return err + } + + for l := range locOrErr { + if l.err != nil { + return l.err + } + + err = recoverySharky.Add(l.loc) + if err != nil { + return err + } + } + + err = recoverySharky.Save() + if err != nil { + return err + } + + err = recoverySharky.Close() + if err != nil { + return err + } + + return nil +} + // New returns a new DB. All fields and indexes are initialized // and possible conflicts with schema from existing database is checked. // One goroutine for writing batches is created. @@ -207,6 +284,8 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger } } + ctx, cancel := context.WithCancel(context.Background()) + db = &DB{ stateStore: ss, cacheCapacity: o.Capacity, @@ -214,6 +293,8 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger unreserveFunc: o.UnreserveFunc, baseKey: baseKey, tags: o.Tags, + ctx: ctx, + cancel: cancel, // channel collectGarbageTrigger // needs to be buffered with the size of 1 // to signal another event if it @@ -258,6 +339,33 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger return nil, err } + // instantiate sharky instance + var sharkyBase fs.FS + if path == "" { + // No need for recovery for in-mem sharky + sharkyBase = &memFS{Fs: afero.NewMemMapFs()} + } else { + sharkyBasePath := filepath.Join(path, "sharky") + if _, err := os.Stat(sharkyBasePath); os.IsNotExist(err) { + err := os.Mkdir(sharkyBasePath, 0775) + if err != nil { + return nil, err + } + } + sharkyBase = &dirFS{basedir: sharkyBasePath} + + err = safeInit(path, sharkyBasePath, db) + if err != nil { + return nil, fmt.Errorf("safe sharky initialization failed: %w", err) + } + db.fdirtyCloser = func() error { return os.Remove(filepath.Join(path, sharkyDirtyFileName)) } + } + + db.sharky, err = sharky.New(sharkyBase, sharkyNoOfShards, swarm.SocMaxChunkSize) + if err != nil { + return nil, err + } + // Identify current storage schema by arbitrary name. 
db.schemaName, err = db.shed.NewStringField("schema-name") if err != nil { @@ -274,10 +382,9 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger return nil, err } } else { - // execute possible migrations - err = db.migrate(schemaName) - if err != nil { - return nil, multierror.Append(err, db.shed.Close()) + // Execute possible migrations. + if err := db.migrate(schemaName); err != nil { + return nil, multierror.Append(err, db.sharky.Close(), db.shed.Close(), db.fdirtyCloser()) } } @@ -295,7 +402,7 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger // Index storing actual chunk address, data and bin id. headerSize := 16 + postage.StampSize - db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Data", shed.IndexFuncs{ + db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Location", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { return fields.Address, nil }, @@ -312,7 +419,7 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger return nil, err } copy(b[16:], stamp) - value = append(b, fields.Data...) + value = append(b, fields.Location...) return value, nil }, DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { @@ -326,7 +433,7 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger e.Index = stamp.Index() e.Timestamp = stamp.Timestamp() e.Sig = stamp.Sig() - e.Data = value[headerSize:] + e.Location = value[headerSize:] return e, nil }, }) @@ -557,8 +664,9 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger } // Close closes the underlying database. -func (db *DB) Close() (err error) { +func (db *DB) Close() error { close(db.close) + db.cancel() // wait for all handlers to finish done := make(chan struct{}) @@ -570,6 +678,9 @@ func (db *DB) Close() (err error) { <-db.reserveEvictionWorkerDone close(done) }() + + err := new(multierror.Error) + select { case <-done: case <-time.After(5 * time.Second): @@ -577,12 +688,15 @@ func (db *DB) Close() (err error) { // Print a full goroutine dump to debug blocking. 
// TODO: use a logger to write a goroutine profile prof := pprof.Lookup("goroutine") - err = prof.WriteTo(os.Stdout, 2) - if err != nil { - return err - } + err = multierror.Append(err, prof.WriteTo(os.Stdout, 2)) } - return db.shed.Close() + + err = multierror.Append(err, db.sharky.Close()) + err = multierror.Append(err, db.shed.Close()) + if db.fdirtyCloser != nil { + err = multierror.Append(err, db.fdirtyCloser()) + } + return err.ErrorOrNil() } // po computes the proximity order between the address @@ -691,5 +805,4 @@ func init() { func totalTimeMetric(metric prometheus.Counter, start time.Time) { totalTime := time.Since(start) metric.Add(float64(totalTime)) - } diff --git a/pkg/localstore/localstore_test.go b/pkg/localstore/localstore_test.go index e4800b59328..f7d3da3ef63 100644 --- a/pkg/localstore/localstore_test.go +++ b/pkg/localstore/localstore_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" chunktesting "github.com/ethersphere/bee/pkg/storage/testing" @@ -157,6 +158,14 @@ func newTestDB(t testing.TB, o *Options) *DB { if _, err := rand.Read(baseKey); err != nil { t.Fatal(err) } + if o == nil { + o = &Options{} + } + if o.UnreserveFunc == nil { + o.UnreserveFunc = func(postage.UnreserveIteratorFn) error { + return nil + } + } logger := logging.New(io.Discard, 0) db, err := New("", baseKey, nil, o, logger) if err != nil { @@ -253,7 +262,8 @@ func newRetrieveIndexesTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTim if err != nil { t.Fatal(err) } - validateItem(t, item, chunk.Address().Bytes(), chunk.Data(), storeTimestamp, 0, chunk.Stamp()) + validateItem(t, item, chunk.Address().Bytes(), storeTimestamp, 0, chunk.Stamp()) + validateData(t, db, item, chunk.Data()) // access index should not be set wantErr := leveldb.ErrNotFound @@ -281,7 +291,8 @@ func newRetrieveIndexesTestWithAccess(db *DB, ch swarm.Chunk, storeTimestamp, ac t.Fatal(err) } } - validateItem(t, item, ch.Address().Bytes(), ch.Data(), storeTimestamp, accessTimestamp, ch.Stamp()) + validateItem(t, item, ch.Address().Bytes(), storeTimestamp, accessTimestamp, ch.Stamp()) + validateData(t, db, item, ch.Data()) } } @@ -299,7 +310,7 @@ func newPullIndexTest(db *DB, ch swarm.Chunk, binID uint64, wantError error) fun t.Errorf("got error %v, want %v", err, wantError) } if err == nil { - validateItem(t, item, ch.Address().Bytes(), nil, 0, 0, postage.NewStamp(ch.Stamp().BatchID(), nil, nil, nil)) + validateItem(t, item, ch.Address().Bytes(), 0, 0, postage.NewStamp(ch.Stamp().BatchID(), nil, nil, nil)) } } } @@ -318,7 +329,7 @@ func newPushIndexTest(db *DB, ch swarm.Chunk, storeTimestamp int64, wantError er t.Errorf("got error %v, want %v", err, wantError) } if err == nil { - validateItem(t, item, ch.Address().Bytes(), nil, storeTimestamp, 0, postage.NewStamp(nil, nil, nil, nil)) + validateItem(t, item, ch.Address().Bytes(), storeTimestamp, 0, postage.NewStamp(nil, nil, nil, nil)) } } } @@ -338,7 +349,7 @@ func newGCIndexTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTimestamp i t.Errorf("got error %v, want %v", err, wantError) } if err == nil { - validateItem(t, item, chunk.Address().Bytes(), nil, 0, accessTimestamp, stamp) + validateItem(t, item, chunk.Address().Bytes(), 0, accessTimestamp, stamp) } } } @@ -356,7 +367,7 @@ func newPinIndexTest(db *DB, chunk swarm.Chunk, wantError error) func(t *testing t.Errorf("got error %v, 
want %v", err, wantError) } if err == nil { - validateItem(t, item, chunk.Address().Bytes(), nil, 0, 0, postage.NewStamp(nil, nil, nil, nil)) + validateItem(t, item, chunk.Address().Bytes(), 0, 0, postage.NewStamp(nil, nil, nil, nil)) } } } @@ -455,15 +466,12 @@ func testItemsOrder(t *testing.T, i shed.Index, chunks []testIndexChunk, sortFun } // validateItem is a helper function that checks Item values. -func validateItem(t *testing.T, item shed.Item, address, data []byte, storeTimestamp, accessTimestamp int64, stamp swarm.Stamp) { +func validateItem(t *testing.T, item shed.Item, address []byte, storeTimestamp, accessTimestamp int64, stamp swarm.Stamp) { t.Helper() if !bytes.Equal(item.Address, address) { t.Errorf("got item address %x, want %x", item.Address, address) } - if !bytes.Equal(item.Data, data) { - t.Errorf("got item data %x, want %x", item.Data, data) - } if item.StoreTimestamp != storeTimestamp { t.Errorf("got item store timestamp %v, want %v", item.StoreTimestamp, storeTimestamp) } @@ -478,6 +486,23 @@ func validateItem(t *testing.T, item shed.Item, address, data []byte, storeTimes } } +func validateData(t *testing.T, db *DB, item shed.Item, data []byte) { + t.Helper() + + loc, err := sharky.LocationFromBinary(item.Location) + if err != nil { + t.Fatal("failed reading sharky location", err) + } + buf := make([]byte, loc.Length) + err = db.sharky.Read(context.TODO(), loc, buf) + if err != nil { + t.Fatal("failed reading data from sharky", err) + } + if !bytes.Equal(buf, data) { + t.Errorf("got item data %x, want %x", buf, data) + } +} + // setNow replaces now function and // returns a function that will reset it to the // value before the change. diff --git a/pkg/localstore/metrics.go b/pkg/localstore/metrics.go index 39a878db07a..415e2d68c86 100644 --- a/pkg/localstore/metrics.go +++ b/pkg/localstore/metrics.go @@ -383,5 +383,6 @@ func newMetrics() metrics { } func (db *DB) Metrics() []prometheus.Collector { - return m.PrometheusCollectorsFromFields(db.metrics) + componentMetrics := append(db.sharky.Metrics(), db.shed.Metrics()...) + return append(m.PrometheusCollectorsFromFields(db.metrics), componentMetrics...) } diff --git a/pkg/localstore/migration.go b/pkg/localstore/migration.go index 9b2ffd582ab..6227ac480a4 100644 --- a/pkg/localstore/migration.go +++ b/pkg/localstore/migration.go @@ -41,6 +41,7 @@ var schemaMigrations = []migration{ {schemaName: DBSchemaYuj, fn: migrateYuj}, {schemaName: DBSchemaBatchIndex, fn: migrateBatchIndex}, {schemaName: DBSchemaDeadPush, fn: migrateDeadPush}, + {schemaName: DBSchemaSharky, fn: migrateSharky}, } func (db *DB) migrate(schemaName string) error { @@ -54,6 +55,7 @@ func (db *DB) migrate(schemaName string) error { } db.logger.Infof("localstore migration: need to run %v data migrations on localstore to schema %s", len(migrations), schemaName) + db.logger.Info("localstore migration: warning: if one of the migration fails it wouldn't be possible to downgrade back to the old version") for i, migration := range migrations { if err := migration.fn(db); err != nil { return err diff --git a/pkg/localstore/migration_sharky.go b/pkg/localstore/migration_sharky.go new file mode 100644 index 00000000000..0a8fefb6ba1 --- /dev/null +++ b/pkg/localstore/migration_sharky.go @@ -0,0 +1,215 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +package localstore + +import ( + "context" + "encoding/binary" + "fmt" + "time" + + "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/sharky" + "github.com/ethersphere/bee/pkg/shed" + "github.com/hashicorp/go-multierror" + "github.com/syndtr/goleveldb/leveldb" +) + +// DBSchemaSharky is the bee schema identifier for sharky. +const DBSchemaSharky = "sharky" + +// migrateSharky writes the new retrievalDataIndex format by storing chunk data in sharky +func migrateSharky(db *DB) error { + db.logger.Info("starting sharky migration; have patience, this might take a while...") + var ( + start = time.Now() + batch = new(leveldb.Batch) + batchSize = 10000 + batchesCount = 0 + headerSize = 16 + postage.StampSize + compactionRate = 100 + compactionSize = batchSize * compactionRate + ) + + compaction := func(start, end []byte) (time.Duration, error) { + compactStart := time.Now() + if err := db.shed.Compact(start, end); err != nil { + return 0, fmt.Errorf("leveldb compaction failed: %w", err) + } + return time.Since(compactStart), nil + } + + retrievalDataIndex, err := db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Data", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + b := make([]byte, headerSize) + binary.BigEndian.PutUint64(b[:8], fields.BinID) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + stamp, err := postage.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary() + if err != nil { + return nil, err + } + copy(b[16:], stamp) + value = append(b, fields.Data...) + return value, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.BinID = binary.BigEndian.Uint64(value[:8]) + stamp := new(postage.Stamp) + if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil { + return e, err + } + e.BatchID = stamp.BatchID() + e.Index = stamp.Index() + e.Timestamp = stamp.Timestamp() + e.Sig = stamp.Sig() + e.Data = value[headerSize:] + return e, nil + }, + }) + if err != nil { + return err + } + + newRetrievalDataIndex, err := db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Location", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + b := make([]byte, headerSize) + binary.BigEndian.PutUint64(b[:8], fields.BinID) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + stamp, err := postage.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary() + if err != nil { + return nil, err + } + copy(b[16:], stamp) + value = append(b, fields.Location...) 
+ return value, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.BinID = binary.BigEndian.Uint64(value[:8]) + stamp := new(postage.Stamp) + if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil { + return e, err + } + e.BatchID = stamp.BatchID() + e.Index = stamp.Index() + e.Timestamp = stamp.Timestamp() + e.Sig = stamp.Sig() + e.Location = value[headerSize:] + return e, nil + }, + }) + if err != nil { + return err + } + + db.gcSize, err = db.shed.NewUint64Field("gc-size") + if err != nil { + return err + } + + db.reserveSize, err = db.shed.NewUint64Field("reserve-size") + if err != nil { + return err + } + + var ( + compactionTime time.Duration + dirtyLocations []sharky.Location + compactStart, compactEnd *shed.Item + ) + + db.logger.Debugf("starting to move entries with batch size %d", batchSize) + for { + isBatchEmpty := true + + err = retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) { + if compactStart == nil { + compactStart = &item + } + loc, err := db.sharky.Write(context.TODO(), item.Data) + if err != nil { + return false, err + } + dirtyLocations = append(dirtyLocations, loc) + item.Location, err = loc.MarshalBinary() + if err != nil { + return false, err + } + if err = newRetrievalDataIndex.PutInBatch(batch, item); err != nil { + return false, err + } + if err = retrievalDataIndex.DeleteInBatch(batch, item); err != nil { + return false, err + } + batchesCount++ + isBatchEmpty = false + if batchesCount%batchSize == 0 { + compactEnd = &item + db.logger.Debugf("collected %d entries; trying to flush...", batchSize) + return true, nil + } + return false, nil + }, nil) + if err != nil { + return fmt.Errorf("iterate index: %w", err) + } + + if isBatchEmpty { + break + } + + if err := db.shed.WriteBatch(batch); err != nil { + for _, loc := range dirtyLocations { + err = multierror.Append(err, db.sharky.Release(context.TODO(), loc)) + } + return fmt.Errorf("write batch: %w", err) + } + dirtyLocations = nil + db.logger.Debugf("flush ok; progress so far: %d chunks", batchesCount) + batch.Reset() + + if batchesCount%compactionSize == 0 { + db.logger.Debugf("starting intermediate compaction") + + // the items are references from the iteration so encoding should be error-free + start, _ := retrievalDataIndex.ItemKey(*compactStart) + end, _ := retrievalDataIndex.ItemKey(*compactEnd) + + dur, err := compaction(start, end) + if err != nil { + return err + } + compactionTime += dur + db.logger.Debugf("intermediate compaction done %s", dur) + compactStart = nil + } + } + + // do a full compaction at the end + dur, err := compaction(nil, nil) + if err != nil { + return err + } + compactionTime += dur + + db.logger.Debugf("leveldb compaction took: %v", compactionTime) + db.logger.Infof("done migrating to sharky; it took me %s to move %d chunks.", time.Since(start), batchesCount) + return nil +} diff --git a/pkg/localstore/mode_get.go b/pkg/localstore/mode_get.go index 66bc807a6b2..a6182da8400 100644 --- a/pkg/localstore/mode_get.go +++ b/pkg/localstore/mode_get.go @@ -22,6 +22,7 @@ import ( "time" "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" @@ -43,7 +44,7 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) } }() - out, err := db.get(mode, addr) + out, err := db.get(ctx, mode, 
addr) if err != nil { if errors.Is(err, leveldb.ErrNotFound) { return nil, storage.ErrNotFound @@ -56,13 +57,25 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) // get returns Item from the retrieval index // and updates other indexes. -func (db *DB) get(mode storage.ModeGet, addr swarm.Address) (out shed.Item, err error) { +func (db *DB) get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (out shed.Item, err error) { item := addressToItem(addr) out, err = db.retrievalDataIndex.Get(item) if err != nil { return out, err } + + l, err := sharky.LocationFromBinary(out.Location) + if err != nil { + return out, err + } + + out.Data = make([]byte, l.Length) + err = db.sharky.Read(ctx, l, out.Data) + if err != nil { + return out, err + } + switch mode { // update the access timestamp and gc index case storage.ModeGetRequest: diff --git a/pkg/localstore/mode_get_multi.go b/pkg/localstore/mode_get_multi.go index 2d3167ed962..7005810baff 100644 --- a/pkg/localstore/mode_get_multi.go +++ b/pkg/localstore/mode_get_multi.go @@ -22,6 +22,7 @@ import ( "time" "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" @@ -43,7 +44,7 @@ func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm } }() - out, err := db.getMulti(mode, addrs...) + out, err := db.getMulti(ctx, mode, addrs...) if err != nil { if errors.Is(err, leveldb.ErrNotFound) { return nil, storage.ErrNotFound @@ -60,7 +61,7 @@ func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm // getMulti returns Items from the retrieval index // and updates other indexes. -func (db *DB) getMulti(mode storage.ModeGet, addrs ...swarm.Address) (out []shed.Item, err error) { +func (db *DB) getMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm.Address) (out []shed.Item, err error) { out = make([]shed.Item, len(addrs)) for i, addr := range addrs { out[i].Address = addr.Bytes() @@ -71,6 +72,19 @@ func (db *DB) getMulti(mode storage.ModeGet, addrs ...swarm.Address) (out []shed return nil, err } + for i, item := range out { + l, err := sharky.LocationFromBinary(item.Location) + if err != nil { + return nil, err + } + + out[i].Data = make([]byte, l.Length) + err = db.sharky.Read(ctx, l, out[i].Data) + if err != nil { + return nil, err + } + } + switch mode { // update the access timestamp and gc index case storage.ModeGetRequest: diff --git a/pkg/localstore/mode_put.go b/pkg/localstore/mode_put.go index 85e6a8e6e0e..ef297635620 100644 --- a/pkg/localstore/mode_put.go +++ b/pkg/localstore/mode_put.go @@ -23,6 +23,7 @@ import ( "fmt" "time" + "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" @@ -42,7 +43,7 @@ func (db *DB) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) db.metrics.ModePut.Inc() defer totalTimeMetric(db.metrics.TotalTimePut, time.Now()) - exist, err = db.put(mode, chs...) + exist, err = db.put(ctx, mode, chs...) if err != nil { db.metrics.ModePutFailure.Inc() } @@ -50,6 +51,12 @@ func (db *DB) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) return exist, err } +type releaseLocations []sharky.Location + +func (r *releaseLocations) add(loc sharky.Location) { + *r = append(*r, loc) +} + // put stores Chunks to database and updates other indexes. 
It acquires batchMu // to protect two calls of this function for the same address in parallel. Item // fields Address and Data must not be with their nil values. If chunks with the @@ -57,7 +64,7 @@ func (db *DB) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) // and following ones will have exist set to true for their index in exist // slice. This is the same behaviour as if the same chunks are passed one by one // in multiple put method calls. -func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) { +func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, retErr error) { // this is an optimization that tries to optimize on already existing chunks // not needing to acquire batchMu. This is in order to reduce lock contention // when chunks are retried across the network for whatever reason. @@ -99,17 +106,56 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e // Values from this map are stored with the batch binIDs := make(map[uint8]uint64) + var ( + // this is the list of locations that need to be released if the batch is + // successfully committed due to postageIndex collisions + releaseLocs = new(releaseLocations) + // this is the list of locations that need to be released if the batch is NOT + // successfully committed as they have already been committed to sharky + committedLocations []sharky.Location + ) + + putChunk := func(ch swarm.Chunk, index int, putOp func(shed.Item) (bool, int64, int64, error)) (bool, int64, int64, error) { + if containsChunk(ch.Address(), chs[:index]...) { + return true, 0, 0, nil + } + item := chunkToItem(ch) + loc, exists, err := db.putSharky(ctx, item) + if err != nil { + return false, 0, 0, err + } + if exists { + return true, 0, 0, nil + } + committedLocations = append(committedLocations, loc) + item.Location, err = loc.MarshalBinary() + if err != nil { + return false, 0, 0, err + } + return putOp(item) + } + + // If for whatever reason we fail to commit the batch, we should release all + // the chunks that have been committed to sharky + defer func() { + if retErr != nil { + for _, l := range committedLocations { + err := db.sharky.Release(ctx, l) + if err != nil { + db.logger.Warningf("failed releasing sharky location on error %v", err) + } + } + } + }() + switch mode { case storage.ModePutRequest, storage.ModePutRequestPin, storage.ModePutRequestCache: for i, ch := range chs { - if containsChunk(ch.Address(), chs[:i]...) { - exist[i] = true - continue - } - item := chunkToItem(ch) pin := mode == storage.ModePutRequestPin // force pin in this mode cache := mode == storage.ModePutRequestCache // force cache - exists, c, r, err := db.putRequest(batch, binIDs, item, pin, cache) + exists, c, r, err := putChunk(ch, i, func(item shed.Item) (bool, int64, int64, error) { + return db.putRequest(ctx, releaseLocs, batch, binIDs, item, pin, cache) + }) if err != nil { return nil, fmt.Errorf("put request: %w", err) } @@ -120,12 +166,16 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e case storage.ModePutUpload, storage.ModePutUploadPin: for i, ch := range chs { - if containsChunk(ch.Address(), chs[:i]...) 
{ - exist[i] = true - continue - } - item := chunkToItem(ch) - exists, c, err := db.putUpload(batch, binIDs, item) + exists, c, _, err := putChunk(ch, i, func(item shed.Item) (bool, int64, int64, error) { + chExists, gcChange, err := db.putUpload(batch, releaseLocs, binIDs, item) + if err == nil && mode == storage.ModePutUploadPin { + c2, err := db.setPin(batch, item) + if err == nil { + gcChange += c2 + } + } + return chExists, gcChange, 0, err + }) if err != nil { return nil, fmt.Errorf("put upload: %w", err) } @@ -137,22 +187,13 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e triggerPushFeed = true } gcSizeChange += c - if mode == storage.ModePutUploadPin { - c, err = db.setPin(batch, item) - if err != nil { - return nil, fmt.Errorf("upload set pin: %w", err) - } - gcSizeChange += c - } } case storage.ModePutSync: for i, ch := range chs { - if containsChunk(ch.Address(), chs[:i]...) { - exist[i] = true - continue - } - exists, c, r, err := db.putSync(batch, binIDs, chunkToItem(ch)) + exists, c, r, err := putChunk(ch, i, func(item shed.Item) (bool, int64, int64, error) { + return db.putSync(batch, releaseLocs, binIDs, item) + }) if err != nil { return nil, fmt.Errorf("put sync: %w", err) } @@ -174,7 +215,7 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e db.binIDs.PutInBatch(batch, uint64(po), id) } - err = db.incGCSizeInBatch(batch, gcSizeChange) + err := db.incGCSizeInBatch(batch, gcSizeChange) if err != nil { return nil, fmt.Errorf("inc gc: %w", err) } @@ -189,6 +230,13 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e return nil, fmt.Errorf("write batch: %w", err) } + for _, v := range *releaseLocs { + err = db.sharky.Release(ctx, v) + if err != nil { + db.logger.Warning("failed releasing sharky location", v) + } + } + for po := range triggerPullFeed { db.triggerPullSubscriptions(po) } @@ -198,19 +246,35 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e return exist, nil } -// putRequest adds an Item to the batch by updating required indexes: -// - put to indexes: retrieve, gc -// - it does not enter the syncpool -// The batch can be written to the database. -// Provided batch and binID map are updated. -func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item, forcePin, forceCache bool) (exists bool, gcSizeChange, reserveSizeChange int64, err error) { +// putSharky will add the item to sharky storage if it doesnt exist. +func (db *DB) putSharky(ctx context.Context, item shed.Item) (loc sharky.Location, exists bool, err error) { exists, err = db.retrievalDataIndex.Has(item) if err != nil { - return false, 0, 0, err + return loc, false, err } if exists { - return true, 0, 0, nil + return loc, true, nil } + l, err := db.sharky.Write(ctx, item.Data) + if err != nil { + return loc, false, err + } + return l, false, nil +} + +// putRequest adds an Item to the batch by updating required indexes: +// - put to indexes: retrieve, gc +// - it does not enter the syncpool +// The batch can be written to the database. +// Provided batch and binID map are updated. 
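+// When a postage index collision evicts a previously stored chunk, its sharky
+// location is added to loc so it can be released once the batch is committed.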
+func (db *DB) putRequest( + ctx context.Context, + loc *releaseLocations, + batch *leveldb.Batch, + binIDs map[uint8]uint64, + item shed.Item, + forcePin, forceCache bool, +) (exists bool, gcSizeChange, reserveSizeChange int64, err error) { previous, err := db.postageIndexIndex.Get(item) if err != nil { @@ -230,6 +294,13 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she if err != nil { return false, 0, 0, err } + + l, err := sharky.LocationFromBinary(previous.Location) + if err != nil { + return false, 0, 0, err + } + loc.add(l) + radius, err := db.postageRadiusIndex.Get(item) if err != nil { if !errors.Is(err, leveldb.ErrNotFound) { @@ -240,7 +311,6 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she reserveSizeChange-- } } - } item.StoreTimestamp = now() @@ -287,14 +357,12 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she // - put to indexes: retrieve, push, pull // The batch can be written to the database. // Provided batch and binID map are updated. -func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { - exists, err = db.retrievalDataIndex.Has(item) - if err != nil { - return false, 0, fmt.Errorf("retrieval has: %w", err) - } - if exists { - return true, 0, nil - } +func (db *DB) putUpload( + batch *leveldb.Batch, + loc *releaseLocations, + binIDs map[uint8]uint64, + item shed.Item, +) (exists bool, gcSizeChange int64, err error) { previous, err := db.postageIndexIndex.Get(item) if err != nil { @@ -314,6 +382,13 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed if err != nil { return false, 0, fmt.Errorf("same slot remove: %w", err) } + + l, err := sharky.LocationFromBinary(previous.Location) + if err != nil { + return false, 0, err + } + + loc.add(l) } item.StoreTimestamp = now() @@ -348,15 +423,7 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed // - put to indexes: retrieve, pull, gc // The batch can be written to the database. // Provided batch and binID map are updated. 
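+// The existence check is performed by putSharky before the chunk data is
+// written, so putSync assumes the chunk is new; locations of chunks replaced
+// through postage index collisions are queued on loc for later release.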
-func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange, reserveSizeChange int64, err error) { - exists, err = db.retrievalDataIndex.Has(item) - if err != nil { - return false, 0, 0, err - } - if exists { - return true, 0, 0, nil - } - +func (db *DB) putSync(batch *leveldb.Batch, loc *releaseLocations, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange, reserveSizeChange int64, err error) { previous, err := db.postageIndexIndex.Get(item) if err != nil { if !errors.Is(err, leveldb.ErrNotFound) { @@ -375,6 +442,14 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I if err != nil { return false, 0, 0, err } + + l, err := sharky.LocationFromBinary(previous.Location) + if err != nil { + return false, 0, 0, err + } + + loc.add(l) + radius, err := db.postageRadiusIndex.Get(item) if err != nil { if !errors.Is(err, leveldb.ErrNotFound) { diff --git a/pkg/localstore/mode_put_test.go b/pkg/localstore/mode_put_test.go index 2a7384d2bd5..b531ae698f1 100644 --- a/pkg/localstore/mode_put_test.go +++ b/pkg/localstore/mode_put_test.go @@ -28,6 +28,7 @@ import ( "github.com/ethersphere/bee/pkg/postage" postagetesting "github.com/ethersphere/bee/pkg/postage/testing" + "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" @@ -743,6 +744,18 @@ func TestPutDuplicateChunks(t *testing.T) { } } +func TestReleaseLocations(t *testing.T) { + locs := new(releaseLocations) + + for i := 0; i < 5; i++ { + locs.add(sharky.Location{Shard: 0, Slot: 100, Length: 100}) + } + + if len(*locs) != 5 { + t.Fatal("incorrect length of release locations expected", 5, "found", len(*locs)) + } +} + // BenchmarkPutUpload runs a series of benchmarks that upload // a specific number of chunks in parallel. // diff --git a/pkg/localstore/mode_set.go b/pkg/localstore/mode_set.go index 9e6fb03ac10..f7fcce5e4d8 100644 --- a/pkg/localstore/mode_set.go +++ b/pkg/localstore/mode_set.go @@ -21,10 +21,12 @@ import ( "errors" "time" + "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/tags" + "github.com/hashicorp/go-multierror" "github.com/syndtr/goleveldb/leveldb" ) @@ -35,7 +37,7 @@ import ( func (db *DB) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) { db.metrics.ModeSet.Inc() defer totalTimeMetric(db.metrics.TotalTimeSet, time.Now()) - err = db.set(mode, addrs...) + err = db.set(ctx, mode, addrs...) if err != nil { db.metrics.ModeSetFailure.Inc() } @@ -44,7 +46,7 @@ func (db *DB) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr // set updates database indexes for // chunks represented by provided addresses. 
-func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { +func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) { // protect parallel updates db.batchMu.Lock() defer db.batchMu.Unlock() @@ -53,6 +55,7 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { } batch := new(leveldb.Batch) + var committedLocations []sharky.Location // variables that provide information for operations // to be done after write batch function successfully executes @@ -77,10 +80,19 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { case storage.ModeSetRemove: for _, addr := range addrs { item := addressToItem(addr) + item, err = db.retrievalDataIndex.Get(item) + if err != nil { + return err + } c, err := db.setRemove(batch, item, true) if err != nil { return err } + l, err := sharky.LocationFromBinary(item.Location) + if err != nil { + return err + } + committedLocations = append(committedLocations, l) gcSizeChange += c } @@ -119,6 +131,15 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { if err != nil { return err } + + sharkyErr := new(multierror.Error) + for _, l := range committedLocations { + sharkyErr = multierror.Append(sharkyErr, db.sharky.Release(ctx, l)) + } + if sharkyErr.ErrorOrNil() != nil { + return sharkyErr.ErrorOrNil() + } + for po := range triggerPullFeed { db.triggerPullSubscriptions(po) } @@ -218,7 +239,8 @@ func (db *DB) setRemove(batch *leveldb.Batch, item shed.Item, check bool) (gcSiz return 0, err } } - if item.StoreTimestamp == 0 { + + if item.StoreTimestamp == 0 || item.Location == nil { item, err = db.retrievalDataIndex.Get(item) if err != nil { return 0, err diff --git a/pkg/localstore/schema.go b/pkg/localstore/schema.go index 8355eb74ae4..6572f30bd9d 100644 --- a/pkg/localstore/schema.go +++ b/pkg/localstore/schema.go @@ -21,4 +21,4 @@ const DBSchemaCode = "code" // DBSchemaCurrent represents the DB schema we want to use. // The actual/current DB schema might differ until migrations are run. 
-var DBSchemaCurrent = DBSchemaDeadPush +var DBSchemaCurrent = DBSchemaSharky diff --git a/pkg/localstore/subscription_push.go b/pkg/localstore/subscription_push.go index 0946a3e9df1..28a9d541067 100644 --- a/pkg/localstore/subscription_push.go +++ b/pkg/localstore/subscription_push.go @@ -22,6 +22,7 @@ import ( "time" "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/swarm" ) @@ -98,9 +99,19 @@ func (db *DB) SubscribePush(ctx context.Context, skipf func([]byte) bool) (c <-c return true, err } + loc, err := sharky.LocationFromBinary(dataItem.Location) + if err != nil { + return true, err + } + itemData := make([]byte, loc.Length) + err = db.sharky.Read(ctx, loc, itemData) + if err != nil { + return true, err + } + stamp := postage.NewStamp(dataItem.BatchID, dataItem.Index, dataItem.Timestamp, dataItem.Sig) select { - case chunks <- swarm.NewChunk(swarm.NewAddress(dataItem.Address), dataItem.Data).WithTagID(item.Tag).WithStamp(stamp): + case chunks <- swarm.NewChunk(swarm.NewAddress(dataItem.Address), itemData).WithTagID(item.Tag).WithStamp(stamp): count++ // set next iteration start item // when its chunk is successfully sent to channel diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 2d49827e6cd..c49996619a6 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -35,14 +35,18 @@ const ( streamName = "pullsync" cursorStreamName = "cursors" cancelStreamName = "cancel" - - logMore = false // enable this for more logging ) +const logMore = false // enable this for more logging + var ( ErrUnsolicitedChunk = errors.New("peer sent unsolicited chunk") +) - cancellationTimeout = 5 * time.Second // explicit ruid cancellation message timeout +const ( + storagePutTimeout = 5 * time.Second + // explicit ruid cancellation message timeout + cancellationTimeout = 5 * time.Second ) // how many maximum chunks in a batch @@ -252,6 +256,8 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8 } if len(chunksToPut) > 0 { s.metrics.DbOps.Inc() + ctx, cancel := context.WithTimeout(ctx, storagePutTimeout) + defer cancel() if ierr := s.storage.Put(ctx, storage.ModePutSync, chunksToPut...); ierr != nil { if err != nil { ierr = fmt.Errorf(", sync err: %w", err) diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index 160eee8c1c1..cb286f88857 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -66,6 +66,8 @@ var ( ErrShallowReceipt = errors.New("shallow recipt") ) +const chunkStoreTimeout = 2 * time.Second + func New(networkID uint64, storer storage.Storer, depther topology.NeighborhoodDepther, pushSyncer pushsync.PushSyncer, validStamp postage.ValidStampFn, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer, warmupTime time.Duration) *Service { p := &Service{ networkID: networkID, @@ -181,9 +183,13 @@ func (s *Service) chunksWorker(warmupTime time.Duration, tracer *tracing.Tracer) // synced which makes it available to the node but not to the network if err := s.valid(ch); err != nil { logger.Warningf("pusher: stamp with batch ID %x is no longer valid, skipping syncing for chunk %s: %v", ch.Stamp().BatchID(), ch.Address().String(), err) + + ctx, cancel := context.WithTimeout(ctx, chunkStoreTimeout) + if err = s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil { s.logger.Errorf("pusher: set sync: %v", err) } + cancel() } cc <- &Op{Chunk: ch, Direct: false} } @@ -246,6 +252,9 @@ func (s *Service) 
pushChunk(ctx context.Context, ch swarm.Chunk, logger *logrus. } else if err = s.checkReceipt(receipt); err != nil { return err } + + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() if err = s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil { return fmt.Errorf("pusher: set sync: %w", err) } diff --git a/pkg/retrieval/retrieval.go b/pkg/retrieval/retrieval.go index c74c482cc1c..7a688ccf5af 100644 --- a/pkg/retrieval/retrieval.go +++ b/pkg/retrieval/retrieval.go @@ -391,6 +391,9 @@ func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address, all } func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) { + ctx, cancel := context.WithTimeout(ctx, retrieveChunkTimeout) + defer cancel() + w, r := protobuf.NewWriterAndReader(stream) defer func() { if err != nil { @@ -408,6 +411,7 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e defer span.Finish() ctx = context.WithValue(ctx, requestSourceContextKey{}, p.Address.String()) + addr := swarm.NewAddress(req.Addr) forwarded := false diff --git a/pkg/sharky/metrics.go b/pkg/sharky/metrics.go new file mode 100644 index 00000000000..0d191334014 --- /dev/null +++ b/pkg/sharky/metrics.go @@ -0,0 +1,98 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sharky + +import ( + m "github.com/ethersphere/bee/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +// metrics groups sharky related prometheus counters. +type metrics struct { + TotalWriteCalls prometheus.Counter + TotalWriteCallsErr prometheus.Counter + TotalReadCalls prometheus.Counter + TotalReadCallsErr prometheus.Counter + TotalReleaseCalls prometheus.Counter + TotalReleaseCallsErr prometheus.Counter + ShardCount prometheus.Gauge + CurrentShardSize *prometheus.GaugeVec + ShardFragmentation *prometheus.GaugeVec +} + +// newMetrics is a convenient constructor for creating new metrics. 
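+// The collectors are exposed through Store.Metrics and aggregated with the
+// shed collectors by the localstore DB.Metrics method.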
+func newMetrics() metrics { + const subsystem = "sharky" + + return metrics{ + TotalWriteCalls: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_write_calls", + Help: "The total write calls made.", + }), + TotalWriteCallsErr: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_write_calls_err", + Help: "The total write calls ended up with error.", + }), + TotalReadCalls: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_read_calls", + Help: "The total read calls made.", + }), + TotalReadCallsErr: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_read_calls_err", + Help: "The total read calls ended up with error.", + }), + TotalReleaseCalls: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_release_calls", + Help: "The total release calls made.", + }), + TotalReleaseCallsErr: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_release_calls_err", + Help: "The total release calls ended up with error.", + }), + ShardCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "shard_count", + Help: "The number of shards.", + }), + CurrentShardSize: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "current_shard_size", + Help: "The current size of the shard derived as: length in bytes/data length per chunk", + }, + []string{"current_shard_size"}, + ), + ShardFragmentation: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "shard_fragmentation", + Help: ` +The total fragmentation of the files on disc for current shard. This is obtained by keeping track of the difference +between actual lengths of chunks and the length of slot. + `, + }, []string{"shard_fragmentation"}, + ), + } +} + +// Metrics returns set of prometheus collectors. +func (s *Store) Metrics() []prometheus.Collector { + return m.PrometheusCollectorsFromFields(s.metrics) +} diff --git a/pkg/sharky/recovery.go b/pkg/sharky/recovery.go new file mode 100644 index 00000000000..4847b184e86 --- /dev/null +++ b/pkg/sharky/recovery.go @@ -0,0 +1,86 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sharky + +import ( + "errors" + "fmt" + "io/fs" + "os" + "path" + + "github.com/hashicorp/go-multierror" +) + +// Recovery allows disaster recovery. 
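+// NewRecovery sizes a slot bitmap from each existing shard file, Add marks a
+// location's slot as used, and Save inverts the bitmaps so that every slot
+// not marked via Add is written back to the free-slot files as reusable.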
+type Recovery struct { + shards []*slots +} + +var ErrShardNotFound = errors.New("shard not found") + +func NewRecovery(dir string, shardCnt int, datasize int) (*Recovery, error) { + shards := make([]*slots, shardCnt) + for i := 0; i < shardCnt; i++ { + file, err := os.OpenFile(path.Join(dir, fmt.Sprintf("shard_%03d", i)), os.O_RDONLY, 0666) + if errors.Is(err, fs.ErrNotExist) { + return nil, fmt.Errorf("index %d: %w", i, ErrShardNotFound) + } + if err != nil { + return nil, err + } + fi, err := file.Stat() + if err != nil { + return nil, err + } + if err = file.Close(); err != nil { + return nil, err + } + size := uint32(fi.Size() / int64(datasize)) + ffile, err := os.OpenFile(path.Join(dir, fmt.Sprintf("free_%03d", i)), os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + return nil, err + } + sl := newSlots(ffile, nil) + sl.data = make([]byte, size/8) + shards[i] = sl + } + return &Recovery{shards}, nil +} + +// Add marks a location as used (not free). +func (r *Recovery) Add(loc Location) error { + sh := r.shards[loc.Shard] + l := len(sh.data) + if diff := int(loc.Slot/8) - l; diff >= 0 { + sh.extend(diff + 1) + for i := 0; i <= diff; i++ { + sh.data[l+i] = 0x0 + } + } + sh.push(loc.Slot) + return nil +} + +// Save saves all free slots files of the recovery (without closing). +func (r *Recovery) Save() error { + err := new(multierror.Error) + for _, sh := range r.shards { + for i := range sh.data { + sh.data[i] ^= 0xff + } + err = multierror.Append(err, sh.save()) + } + return err.ErrorOrNil() +} + +// Close closes data and free slots files of the recovery (without saving). +func (r *Recovery) Close() error { + err := new(multierror.Error) + for _, sh := range r.shards { + err = multierror.Append(err, sh.file.Close()) + } + return err.ErrorOrNil() +} diff --git a/pkg/sharky/recovery_test.go b/pkg/sharky/recovery_test.go new file mode 100644 index 00000000000..aa676a8169c --- /dev/null +++ b/pkg/sharky/recovery_test.go @@ -0,0 +1,169 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sharky_test + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "math/rand" + "testing" + "time" + + "github.com/ethersphere/bee/pkg/sharky" +) + +func TestMissingShard(t *testing.T) { + _, err := sharky.NewRecovery(t.TempDir(), 1, 8) + if !errors.Is(err, sharky.ErrShardNotFound) { + t.Fatalf("want %v, got %v", sharky.ErrShardNotFound, err) + } +} + +func TestRecovery(t *testing.T) { + datasize := 4 + shards := 8 + shardSize := uint32(16) + limitInChunks := shards * int(shardSize) + + dir := t.TempDir() + ctx := context.Background() + size := limitInChunks / 2 + data := make([]byte, 4) + locs := make([]sharky.Location, size) + preserved := make(map[uint32]bool) + + s := newSharky(t, dir, shards, datasize) + for i := range locs { + binary.BigEndian.PutUint32(data, uint32(i)) + loc, err := s.Write(ctx, data) + if err != nil { + t.Fatal(err) + } + locs[i] = loc + } + // extract locations to preserve / free in map + indexes := make([]uint32, size) + for i := range indexes { + indexes[i] = uint32(i) + } + rest := indexes[:] + for n := size; n > size/2; n-- { + i := rand.Intn(n) + preserved[rest[i]] = false + rest = append(rest[:i], rest[i+1:]...) 
+ } + if len(rest) != len(preserved) { + t.Fatalf("incorrect set sizes: %d <> %d", len(rest), len(preserved)) + } + for _, i := range rest { + preserved[i] = true + } + + t.Run("recover based on preserved map", func(t *testing.T) { + r, err := sharky.NewRecovery(dir, shards, datasize) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + if err := r.Close(); err != nil { + t.Fatal(err) + } + }) + for i, add := range preserved { + if add { + if err := r.Add(locs[i]); err != nil { + t.Fatal(err) + } + } + } + if err := r.Save(); err != nil { + t.Fatal(err) + } + }) + + payload := []byte{0xff} + + t.Run("check integrity of recovered sharky", func(t *testing.T) { + s := newSharky(t, dir, shards, datasize) + buf := make([]byte, datasize) + t.Run("preserved are found", func(t *testing.T) { + for i := range preserved { + loc := locs[i] + if err := s.Read(ctx, loc, buf); err != nil { + t.Fatal(err) + } + j := binary.BigEndian.Uint32(buf) + if i != j { + t.Fatalf("data not preserved at location %v: want %d; got %d", loc, i, j) + } + } + }) + + var freelocs []sharky.Location + + t.Run("correct number of free slots", func(t *testing.T) { + s := newSharky(t, dir, 1, datasize) + cctx, cancel := context.WithTimeout(ctx, 800*time.Millisecond) + defer cancel() + + runs := 96 + for i := 0; i < runs; i++ { + loc, err := s.Write(cctx, payload) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + break + } + t.Fatal(err) + } + freelocs = append(freelocs, loc) + } + if len(freelocs) != runs { + t.Fatalf("incorrect number of free slots: wanted %d; got %d", runs, len(freelocs)) + } + }) + t.Run("added locs are still preserved", func(t *testing.T) { + for i, added := range preserved { + if !added { + continue + } + if err := s.Read(ctx, locs[int(i)], buf); err != nil { + t.Fatal(err) + } + j := binary.BigEndian.Uint32(buf) + if i != j { + t.Fatalf("data not preserved at location %v: want %d; got %d", locs[int(j)], i, j) + } + } + }) + t.Run("all other slots also overwritten", func(t *testing.T) { + for _, loc := range freelocs { + if err := s.Read(ctx, loc, buf); err != nil { + t.Fatal(err) + } + data := buf[:len(payload)] + if !bytes.Equal(data, payload) { + t.Fatalf("incorrect data on freed location %v: want %x; got %x", loc, payload, data) + } + } + }) + }) +} + +func newSharky(t *testing.T, dir string, shards, datasize int) *sharky.Store { + t.Helper() + s, err := sharky.New(&dirFS{basedir: dir}, shards, datasize) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + if err := s.Close(); err != nil { + t.Fatal(err) + } + }) + + return s +} diff --git a/pkg/sharky/shard.go b/pkg/sharky/shard.go new file mode 100644 index 00000000000..733321cc65f --- /dev/null +++ b/pkg/sharky/shard.go @@ -0,0 +1,186 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
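+
+// Note on the on-disk reference format defined below (Location and its
+// MarshalBinary): a Location is serialised into LocationSize (7) bytes as
+//
+//	byte 0      shard index (uint8)
+//	bytes 1-4   slot within the shard (uint32, little-endian)
+//	bytes 5-6   blob length (uint16, little-endian)
+//
+// so, for example, Location{Shard: 1, Slot: 100, Length: 4096} marshals to
+// the bytes 01 64 00 00 00 00 10.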
+
+package sharky
+
+import (
+	"context"
+	"encoding/binary"
+	"io"
+)
+
+// LocationSize is the size of the byte representation of Location.
+const LocationSize int = 7
+
+// Location models the location of a chunk.
+type Location struct {
+	Shard uint8
+	Slot uint32
+	Length uint16
+}
+
+// MarshalBinary returns the byte representation of the location.
+func (l *Location) MarshalBinary() ([]byte, error) {
+	b := make([]byte, LocationSize)
+	b[0] = l.Shard
+	binary.LittleEndian.PutUint32(b[1:5], l.Slot)
+	binary.LittleEndian.PutUint16(b[5:], l.Length)
+	return b, nil
+}
+
+// UnmarshalBinary constructs the location from its byte representation.
+func (l *Location) UnmarshalBinary(buf []byte) error {
+	l.Shard = buf[0]
+	l.Slot = binary.LittleEndian.Uint32(buf[1:5])
+	l.Length = binary.LittleEndian.Uint16(buf[5:])
+	return nil
+}
+
+// LocationFromBinary is a helper to construct a Location object from its byte representation.
+func LocationFromBinary(buf []byte) (Location, error) {
+	l := new(Location)
+	err := l.UnmarshalBinary(buf)
+	if err != nil {
+		return Location{}, err
+	}
+	return *l, nil
+}
+
+// sharkyFile defines the minimal interface that is required for a file type for it to
+// be usable in sharky. This allows us to have different implementations of file types
+// that can continue using the sharky logic.
+type sharkyFile interface {
+	io.ReadWriteCloser
+	io.ReaderAt
+	io.Seeker
+	io.WriterAt
+	Truncate(int64) error
+	Sync() error
+}
+
+// write models the input to a write operation.
+type write struct {
+	buf []byte     // variable size write buffer
+	res chan entry // to put the result through
+}
+
+// entry models the output result of a write operation.
+type entry struct {
+	loc Location // shard, slot, length combo
+	err error    // signal for end of operation
+}
+
+// read models the input to a read operation (the output is an error).
+type read struct {
+	buf  []byte // variable size read buffer
+	slot uint32 // slot to read from
+}
+
+// shard models a shard writing to a file at offsets that are multiples of the fixed maxDataSize.
+type shard struct {
+	reads       chan read     // channel for reads
+	errc        chan error    // result for reads
+	writes      chan write    // channel for writes
+	index       uint8         // index of the shard
+	maxDataSize int           // max size of blobs
+	file        sharkyFile    // the file handle the shard is writing data to
+	slots       *slots        // component keeping track of freed slots
+	quit        chan struct{} // channel to signal quitting
+}
+
+// process is the shard's forever loop, serving read and write operations.
+func (sh *shard) process() {
+	var writes chan write
+	var slot uint32
+	free := sh.slots.out
+LOOP:
+	for {
+		select {
+		case op := <-sh.reads:
+			// prioritise read ops, i.e., continue processing read ops (only) as long as there are any;
+			// this blocks any writes on this shard, effectively making store-wide
+			// write ops use a different shard while this one is busy
+			for {
+				sh.errc <- sh.read(op)
+				select {
+				case op = <-sh.reads:
+				default:
+					continue LOOP
+				}
+			}
+
+		// only enabled if there is a free slot previously popped
+		case op := <-writes:
+			op.res <- sh.write(op.buf, slot)
+			free = sh.slots.out // re-enable popping a free slot next time we can write
+			writes = nil        // disable popping a write operation until there is a free slot
+
+		// pop a free slot
+		case slot = <-free:
+			// only once there is a free slot can we pop a chunk to write, otherwise we keep
+			// back pressure on writes, effectively forcing another shard to be chosen
+			writes = sh.writes // enable popping a write operation
+			free = nil         // disable getting a new slot until a write is actually done
+
+		case <-sh.quit:
+			// this condition checks if a slot is in limbo (popped but not yet used for a write op)
+			if writes != nil {
+				sh.slots.wg.Add(1) // Done after the slots process pops from slots.in
+				go func() {
+					sh.slots.in <- slot
+				}()
+			}
+			return
+		}
+	}
+}
+
+// close closes the shard: it waits for pending operations to finish,
+// then saves the free slots and blobs on disk.
+func (sh *shard) close() error {
+	sh.slots.wg.Wait()
+	if err := sh.slots.save(); err != nil {
+		return err
+	}
+	if err := sh.slots.file.Close(); err != nil {
+		return err
+	}
+	return sh.file.Close()
+}
+
+// offset calculates the file offset from the slot;
+// this is possible since all blob slots are of fixed size.
+func (sh *shard) offset(slot uint32) int64 {
+	return int64(slot) * int64(sh.maxDataSize)
+}
+
+// read reads len(r.buf) bytes into the buffer from the blob at slot r.slot.
+func (sh *shard) read(r read) error {
+	_, err := sh.file.ReadAt(r.buf, sh.offset(r.slot))
+	return err
+}
+
+// write writes the contents of buf to the blob at the given slot and
+// returns the resulting entry (location and error).
+func (sh *shard) write(buf []byte, slot uint32) entry {
+	n, err := sh.file.WriteAt(buf, sh.offset(slot))
+	return entry{
+		loc: Location{
+			Shard: sh.index,
+			Slot: slot,
+			Length: uint16(n),
+		},
+		err: err,
+	}
+}
+
+// release frees the slot, allowing a new entry to overwrite it.
+func (sh *shard) release(ctx context.Context, slot uint32) error {
+	select {
+	case sh.slots.in <- slot:
+		return nil
+	case <-ctx.Done():
+		sh.slots.wg.Done()
+		return ctx.Err()
+	}
+}
diff --git a/pkg/sharky/shard_slots_test.go b/pkg/sharky/shard_slots_test.go
new file mode 100644
index 00000000000..2aaa31623c6
--- /dev/null
+++ b/pkg/sharky/shard_slots_test.go
@@ -0,0 +1,47 @@
+// Copyright 2021 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
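+
+// The tests below exercise the shard/slots handshake implemented in shard.go
+// and slots.go: a write is only accepted once a free slot has been popped from
+// the slots component, reads are always served with priority, and a released
+// slot eventually becomes available to a later write. Roughly, one write round
+// trip through shard.process looks like:
+//
+//	slot = <-free                      // pop the lowest free slot
+//	op = <-writes                      // only now accept a write op
+//	op.res <- sh.write(op.buf, slot)   // entry carries Location{Shard, Slot, Length}
+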
+package sharky + +import ( + "bytes" + "context" + "fmt" + "io/fs" + "os" + "path/filepath" + "sync" + "testing" + "time" +) + +// TestShard ensures that released slots eventually become available for writes +func TestShard(t *testing.T) { + shard := newShard(t) + + payload := write{buf: []byte{0xff}, res: make(chan entry)} + loc := writePayload(t, shard, payload) + buf := readFromLocation(t, shard, loc) + + if !bytes.Equal(buf, payload.buf) { + t.Fatalf("want %x, got %x", buf, payload.buf) + } + + if loc.Slot != 0 { + t.Fatalf("expected to write to slot 0, got %d", loc.Slot) + } + + // in order for the test to succeed this slot is expected to become available before test finishes + releaseSlot(t, shard, loc) + + payload = write{buf: []byte{0xff >> 1}, res: make(chan entry)} + loc = writePayload(t, shard, payload) + + // immediate write should pick the next slot + if loc.Slot != 1 { + t.Fatalf("expected to write to slot 1, got %d", loc.Slot) + } + + releaseSlot(t, shard, loc) + + // we make ten writes expecting that slot 0 is released and becomes available for writing eventually + i, runs := 0, 10 + for ; i < runs; i++ { + payload = write{buf: []byte{0x01 << i}, res: make(chan entry)} + loc = writePayload(t, shard, payload) + releaseSlot(t, shard, loc) + if loc.Slot == 0 { + break + } + } + + if i == runs { + t.Errorf("expected to write to slot 0 within %d runs, write did not occur", runs) + } +} + +func writePayload(t *testing.T, shard *shard, payload write) (loc Location) { + t.Helper() + + shard.slots.wg.Add(1) + + select { + case shard.writes <- payload: + e := <-payload.res + if e.err != nil { + t.Fatal("write entry", e.err) + } + loc = e.loc + case <-time.After(100 * time.Millisecond): + t.Fatal("write timeout") + } + + return loc +} + +func readFromLocation(t *testing.T, shard *shard, loc Location) []byte { + t.Helper() + buf := make([]byte, loc.Length) + + select { + case shard.reads <- read{buf[:loc.Length], loc.Slot}: + if err := <-shard.errc; err != nil { + t.Fatal("read", err) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout reading") + } + + return buf +} + +func releaseSlot(t *testing.T, shard *shard, loc Location) { + t.Helper() + ctx := context.Background() + cctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + if err := shard.release(cctx, loc.Slot); err != nil { + t.Fatal("release slot", loc.Slot, "err", err) + } + cancel() +} + +type dirFS string + +func (d *dirFS) Open(path string) (fs.File, error) { + return os.OpenFile(filepath.Join(string(*d), path), os.O_RDWR|os.O_CREATE, 0644) +} + +func newShard(t *testing.T) *shard { + t.Helper() + + basedir := dirFS(t.TempDir()) + index := 1 + + file, err := basedir.Open(fmt.Sprintf("shard_%03d", index)) + if err != nil { + t.Fatal(err) + } + + ffile, err := basedir.Open(fmt.Sprintf("free_%03d", index)) + if err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + + slots := newSlots(ffile.(sharkyFile), &wg) + err = slots.load() + if err != nil { + t.Fatal(err) + } + + quit := make(chan struct{}) + shard := &shard{ + reads: make(chan read), + errc: make(chan error), + writes: make(chan write), + index: uint8(index), + maxDataSize: 1, + file: file.(sharkyFile), + slots: slots, + quit: quit, + } + + t.Cleanup(func() { + quit <- struct{}{} + if err := shard.close(); err != nil { + t.Fatal("close shard", err) + } + }) + + terminated := make(chan struct{}) + + go func() { + shard.process() + close(terminated) + }() + + shard.slots.wg.Add(1) + go slots.process(terminated) + + return shard +} diff 
--git a/pkg/sharky/shard_test.go b/pkg/sharky/shard_test.go new file mode 100644 index 00000000000..e2f4e78f928 --- /dev/null +++ b/pkg/sharky/shard_test.go @@ -0,0 +1,54 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +package sharky_test + +import ( + "fmt" + "math" + "testing" + + "github.com/ethersphere/bee/pkg/sharky" +) + +func TestLocationSerialization(t *testing.T) { + for _, tc := range []*sharky.Location{ + { + Shard: 1, + Slot: 100, + Length: 4096, + }, + { + Shard: 0, + Slot: 0, + Length: 0, + }, + { + Shard: math.MaxUint8, + Slot: math.MaxUint32, + Length: math.MaxUint16, + }, + } { + t.Run(fmt.Sprintf("%d_%d_%d", tc.Shard, tc.Slot, tc.Length), func(st *testing.T) { + buf, err := tc.MarshalBinary() + if err != nil { + st.Fatal(err) + } + + if len(buf) != sharky.LocationSize { + st.Fatal("unexpected length of buffer") + } + + l2 := &sharky.Location{} + + err = l2.UnmarshalBinary(buf) + if err != nil { + t.Fatal(err) + } + + if l2.Shard != tc.Shard || l2.Slot != tc.Slot || l2.Length != tc.Length { + t.Fatalf("read incorrect values from buf exp: %v found %v", tc, l2) + } + }) + } +} diff --git a/pkg/sharky/sharky_test.go b/pkg/sharky/sharky_test.go new file mode 100644 index 00000000000..6d3a7638438 --- /dev/null +++ b/pkg/sharky/sharky_test.go @@ -0,0 +1,282 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sharky_test + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "io/fs" + "math/rand" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/ethersphere/bee/pkg/sharky" + "golang.org/x/sync/errgroup" +) + +type dirFS struct { + basedir string +} + +func (d *dirFS) Open(path string) (fs.File, error) { + return os.OpenFile(filepath.Join(d.basedir, path), os.O_RDWR|os.O_CREATE, 0644) +} + +func TestSingleRetrieval(t *testing.T) { + datasize := 4 + dir := t.TempDir() + s, err := sharky.New(&dirFS{basedir: dir}, 2, datasize) + if err != nil { + t.Fatal(err) + } + defer s.Close() + ctx := context.Background() + + t.Run("write and read", func(t *testing.T) { + for _, tc := range []struct { + name string + want []byte + err error + }{ + { + "short data", + []byte{0x1}, + nil, + }, { + "exact size data", + []byte{1, 1, 1, 1}, + nil, + }, { + "exact size data 2", + []byte{1, 1, 1, 1}, + nil, + }, { + "long data", + []byte("long data"), + sharky.ErrTooLong, + }, { + "exact size data 3", + []byte{1, 1, 1, 1}, + nil, + }, + } { + buf := make([]byte, datasize) + t.Run(tc.name, func(t *testing.T) { + cctx, cancel := context.WithTimeout(ctx, 800*time.Millisecond) + defer cancel() + loc, err := s.Write(cctx, tc.want) + if !errors.Is(err, tc.err) { + t.Fatalf("error mismatch on write. want %v, got %v", tc.err, err) + } + if err != nil { + return + } + err = s.Read(ctx, loc, buf) + if err != nil { + t.Fatal(err) + } + got := buf[:loc.Length] + if !bytes.Equal(tc.want, got) { + t.Fatalf("data mismatch at location %v. 
want %x, got %x", loc, tc.want, got) + } + }) + } + }) +} + +// TestPersistence tests behaviour across several process sessions +// and checks if items and pregenerated free slots are persisted correctly +func TestPersistence(t *testing.T) { + datasize := 4 + shards := 2 + shardSize := uint32(16) + items := shards * int(shardSize) + + dir := t.TempDir() + buf := make([]byte, 4) + locs := make([]*sharky.Location, items) + i := 0 + j := 0 + ctx := context.Background() + // simulate several subsequent sessions filling up the store + for ; i < items; j++ { + cctx, cancel := context.WithTimeout(ctx, 10*time.Second) + s, err := sharky.New(&dirFS{basedir: dir}, shards, datasize) + if err != nil { + t.Fatal(err) + } + for ; i < items && rand.Intn(4) > 0; i++ { + if locs[i] != nil { + continue + } + binary.BigEndian.PutUint32(buf, uint32(i)) + loc, err := s.Write(cctx, buf) + if err != nil { + t.Fatal(err) + } + locs[i] = &loc + } + cancel() + if err := s.Close(); err != nil { + t.Fatal(err) + } + } + t.Logf("got full in %d sessions\n", j) + + // check location and data consisency + cctx, cancel := context.WithTimeout(ctx, 10*time.Second) + s, err := sharky.New(&dirFS{basedir: dir}, shards, datasize) + if err != nil { + t.Fatal(err) + } + buf = make([]byte, datasize) + j = 0 + for want, loc := range locs { + j++ + err := s.Read(cctx, *loc, buf) + if err != nil { + t.Fatal(err) + } + got := binary.BigEndian.Uint32(buf) + if int(got) != want { + t.Fatalf("data mismatch. want %d, got %d", want, got) + } + } + cancel() + if err := s.Close(); err != nil { + t.Fatal(err) + } +} + +func TestConcurrency(t *testing.T) { + datasize := 4 + test := func(t *testing.T, workers, shards int, shardSize uint32) { + limit := shards * int(shardSize) + + dir := t.TempDir() + defer os.RemoveAll(dir) + s, err := sharky.New(&dirFS{basedir: dir}, shards, datasize) + if err != nil { + t.Fatal(err) + } + c := make(chan sharky.Location, limit) + start := make(chan struct{}) + deleted := make(map[uint32]int) + entered := make(map[uint32]struct{}) + ctx := context.Background() + eg, ectx := errgroup.WithContext(ctx) + // a number of workers write sequential numbers to sharky + for k := 0; k < workers; k++ { + k := k + eg.Go(func() error { + <-start + buf := make([]byte, 4) + for i := 0; i < limit; i++ { + j := i*workers + k + binary.BigEndian.PutUint32(buf, uint32(j)) + loc, err := s.Write(ctx, buf) + if err != nil { + return err + } + select { + case <-ectx.Done(): + return ectx.Err() + case c <- loc: + } + } + return nil + }) + } + // parallel to these workers, other workers collect the taken slots and release them + // modelling some aggressive gc policy + mtx := sync.Mutex{} + for k := 0; k < workers-1; k++ { + eg.Go(func() error { + <-start + buf := make([]byte, datasize) + for i := 0; i < limit; i++ { + select { + case <-ectx.Done(): + return ectx.Err() + case loc := <-c: + if err := s.Read(ectx, loc, buf); err != nil { + return err + } + j := binary.BigEndian.Uint32(buf) + mtx.Lock() + deleted[j]++ + mtx.Unlock() + if err := s.Release(ectx, loc); err != nil { + return err + } + } + } + return nil + }) + } + close(start) + if err := eg.Wait(); err != nil { + t.Fatal(err) + } + close(c) + extraSlots := 0 + for i := uint32(0); i < uint32(workers*limit); i++ { + cnt, found := deleted[i] + if !found { + entered[i] = struct{}{} + continue + } + extraSlots += cnt - 1 + } + buf := make([]byte, datasize) + for loc := range c { + err := s.Read(ctx, loc, buf) + if err != nil { + t.Error(err) + return + } + i := 
binary.BigEndian.Uint32(buf)
+
+			_, found := entered[i]
+			if !found {
+				t.Fatal("item at unreleased location incorrect")
+			}
+		}
+
+		// the store has extra slot capacity
+		cctx, cancel := context.WithTimeout(ctx, 800*time.Millisecond)
+		for i := 0; i < extraSlots; i++ {
+			t.Logf("checking extra slot %d\n", i)
+			_, err = s.Write(cctx, []byte{0})
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+		cancel()
+
+		if err := s.Close(); err != nil {
+			t.Fatal(err)
+		}
+	}
+	for _, c := range []struct {
+		workers, shards int
+		shardSize       uint32
+	}{
+		{3, 2, 2},
+		{2, 64, 2},
+		{32, 8, 32},
+		{64, 32, 64},
+	} {
+		t.Run(fmt.Sprintf("workers:%d,shards:%d,size:%d", c.workers, c.shards, c.shardSize), func(t *testing.T) {
+			test(t, c.workers, c.shards, c.shardSize)
+		})
+	}
+}
diff --git a/pkg/sharky/slots.go b/pkg/sharky/slots.go
new file mode 100644
index 00000000000..9399ad3df55
--- /dev/null
+++ b/pkg/sharky/slots.go
@@ -0,0 +1,133 @@
+// Copyright 2021 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sharky
+
+import (
+	"io"
+	"sync"
+)
+
+type slots struct {
+	data []byte          // byte slice serving as a bitvector: the i-th bit is set iff slot i is free
+	size uint32          // number of slots
+	head uint32          // the first free slot
+	file sharkyFile      // file to persist free slots across sessions
+	in   chan uint32     // incoming channel for released free slots
+	out  chan uint32     // outgoing channel for free slots
+	wg   *sync.WaitGroup // counts started operations so that shutdown can wait for them
+}
+
+func newSlots(file sharkyFile, wg *sync.WaitGroup) *slots {
+	return &slots{
+		file: file,
+		in:   make(chan uint32),
+		out:  make(chan uint32),
+		wg:   wg,
+	}
+}
+
+// load initialises the slots from the file; called once after construction.
+func (sl *slots) load() (err error) {
+	sl.data, err = io.ReadAll(sl.file)
+	if err != nil {
+		return err
+	}
+	sl.size = uint32(len(sl.data) * 8)
+	sl.head = sl.next(0)
+	return err
+}
+
+// save persists the free slot bitvector on disk (without closing).
+func (sl *slots) save() error {
+	if err := sl.file.Truncate(0); err != nil {
+		return err
+	}
+	if _, err := sl.file.Seek(0, 0); err != nil {
+		return err
+	}
+	if _, err := sl.file.Write(sl.data); err != nil {
+		return err
+	}
+	return sl.file.Sync()
+}
+
+// extend grows the slots to accommodate an extended size shard.
+// Extensions are bytewise: they can only be multiples of 8 bits.
+func (sl *slots) extend(n int) {
+	sl.size += uint32(n) * 8
+	for i := 0; i < n; i++ {
+		sl.data = append(sl.data, 0xff)
+	}
+}
+
+// next returns the lowest free slot after start.
+func (sl *slots) next(start uint32) uint32 {
+	for i := start; i < sl.size; i++ {
+		if sl.data[i/8]&(1<<(i%8)) > 0 {
+			return i
+		}
+	}
+	return sl.size
+}
+
+// push inserts a free slot.
+func (sl *slots) push(i uint32) {
+	if sl.head > i {
+		sl.head = i
+	}
+	sl.data[i/8] |= 1 << (i % 8)
+}
+
+// pop returns the lowest available free slot.
+func (sl *slots) pop() uint32 {
+	head := sl.head
+	if head == sl.size {
+		sl.extend(1)
+	}
+	sl.data[head/8] &= ^(1 << (head % 8))
+	sl.head = sl.next(head + 1)
+	return head
+}
+
+// process is the slots component's forever loop: it feeds free slots to the
+// shard on out and collects released slots on in.
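+// The loop alternates between two states: while out is nil, a fresh head is
+// popped from the bitvector and the send case is armed by assigning out; once
+// head has been handed over on out, out is reset to nil so that the next head
+// is popped lazily. On quit, a head that was popped but never handed out is
+// pushed back, and in is only closed once the wait group has drained all
+// releases that were already initiated.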
+func (sl *slots) process(quit chan struct{}) {
+	var head uint32     // the currently pending next free slot
+	var out chan uint32 // nullable output channel; a new head needs to be popped while nil
+	for {
+		// if out is nil, pop a new head, unless we are quitting
+		if out == nil && quit != nil {
+			// pop a free slot into head and arm the send case by assigning the out channel
+			head = sl.pop()
+			out = sl.out
+		}
+
+		select {
+		// listen for released slots and push them back into the free-slot bitvector
+		case slot, more := <-sl.in:
+			if !more {
+				return
+			}
+			sl.push(slot)
+			sl.wg.Done()
+
+		// hand the free slot over on the out channel and reset out to nil so a new free slot is popped
+		case out <- head:
+			out = nil
+
+		// quit is effective only after all initiated releases are received
+		case <-quit:
+			if out != nil {
+				sl.push(head)
+				out = nil
+			}
+			quit = nil
+			sl.wg.Done()
+			go func() {
+				sl.wg.Wait()
+				close(sl.in)
+			}()
+		}
+	}
+}
diff --git a/pkg/sharky/store.go b/pkg/sharky/store.go
new file mode 100644
index 00000000000..6726901646e
--- /dev/null
+++ b/pkg/sharky/store.go
@@ -0,0 +1,193 @@
+// Copyright 2021 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sharky
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io/fs"
+	"strconv"
+	"sync"
+
+	"github.com/hashicorp/go-multierror"
+)
+
+var (
+	// ErrTooLong is returned by Write if the blob length exceeds the maximum blob size.
+	ErrTooLong = errors.New("data too long")
+	// ErrQuitting is returned by Write and Read when the store is closed before the operation completes.
+	ErrQuitting = errors.New("quitting")
+)
+
+// Store models the sharded fixed-length blobstore.
+// The design provides lockless sharding:
+// - the shard for a write is chosen based on backpressure from running operations
+// - reads are prioritised over writes
+// - a write is only accepted by a shard that has a free slot available
+type Store struct {
+	maxDataSize int             // max length of blobs
+	writes      chan write      // shared write operations channel
+	shards      []*shard        // shards
+	wg          *sync.WaitGroup // counts started operations
+	quit        chan struct{}   // quit channel
+	metrics     metrics
+}
+
+// New constructs a sharded blobstore.
+// Arguments:
+// - basedir: the filesystem holding the shard and free-slot files
+// - shardCnt: positive integer < 256; must not be zero, or expect a panic
+// - maxDataSize: positive integer representing the maximum blob size to be stored
+func New(basedir fs.FS, shardCnt int, maxDataSize int) (*Store, error) {
+	store := &Store{
+		maxDataSize: maxDataSize,
+		writes:      make(chan write),
+		shards:      make([]*shard, shardCnt),
+		wg:          &sync.WaitGroup{},
+		quit:        make(chan struct{}),
+		metrics:     newMetrics(),
+	}
+	for i := range store.shards {
+		s, err := store.create(uint8(i), maxDataSize, basedir)
+		if err != nil {
+			return nil, err
+		}
+		store.shards[i] = s
+	}
+	store.metrics.ShardCount.Set(float64(len(store.shards)))
+
+	return store, nil
+}
+
+// Close closes each shard and returns any incidental errors from the shards.
+func (s *Store) Close() error {
+	close(s.quit)
+	err := new(multierror.Error)
+	for _, sh := range s.shards {
+		err = multierror.Append(err, sh.close())
+	}
+
+	return err.ErrorOrNil()
+}
+
+// create creates a new shard with the given index and maximum blob size,
+// backed by files within the base directory.
+func (s *Store) create(index uint8, maxDataSize int, basedir fs.FS) (*shard, error) {
+	file, err := basedir.Open(fmt.Sprintf("shard_%03d", index))
+	if err != nil {
+		return nil, err
+	}
+	ffile, err := basedir.Open(fmt.Sprintf("free_%03d", index))
+	if err != nil {
+		return nil, err
+	}
+	sl := newSlots(ffile.(sharkyFile), s.wg)
+	err = sl.load()
+	if err != nil {
+		return nil, err
+	}
+	sh := &shard{
+		reads:       make(chan read),
+		errc:        make(chan error),
+		writes:      s.writes,
+		index:       index,
+		maxDataSize: maxDataSize,
+		file:        file.(sharkyFile),
+		slots:       sl,
+		quit:        s.quit,
+	}
+	terminated := make(chan struct{})
+	sh.slots.wg.Add(1)
+	go func() {
+		sh.process()
+		close(terminated)
+	}()
+	go sl.process(terminated)
+	return sh, nil
+}
+
+// Read reads the content of the blob found at the given location into the byte buffer provided.
+// The location is assumed to have been obtained by an earlier Write call storing the blob.
+func (s *Store) Read(ctx context.Context, loc Location, buf []byte) (err error) {
+	sh := s.shards[loc.Shard]
+	select {
+	case sh.reads <- read{buf[:loc.Length], loc.Slot}:
+		s.metrics.TotalReadCalls.Inc()
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
+	select {
+	case err = <-sh.errc:
+		if err != nil {
+			s.metrics.TotalReadCallsErr.Inc()
+		}
+		return err
+	case <-s.quit:
+		return ErrQuitting
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+// Write stores a new blob and returns its location to be used as a reference.
+// The location can later be given to a Read call to retrieve the stored blob.
+func (s *Store) Write(ctx context.Context, data []byte) (loc Location, err error) {
+	if len(data) > s.maxDataSize {
+		return loc, ErrTooLong
+	}
+	s.wg.Add(1)
+	defer s.wg.Done()
+
+	c := make(chan entry)
+
+	select {
+	case s.writes <- write{data, c}:
+		s.metrics.TotalWriteCalls.Inc()
+	case <-s.quit:
+		return loc, ErrQuitting
+	case <-ctx.Done():
+		return loc, ctx.Err()
+	}
+
+	select {
+	case e := <-c:
+		if e.err == nil {
+			shard := strconv.Itoa(int(e.loc.Shard))
+			s.metrics.CurrentShardSize.WithLabelValues(shard).Inc()
+			s.metrics.ShardFragmentation.WithLabelValues(shard).Add(float64(s.maxDataSize - int(e.loc.Length)))
+		} else {
+			s.metrics.TotalWriteCallsErr.Inc()
+		}
+		return e.loc, e.err
+	case <-s.quit:
+		return loc, ErrQuitting
+	case <-ctx.Done():
+		return loc, ctx.Err()
+	}
+}
+
+// Release gives the slot back to the shard.
+// From here on the slot can be reused and overwritten.
+// Release is meant to be called when an entry in the upstream db is removed.
+// Note that releasing is not safe for obfuscating earlier content, since
+// even after reuse, the slot may be used by a very short blob, leaving the
+// rest of the old blob bytes untouched.
+func (s *Store) Release(ctx context.Context, loc Location) error {
+	sh := s.shards[loc.Shard]
+	// we Add to the wait group for the current routine; the matching Done happens in slots.process
+	sh.slots.wg.Add(1)
+	err := sh.release(ctx, loc.Slot)
+	s.metrics.TotalReleaseCalls.Inc()
+	if err == nil {
+		shard := strconv.Itoa(int(sh.index))
+		s.metrics.CurrentShardSize.WithLabelValues(shard).Dec()
+		s.metrics.ShardFragmentation.WithLabelValues(shard).Sub(float64(s.maxDataSize - int(loc.Length)))
+	} else {
+		s.metrics.TotalReleaseCallsErr.Inc()
+	}
+	return err
+}
diff --git a/pkg/shed/db.go b/pkg/shed/db.go
index 0798606c86b..79cda4e5f82 100644
--- a/pkg/shed/db.go
+++ b/pkg/shed/db.go
@@ -29,6 +29,7 @@ import (
 	"github.com/syndtr/goleveldb/leveldb/iterator"
 	"github.com/syndtr/goleveldb/leveldb/opt"
 	"github.com/syndtr/goleveldb/leveldb/storage"
+	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
 var (
@@ -183,6 +184,13 @@ func (db *DB) WriteBatch(batch *leveldb.Batch) (err error) {
 	return nil
 }
 
+// Compact triggers a full database compaction on the underlying
+// LevelDB instance. Use with care! This can be very expensive!
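+//
+// Passing nil for both start and end asks the underlying goleveldb
+// CompactRange to compact the entire keyspace, e.g.
+//
+//	if err := db.Compact(nil, nil); err != nil {
+//		// handle the error
+//	}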
+func (db *DB) Compact(start, end []byte) error { + r := util.Range{Start: start, Limit: end} + return db.ldb.CompactRange(r) +} + // Close closes LevelDB database. func (db *DB) Close() (err error) { close(db.quit) diff --git a/pkg/shed/index.go b/pkg/shed/index.go index 044a2fdbded..573dd9fd51a 100644 --- a/pkg/shed/index.go +++ b/pkg/shed/index.go @@ -40,6 +40,7 @@ import ( type Item struct { Address []byte Data []byte + Location []byte // Location of the chunk in sharky context AccessTimestamp int64 StoreTimestamp int64 BinID uint64 @@ -65,6 +66,9 @@ func (i Item) Merge(i2 Item) Item { if i.Data == nil { i.Data = i2.Data } + if i.Location == nil { + i.Location = i2.Location + } if i.AccessTimestamp == 0 { i.AccessTimestamp = i2.AccessTimestamp } diff --git a/pkg/soc/soc.go b/pkg/soc/soc.go index a524caa41a0..2ff49c648cd 100644 --- a/pkg/soc/soc.go +++ b/pkg/soc/soc.go @@ -15,12 +15,6 @@ import ( "github.com/ethersphere/bee/pkg/swarm" ) -const ( - IdSize = 32 - SignatureSize = 65 - minChunkSize = IdSize + SignatureSize + swarm.SpanSize -) - var ( errInvalidAddress = errors.New("soc: invalid address") errWrongChunkSize = errors.New("soc: chunk length is less than minimum") @@ -124,7 +118,7 @@ func (s *SOC) Sign(signer crypto.Signer) (swarm.Chunk, error) { // FromChunk recreates a SOC representation from swarm.Chunk data. func FromChunk(sch swarm.Chunk) (*SOC, error) { chunkData := sch.Data() - if len(chunkData) < minChunkSize { + if len(chunkData) < swarm.SocMinChunkSize { return nil, errWrongChunkSize } @@ -132,11 +126,11 @@ func FromChunk(sch swarm.Chunk) (*SOC, error) { s := &SOC{} cursor := 0 - s.id = chunkData[cursor:IdSize] - cursor += IdSize + s.id = chunkData[cursor:swarm.HashSize] + cursor += swarm.HashSize - s.signature = chunkData[cursor : cursor+SignatureSize] - cursor += SignatureSize + s.signature = chunkData[cursor : cursor+swarm.SocSignatureSize] + cursor += swarm.SocSignatureSize ch, err := cac.NewWithDataSpan(chunkData[cursor:]) if err != nil { diff --git a/pkg/soc/soc_test.go b/pkg/soc/soc_test.go index c121cd0e8fb..1205ebe5bcc 100644 --- a/pkg/soc/soc_test.go +++ b/pkg/soc/soc_test.go @@ -25,7 +25,7 @@ func TestNew(t *testing.T) { t.Fatal(err) } - id := make([]byte, soc.IdSize) + id := make([]byte, swarm.HashSize) s := soc.New(id, ch) // check SOC fields @@ -59,7 +59,7 @@ func TestNewSigned(t *testing.T) { t.Fatal(err) } - id := make([]byte, soc.IdSize) + id := make([]byte, swarm.HashSize) s, err := soc.NewSigned(id, ch, owner.Bytes(), sig) if err != nil { t.Fatal(err) @@ -105,7 +105,7 @@ func TestChunk(t *testing.T) { t.Fatal(err) } - id := make([]byte, soc.IdSize) + id := make([]byte, swarm.HashSize) // creates a new signed SOC s, err := soc.NewSigned(id, ch, owner.Bytes(), sig) if err != nil { @@ -131,16 +131,16 @@ func TestChunk(t *testing.T) { chunkData := sch.Data() // verifies that id, signature, payload is in place in the SOC chunk cursor := 0 - if !bytes.Equal(chunkData[cursor:soc.IdSize], id) { - t.Fatalf("id mismatch. got %x want %x", chunkData[cursor:soc.IdSize], id) + if !bytes.Equal(chunkData[cursor:swarm.HashSize], id) { + t.Fatalf("id mismatch. got %x want %x", chunkData[cursor:swarm.HashSize], id) } - cursor += soc.IdSize + cursor += swarm.HashSize - signature := chunkData[cursor : cursor+soc.SignatureSize] + signature := chunkData[cursor : cursor+swarm.SocSignatureSize] if !bytes.Equal(signature, sig) { t.Fatalf("signature mismatch. 
got %x want %x", signature, sig) } - cursor += soc.SignatureSize + cursor += swarm.SocSignatureSize spanBytes := make([]byte, swarm.SpanSize) binary.LittleEndian.PutUint64(spanBytes, uint64(len(payload))) @@ -160,7 +160,7 @@ func TestChunkErrorWithoutOwner(t *testing.T) { if err != nil { t.Fatal(err) } - id := make([]byte, soc.IdSize) + id := make([]byte, swarm.HashSize) // creates a new soc s := soc.New(id, ch) @@ -185,7 +185,7 @@ func TestSign(t *testing.T) { t.Fatal(err) } - id := make([]byte, soc.IdSize) + id := make([]byte, swarm.HashSize) // creates the soc s := soc.New(id, ch) @@ -197,8 +197,8 @@ func TestSign(t *testing.T) { chunkData := sch.Data() // get signature in the chunk - cursor := soc.IdSize - signature := chunkData[cursor : cursor+soc.SignatureSize] + cursor := swarm.HashSize + signature := chunkData[cursor : cursor+swarm.SocSignatureSize] // get the public key of the signer publicKey, err := signer.PublicKey() @@ -238,10 +238,10 @@ func TestFromChunk(t *testing.T) { // owner: 0x8d3766440f0d7b949a5e32995d09619a7f86e632 sch := swarm.NewChunk(socAddress, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 90, 205, 56, 79, 235, 193, 51, 183, 178, 69, 229, 221, 198, 45, 130, 210, 205, 237, 145, 130, 210, 113, 97, 38, 205, 136, 68, 80, 154, 246, 90, 5, 61, 235, 65, 130, 8, 2, 127, 84, 142, 62, 136, 52, 58, 246, 248, 74, 135, 114, 251, 60, 235, 192, 161, 131, 58, 14, 167, 236, 12, 19, 72, 49, 27, 3, 0, 0, 0, 0, 0, 0, 0, 102, 111, 111}) - cursor := soc.IdSize + soc.SignatureSize + cursor := swarm.HashSize + swarm.SocSignatureSize data := sch.Data() - id := data[:soc.IdSize] - sig := data[soc.IdSize:cursor] + id := data[:swarm.HashSize] + sig := data[swarm.HashSize:cursor] chunkData := data[cursor:] chunkAddress := swarm.MustParseHexAddress("2387e8e7d8a48c2a9339c97c1dc3461a9a7aa07e994c5cb8b38fd7c1b3e6ea48") @@ -283,7 +283,7 @@ func TestFromChunk(t *testing.T) { } func TestCreateAddress(t *testing.T) { - id := make([]byte, soc.IdSize) + id := make([]byte, swarm.HashSize) owner := common.HexToAddress("8d3766440f0d7b949a5e32995d09619a7f86e632") socAddress := swarm.MustParseHexAddress("9d453ebb73b2fedaaf44ceddcf7a0aa37f3e3d6453fea5841c31f0ea6d61dc85") @@ -298,7 +298,7 @@ func TestCreateAddress(t *testing.T) { func TestRecoverAddress(t *testing.T) { owner := common.HexToAddress("8d3766440f0d7b949a5e32995d09619a7f86e632") - id := make([]byte, soc.IdSize) + id := make([]byte, swarm.HashSize) chunkAddress := swarm.MustParseHexAddress("2387e8e7d8a48c2a9339c97c1dc3461a9a7aa07e994c5cb8b38fd7c1b3e6ea48") signedDigest, err := soc.Hash(id, chunkAddress.Bytes()) if err != nil { diff --git a/pkg/soc/testing/soc.go b/pkg/soc/testing/soc.go index 9279822e075..c8449e820ce 100644 --- a/pkg/soc/testing/soc.go +++ b/pkg/soc/testing/soc.go @@ -51,7 +51,7 @@ func GenerateMockSOC(t *testing.T, data []byte) *MockSOC { t.Fatal(err) } - id := make([]byte, soc.IdSize) + id := make([]byte, swarm.HashSize) hasher := swarm.NewHasher() _, err = hasher.Write(append(id, ch.Address().Bytes()...)) if err != nil { diff --git a/pkg/soc/validator_test.go b/pkg/soc/validator_test.go index 074b0816341..a7509b18fe7 100644 --- a/pkg/soc/validator_test.go +++ b/pkg/soc/validator_test.go @@ -58,7 +58,7 @@ func TestInvalid(t *testing.T) { chunk: func() swarm.Chunk { data := make([]byte, len(sch.Data())) copy(data, sch.Data()) - cursor := soc.IdSize + soc.SignatureSize + cursor := swarm.HashSize + swarm.SocSignatureSize chunkData := data[cursor:] chunkData[0] = 0x01 return 
swarm.NewChunk(socAddress, data) @@ -69,7 +69,7 @@ func TestInvalid(t *testing.T) { chunk: func() swarm.Chunk { data := make([]byte, len(sch.Data())) copy(data, sch.Data()) - id := data[:soc.IdSize] + id := data[:swarm.HashSize] id[0] = 0x01 return swarm.NewChunk(socAddress, data) }, @@ -80,8 +80,8 @@ func TestInvalid(t *testing.T) { data := make([]byte, len(sch.Data())) copy(data, sch.Data()) // modify signature - cursor := soc.IdSize + soc.SignatureSize - sig := data[soc.IdSize:cursor] + cursor := swarm.HashSize + swarm.SocSignatureSize + sig := data[swarm.HashSize:cursor] sig[0] = 0x01 return swarm.NewChunk(socAddress, data) }, diff --git a/pkg/swarm/swarm.go b/pkg/swarm/swarm.go index 957aef0bd86..30eb10c06a7 100644 --- a/pkg/swarm/swarm.go +++ b/pkg/swarm/swarm.go @@ -28,6 +28,9 @@ const ( ExtendedPO uint8 = MaxPO + 5 MaxBins = MaxPO + 1 ChunkWithSpanSize = ChunkSize + SpanSize + SocSignatureSize = 65 + SocMinChunkSize = HashSize + SocSignatureSize + SpanSize + SocMaxChunkSize = SocMinChunkSize + ChunkSize ) var (