diff --git a/pkg/storer/debug.go b/pkg/storer/debug.go index f78c866c906..9128141a430 100644 --- a/pkg/storer/debug.go +++ b/pkg/storer/debug.go @@ -16,8 +16,8 @@ import ( ) type UploadStat struct { - TotalUploaded int - TotalSynced int + TotalUploaded uint64 + TotalSynced uint64 } type PinningStat struct { @@ -80,28 +80,22 @@ func (db *DB) DebugInfo(ctx context.Context) (Info, error) { }) var ( - uploaded int - synced int + uploaded uint64 + synced uint64 ) eg.Go(func() error { - return upload.IterateAll( - db.repo.IndexStore(), - func(_ swarm.Address, isSynced bool) (bool, error) { - select { - case <-ctx.Done(): - return true, ctx.Err() - case <-db.quit: - return true, ErrDBQuit - default: - } - - uploaded++ - if isSynced { - synced++ - } - return false, nil - }, - ) + return upload.IterateAllTagItems(db.repo.IndexStore(), func(ti *upload.TagItem) (bool, error) { + select { + case <-ctx.Done(): + return true, ctx.Err() + case <-db.quit: + return true, ErrDBQuit + default: + } + uploaded += ti.Split + synced += ti.Synced + return false, nil + }) }) var ( diff --git a/pkg/storer/internal/upload/uploadstore.go b/pkg/storer/internal/upload/uploadstore.go index 1126770176a..81fed2a8bcb 100644 --- a/pkg/storer/internal/upload/uploadstore.go +++ b/pkg/storer/internal/upload/uploadstore.go @@ -17,7 +17,6 @@ import ( "github.com/ethersphere/bee/pkg/storage/storageutil" "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/chunkstamp" - "github.com/ethersphere/bee/pkg/storer/internal/stampindex" "github.com/ethersphere/bee/pkg/swarm" ) @@ -231,10 +230,17 @@ type uploadItem struct { TagID uint64 Uploaded int64 Synced int64 + + // IdFunc overrides the ID method. + // This used to get the ID from the item where the address and batchID were not marshalled. + IdFunc func() string } // ID implements the storage.Item interface. func (i uploadItem) ID() string { + if i.IdFunc != nil { + return i.IdFunc() + } return storageutil.JoinFields(i.Address.ByteString(), string(i.BatchID)) } @@ -351,10 +357,6 @@ func (i dirtyTagItem) String() string { return storageutil.JoinFields(i.Namespace(), i.ID()) } -// stampIndexUploadNamespace represents the -// namespace name of the stamp index for upload. -const stampIndexUploadNamespace = "upload" - var ( // errPutterAlreadyClosed is returned when trying to Put a new chunk // after the putter has been closed. @@ -420,28 +422,6 @@ func (u *uploadPutter) Put(ctx context.Context, s internal.Storage, writer stora return nil } - switch item, loaded, err := stampindex.LoadOrStore( - s.IndexStore(), - writer, - stampIndexUploadNamespace, - chunk, - ); { - case err != nil: - return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) - case loaded && item.ChunkIsImmutable: - return errOverwriteOfImmutableBatch - case loaded && !item.ChunkIsImmutable: - prev := binary.BigEndian.Uint64(item.StampTimestamp) - curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) - if prev > curr { - return errOverwriteOfNewerBatch - } - err = stampindex.Store(writer, stampIndexUploadNamespace, chunk) - if err != nil { - return fmt.Errorf("failed updating stamp index: %w", err) - } - } - u.split++ if err := s.ChunkStore().Put(ctx, chunk); err != nil { @@ -711,10 +691,9 @@ func Report( return fmt.Errorf("failed deleting chunk %s: %w", chunk.Address(), err) } - ui.Synced = now().UnixNano() - err = batch.Put(ui) + err = batch.Delete(ui) if err != nil { - return fmt.Errorf("failed updating uploadItem %s: %w", ui, err) + return fmt.Errorf("failed deleting uploadItem %s: %w", ui, err) } return batch.Commit() @@ -848,15 +827,31 @@ func DeleteTag(st storage.Store, tagID uint64) error { return nil } -func IterateAll(st storage.Store, iterateFn func(addr swarm.Address, isSynced bool) (bool, error)) error { +func IterateAll(st storage.Store, iterateFn func(item storage.Item) (bool, error)) error { return st.Iterate( storage.Query{ Factory: func() storage.Item { return new(uploadItem) }, }, func(r storage.Result) (bool, error) { - address := swarm.NewAddress([]byte(r.ID[:32])) - synced := r.Entry.(*uploadItem).Synced != 0 - return iterateFn(address, synced) + ui := r.Entry.(*uploadItem) + ui.IdFunc = func() string { + return r.ID + } + return iterateFn(ui) + }, + ) +} + +func IterateAllTagItems(st storage.Store, cb func(ti *TagItem) (bool, error)) error { + return st.Iterate( + storage.Query{ + Factory: func() storage.Item { + return new(TagItem) + }, + }, + func(result storage.Result) (bool, error) { + ti := result.Entry.(*TagItem) + return cb(ti) }, ) } diff --git a/pkg/storer/internal/upload/uploadstore_test.go b/pkg/storer/internal/upload/uploadstore_test.go index 569b8c7999e..a2f7f931913 100644 --- a/pkg/storer/internal/upload/uploadstore_test.go +++ b/pkg/storer/internal/upload/uploadstore_test.go @@ -7,7 +7,6 @@ package upload_test import ( "bytes" "context" - "encoding/binary" "errors" "fmt" "math" @@ -16,7 +15,6 @@ import ( "testing" "time" - "github.com/ethersphere/bee/pkg/postage" storage "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage/storagetest" chunktest "github.com/ethersphere/bee/pkg/storage/testing" @@ -528,20 +526,27 @@ func TestChunkPutter(t *testing.T) { t.Run("iterate all", func(t *testing.T) { count := 0 - err := upload.IterateAll(ts.IndexStore(), func(addr swarm.Address, synced bool) (bool, error) { - count++ - if synced { - t.Fatal("expected synced to be false") - } - has, err := ts.ChunkStore().Has(context.Background(), addr) - if err != nil { - t.Fatalf("unexpected error in Has(...): %v", err) - } - if !has { - t.Fatalf("expected chunk to be present %s", addr.String()) - } - return false, nil - }) + err := ts.IndexStore().Iterate( + storage.Query{ + Factory: func() storage.Item { return new(upload.UploadItem) }, + }, + func(r storage.Result) (bool, error) { + address := swarm.NewAddress([]byte(r.ID[:32])) + synced := r.Entry.(*upload.UploadItem).Synced != 0 + count++ + if synced { + t.Fatal("expected synced to be false") + } + has, err := ts.ChunkStore().Has(context.Background(), address) + if err != nil { + t.Fatalf("unexpected error in Has(...): %v", err) + } + if !has { + t.Fatalf("expected chunk to be present %s", address.String()) + } + return false, nil + }, + ) if err != nil { t.Fatalf("IterateAll(...): unexpected error %v", err) } @@ -573,6 +578,28 @@ func TestChunkPutter(t *testing.T) { if diff := cmp.Diff(wantTI, ti); diff != "" { t.Fatalf("Get(...): unexpected TagItem (-want +have):\n%s", diff) } + + t.Run("iterate all tag items", func(t *testing.T) { + var tagItemsCount, uploaded, synced uint64 + err := upload.IterateAllTagItems(ts.IndexStore(), func(ti *upload.TagItem) (bool, error) { + uploaded += ti.Split + synced += ti.Synced + tagItemsCount++ + return false, nil + }) + if err != nil { + t.Fatalf("IterateAllTagItems(...): unexpected error %v", err) + } + if tagItemsCount != 1 { + t.Fatalf("unexpected tagItemsCount: want 1 have %d", tagItemsCount) + } + if uploaded != 20 { + t.Fatalf("unexpected uploaded: want 20 have %d", uploaded) + } + if synced != 0 { + t.Fatalf("unexpected synced: want 0 have %d", synced) + } + }) }) t.Run("error after close", func(t *testing.T) { @@ -711,20 +738,12 @@ func TestChunkReporter(t *testing.T) { Address: chunk.Address(), BatchID: chunk.Stamp().BatchID(), } - err = ts.IndexStore().Get(ui) + has, err := ts.IndexStore().Has(ui) if err != nil { - t.Fatalf("Get(...): unexpected error: %v", err) - } - wantUI := &upload.UploadItem{ - Address: chunk.Address(), - BatchID: chunk.Stamp().BatchID(), - TagID: tag.TagID, - Uploaded: now().UnixNano(), - Synced: now().UnixNano(), + t.Fatalf("unexpected error: %v", err) } - - if diff := cmp.Diff(wantUI, ui); diff != "" { - t.Fatalf("Get(...): unexpected UploadItem (-want +have):\n%s", diff) + if has { + t.Fatalf("expected to not be found: %s", ui) } pi := &upload.PushItem{ @@ -732,7 +751,7 @@ func TestChunkReporter(t *testing.T) { Address: chunk.Address(), BatchID: chunk.Stamp().BatchID(), } - has, err := ts.IndexStore().Has(pi) + has, err = ts.IndexStore().Has(pi) if err != nil { t.Fatalf("Has(...): unexpected error: %v", err) } @@ -780,93 +799,6 @@ func TestChunkReporter(t *testing.T) { }) } -func TestStampIndexHandling(t *testing.T) { - t.Parallel() - - ts := newTestStorage(t) - - tag, err := upload.NextTag(ts.IndexStore()) - if err != nil { - t.Fatalf("failed creating tag: %v", err) - } - - putter, err := upload.NewPutter(ts, tag.TagID) - if err != nil { - t.Fatalf("failed creating putter: %v", err) - } - - t.Run("put chunk with immutable batch", func(t *testing.T) { - chunk := chunktest.GenerateTestRandomChunk() - chunk = chunk.WithBatch( - chunk.Radius(), - chunk.Depth(), - chunk.BucketDepth(), - true, - ) - if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk); err != nil { - t.Fatalf("Put(...): unexpected error: %v", err) - } - - chunk2 := chunktest.GenerateTestRandomChunk().WithStamp(chunk.Stamp()) - - want := upload.ErrOverwriteOfImmutableBatch - have := putter.Put(context.Background(), ts, ts.IndexStore(), chunk2) - if !errors.Is(have, want) { - t.Fatalf("Put(...): unexpected error:\n\twant: %v\n\thave: %v", want, have) - } - }) - - t.Run("put existing index with older batch timestamp", func(t *testing.T) { - chunk := chunktest.GenerateTestRandomChunk() - if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk); err != nil { - t.Fatalf("Put(...): unexpected error: %v", err) - } - - decTS := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) - encTS := make([]byte, 8) - binary.BigEndian.PutUint64(encTS, decTS-1) - - stamp := postage.NewStamp( - chunk.Stamp().BatchID(), - chunk.Stamp().Index(), - encTS, - chunk.Stamp().Sig(), - ) - - chunk2 := chunktest.GenerateTestRandomChunk().WithStamp(stamp) - - want := upload.ErrOverwriteOfNewerBatch - have := putter.Put(context.Background(), ts, ts.IndexStore(), chunk2) - if !errors.Is(have, want) { - t.Fatalf("Put(...): unexpected error:\n\twant: %v\n\thave: %v", want, have) - } - }) - - t.Run("put existing chunk with newer batch timestamp", func(t *testing.T) { - chunk := chunktest.GenerateTestRandomChunk() - if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk); err != nil { - t.Fatalf("Put(...): unexpected error: %v", err) - } - - decTS := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) - encTS := make([]byte, 8) - binary.BigEndian.PutUint64(encTS, decTS+1) - - stamp := postage.NewStamp( - chunk.Stamp().BatchID(), - chunk.Stamp().Index(), - encTS, - chunk.Stamp().Sig(), - ) - - chunk2 := chunktest.GenerateTestRandomChunk().WithStamp(stamp) - - if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk2); err != nil { - t.Fatalf("Put(...): unexpected error: %v", err) - } - }) -} - func TestNextTagID(t *testing.T) { t.Parallel() diff --git a/pkg/storer/migration/all_steps.go b/pkg/storer/migration/all_steps.go index ef028659f6d..69aa419747f 100644 --- a/pkg/storer/migration/all_steps.go +++ b/pkg/storer/migration/all_steps.go @@ -21,6 +21,7 @@ func AfterInitSteps( 2: step_02, 3: step_03(chunkStore, reserve.ChunkType), 4: step_04(sharkyPath, sharkyNoOfShards), + 5: step_05, } } diff --git a/pkg/storer/migration/export_test.go b/pkg/storer/migration/export_test.go index 210d6973607..bfebec5cb42 100644 --- a/pkg/storer/migration/export_test.go +++ b/pkg/storer/migration/export_test.go @@ -9,4 +9,5 @@ var ( Step_02 = step_02 Step_03 = step_03 Step_04 = step_04 + Step_05 = step_05 ) diff --git a/pkg/storer/migration/step_05.go b/pkg/storer/migration/step_05.go new file mode 100644 index 00000000000..f9aa75b2b00 --- /dev/null +++ b/pkg/storer/migration/step_05.go @@ -0,0 +1,53 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package migration + +import ( + "fmt" + "os" + + "github.com/ethersphere/bee/pkg/log" + "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/storer/internal/upload" +) + +// step_05 is a migration step that removes all upload items from the store. +func step_05(st storage.BatchedStore) error { + logger := log.NewLogger("migration-step-05", log.WithSink(os.Stdout)) + logger.Info("start removing upload items") + + itemC := make(chan storage.Item) + errC := make(chan error) + go func() { + for item := range itemC { + err := st.Delete(item) + if err != nil { + errC <- fmt.Errorf("delete upload item: %w", err) + return + } + } + close(errC) + }() + + go func() { + defer close(itemC) + err := upload.IterateAll(st, func(u storage.Item) (bool, error) { + itemC <- u + return false, nil + }) + if err != nil { + errC <- fmt.Errorf("iterate upload items: %w", err) + return + } + }() + + err := <-errC + if err != nil { + return err + } + + logger.Info("finished removing upload items") + return nil +} diff --git a/pkg/storer/migration/step_05_test.go b/pkg/storer/migration/step_05_test.go new file mode 100644 index 00000000000..cc7a88214a8 --- /dev/null +++ b/pkg/storer/migration/step_05_test.go @@ -0,0 +1,102 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package migration_test + +import ( + "context" + "testing" + + "github.com/ethersphere/bee/pkg/node" + "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/storage" + chunktest "github.com/ethersphere/bee/pkg/storage/testing" + "github.com/ethersphere/bee/pkg/storer" + "github.com/ethersphere/bee/pkg/storer/internal" + "github.com/ethersphere/bee/pkg/storer/internal/upload" + localmigration "github.com/ethersphere/bee/pkg/storer/migration" + "github.com/ethersphere/bee/pkg/swarm" + kademlia "github.com/ethersphere/bee/pkg/topology/mock" + "github.com/ethersphere/bee/pkg/util/testutil" +) + +func Test_Step_05(t *testing.T) { + t.Parallel() + + db, err := storer.New(context.Background(), "", &storer.Options{ + Logger: testutil.NewLogger(t), + RadiusSetter: kademlia.NewTopologyDriver(), + Batchstore: new(postage.NoOpBatchStore), + ReserveCapacity: node.ReserveCapacity, + }) + if err != nil { + t.Fatalf("New(...): unexpected error: %v", err) + } + + t.Cleanup(func() { + err := db.Close() + if err != nil { + t.Fatalf("Close(): unexpected closing storer: %v", err) + } + }) + + wantCount := func(t *testing.T, st internal.Storage, want int) { + t.Helper() + count := 0 + err = upload.IterateAll(st.IndexStore(), func(_ storage.Item) (bool, error) { + count++ + return false, nil + }) + if err != nil { + t.Fatalf("iterate upload items: %v", err) + } + if count != want { + t.Fatalf("expected %d upload items, got %d", want, count) + } + } + + err = db.Execute(context.Background(), func(st internal.Storage) error { + tag, err := upload.NextTag(st.IndexStore()) + if err != nil { + t.Fatalf("create tag: %v", err) + } + + putter, err := upload.NewPutter(st, tag.TagID) + if err != nil { + t.Fatalf("create putter: %v", err) + } + ctx := context.Background() + chunks := chunktest.GenerateTestRandomChunks(10) + b, err := st.IndexStore().Batch(ctx) + if err != nil { + t.Fatalf("create batch: %v", err) + } + + for _, ch := range chunks { + err := putter.Put(ctx, st, b, ch) + if err != nil { + t.Fatalf("put chunk: %v", err) + } + } + err = putter.Close(st, st.IndexStore(), swarm.RandAddress(t)) + if err != nil { + t.Fatalf("close putter: %v", err) + } + err = b.Commit() + if err != nil { + t.Fatalf("commit batch: %v", err) + } + + wantCount(t, st, 10) + err = localmigration.Step_05(st.IndexStore()) + if err != nil { + t.Fatalf("step 05: %v", err) + } + wantCount(t, st, 0) + return nil + }) + if err != nil { + t.Fatalf("execute: %v", err) + } +}