Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: localstore transaction #4626

Merged
merged 57 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
5514f5d
fix(reserve): evict just enough chunks to reach the capacity (#4549)
istae Jan 23, 2024
97383b1
fix(localstore): leveldb-batch based transactions for both index and …
istae Jan 30, 2024
049345a
feat: cache
istae Feb 4, 2024
cacc07b
fix: removed cache and fixed shallow copy
istae Feb 4, 2024
38b3da1
fix: rebase
istae Feb 5, 2024
70bbd74
fix: metrics
istae Feb 5, 2024
193ad34
fix: puller
istae Feb 6, 2024
f1d0814
fix: asd
istae Feb 6, 2024
d027b3e
fix: reserve evict
istae Feb 6, 2024
7eff0af
fix: reserve cleanup
istae Feb 7, 2024
cb7e708
fix: remove readonly
istae Feb 7, 2024
d53c298
fix: enable test
istae Feb 7, 2024
73a24fc
fix: cleanup
istae Feb 7, 2024
805bf46
fix: credit
istae Feb 7, 2024
594f8b1
fix: single reserve worker
istae Feb 8, 2024
d131ede
fix: puller reset
istae Feb 8, 2024
a9da1b1
fix: cleanup
istae Feb 9, 2024
be2c261
fix: mem cache
istae Feb 9, 2024
af05cdd
fix: hint
istae Feb 9, 2024
33678e2
fix: comment
istae Feb 9, 2024
79c7d3d
refactor: rebase
istae Mar 27, 2024
024239d
fix: unit test
istae Mar 27, 2024
28d9b0d
feat: reserve repair cmd
istae Apr 1, 2024
359a75e
chore: reset sync intervals
istae Apr 1, 2024
25deb5c
fix: cleanup
istae Apr 1, 2024
a151f44
fix: completly remove chunk
istae Apr 2, 2024
cf32717
fix: upload
istae Apr 15, 2024
29d5b75
fix: stuck pending pusher
istae Apr 17, 2024
565849e
revert: upload changes and swallow error
istae Apr 17, 2024
bc8e40a
fix: sleep after
istae Apr 24, 2024
9309206
fix: test rename
istae Apr 29, 2024
57313d2
fix: unit test
istae Apr 29, 2024
1bffb5e
fix: unit test
istae Apr 29, 2024
aa68e2b
fix: unit test
istae Apr 29, 2024
ceb5cf4
fix: migration steps
istae Apr 29, 2024
1feb4c8
fix: cache fix
istae May 1, 2024
f0c52d2
fix: cache capacity
istae May 1, 2024
fb224f0
fix: cache fix
istae May 1, 2024
72261b2
fix: parallel reserve repair
istae May 2, 2024
3d1a653
fix: lock inside trx
istae May 2, 2024
adef563
fix: cleanup
istae May 2, 2024
d7e982c
fix: cache speed up
istae May 2, 2024
4b3e755
fix: evict min
istae May 2, 2024
132c7fd
fix: cache evict
istae May 6, 2024
6bdfd8d
fix: removed trx mem
istae May 6, 2024
70b4dcd
fix: size
istae May 6, 2024
541ad2a
fix: extra field
istae May 6, 2024
c7507a8
fix: uploadstore cleanup
istae May 6, 2024
c6e6167
fix: cache store cleanup
istae May 7, 2024
38d11da
fix: cache and reserve size err check
istae May 7, 2024
55a656f
fix: asd
istae May 7, 2024
cd8dde1
fix: extra check
istae May 7, 2024
3dcffb2
fix: cleanup
istae May 7, 2024
8364a13
fix: reserve fix
istae May 7, 2024
8f0f905
fix: reserve repair
istae May 8, 2024
709e2b7
fix: comments
istae May 8, 2024
e4b3da7
fix: num cpu
istae May 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (c *command) setHomeDir() (err error) {

func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameDataDir, filepath.Join(c.homeDir, ".bee"), "data directory")
cmd.Flags().Uint64(optionNameCacheCapacity, 1000000, fmt.Sprintf("cache capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
cmd.Flags().Uint64(optionNameCacheCapacity, 1_000_000, fmt.Sprintf("cache capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 32*1024*1024, "size of block cache of the database in bytes")
cmd.Flags().Uint64(optionNameDBWriteBufferSize, 32*1024*1024, "size of the database write buffer in bytes")
Expand Down
84 changes: 81 additions & 3 deletions cmd/bee/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (

"github.com/ethersphere/bee/v2/pkg/node"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/puller"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer/migration"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/spf13/cobra"
)
Expand All @@ -46,6 +48,7 @@ func (c *command) initDBCmd() {
dbCompactCmd(cmd)
dbValidateCmd(cmd)
dbValidatePinsCmd(cmd)
dbRepairReserve(cmd)

c.root.AddCommand(cmd)
}
Expand Down Expand Up @@ -81,6 +84,7 @@ func dbInfoCmd(cmd *cobra.Command) {
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
Expand Down Expand Up @@ -227,6 +231,76 @@ func dbValidatePinsCmd(cmd *cobra.Command) {
cmd.AddCommand(c)
}

func dbRepairReserve(cmd *cobra.Command) {
c := &cobra.Command{
Use: "repair-reserve",
Short: "Repairs the reserve by resetting the binIDs and removes dangling entries.",
RunE: func(cmd *cobra.Command, args []string) (err error) {
v, err := cmd.Flags().GetString(optionNameVerbosity)
if err != nil {
return fmt.Errorf("get verbosity: %w", err)
}
v = strings.ToLower(v)
logger, err := newLogger(cmd, v)
if err != nil {
return fmt.Errorf("new logger: %w", err)
}

dataDir, err := cmd.Flags().GetString(optionNameDataDir)
if err != nil {
return fmt.Errorf("get data-dir: %w", err)
}
if dataDir == "" {
return errors.New("no data-dir provided")
}

logger.Warning("Repair will recreate the reserve entries based on the chunk availability in the chunkstore. The epoch time and bin IDs will be reset.")
logger.Warning("The pullsync peer sync intervals are reset so on the next run, the node will perform historical syncing.")
logger.Warning("This is a destructive process. If the process is stopped for any reason, the reserve may become corrupted.")
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
logger.Warning("You have another 10 seconds to change your mind and kill this process with CTRL-C...")
time.Sleep(10 * time.Second)
logger.Warning("proceeding with repair...")

d, err := cmd.Flags().GetDuration(optionNameSleepAfter)
if err != nil {
logger.Error(err, "getting sleep value failed")
}
defer func() { time.Sleep(d) }()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the advantage of sleeping after the operation successfully completes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a infra requirement, to prevent the pod from restarting when it quits

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a comment then.


db, err := storer.New(cmd.Context(), path.Join(dataDir, "localstore"), &storer.Options{
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
}
defer db.Close()

err = migration.ReserveRepairer(db.Storage(), storage.ChunkType, logger)()
if err != nil {
return fmt.Errorf("repair: %w", err)
}

stateStore, _, err := node.InitStateStore(logger, dataDir, 1000)
if err != nil {
return fmt.Errorf("new statestore: %w", err)
}
defer stateStore.Close()

return stateStore.Iterate(puller.IntervalPrefix, func(key, val []byte) (stop bool, err error) {
return false, stateStore.Delete(string(key))
})
},
}
c.Flags().String(optionNameDataDir, "", "data directory")
c.Flags().String(optionNameVerbosity, "info", "verbosity level")
c.Flags().Duration(optionNameSleepAfter, time.Duration(0), "time to sleep after the operation finished")
cmd.AddCommand(c)
}

func dbValidateCmd(cmd *cobra.Command) {
c := &cobra.Command{
Use: "validate",
Expand Down Expand Up @@ -257,7 +331,7 @@ func dbValidateCmd(cmd *cobra.Command) {

localstorePath := path.Join(dataDir, "localstore")

err = storer.Validate(context.Background(), localstorePath, &storer.Options{
err = storer.ValidateRetrievalIndex(context.Background(), localstorePath, &storer.Options{
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
Expand Down Expand Up @@ -325,6 +399,7 @@ func dbExportReserveCmd(cmd *cobra.Command) {
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
Expand Down Expand Up @@ -406,7 +481,8 @@ func dbExportPinningCmd(cmd *cobra.Command) {
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: 4_194_304,
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
Expand Down Expand Up @@ -516,6 +592,7 @@ func dbImportReserveCmd(cmd *cobra.Command) {
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
Expand Down Expand Up @@ -598,7 +675,8 @@ func dbImportPinningCmd(cmd *cobra.Command) {
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: 4_194_304,
ReserveCapacity: node.ReserveCapacity,
CacheCapacity: 1_000_000,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
Expand Down
13 changes: 2 additions & 11 deletions cmd/bee/cmd/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func TestDBExportImport(t *testing.T) {
if err != nil {
t.Fatal(err)
}
fmt.Println("put chunk: ", ch.Address().String())
chunks[ch.Address().String()] = 0
}
db1.Close()
Expand Down Expand Up @@ -115,7 +114,6 @@ func TestDBExportImportPinning(t *testing.T) {
if err != nil {
t.Fatal(err)
}
fmt.Println("collection ", rootAddr.String(), " put chunk: ", ch.Address().String())
chunks[ch.Address().String()] = 0
}
err = collection.Done(rootAddr)
Expand All @@ -125,16 +123,9 @@ func TestDBExportImportPinning(t *testing.T) {
pins[rootAddr.String()] = nil
}

addresses, err := db1.Pins()
if err != nil {
t.Fatal(err)
}
for _, addr := range addresses {
fmt.Println("pin: ", addr.String())
}
db1.Close()

err = newCommand(t, cmd.WithArgs("db", "export", "pinning", export, "--data-dir", dir1)).Execute()
err := newCommand(t, cmd.WithArgs("db", "export", "pinning", export, "--data-dir", dir1)).Execute()
if err != nil {
t.Fatal(err)
}
Expand All @@ -150,7 +141,7 @@ func TestDBExportImportPinning(t *testing.T) {
Logger: testutil.NewLogger(t),
ReserveCapacity: node.ReserveCapacity,
}, dir2)
addresses, err = db2.Pins()
addresses, err := db2.Pins()
if err != nil {
t.Fatal(err)
}
Expand Down
15 changes: 2 additions & 13 deletions pkg/api/stewardship.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request)
}

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id"`
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
janos marked this conversation as resolved.
Show resolved Hide resolved
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
Expand All @@ -41,18 +41,7 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request)
err error
)

if len(headers.BatchID) == 0 {
logger.Debug("missing postage batch id for re-upload")
batchID, err = s.storer.BatchHint(paths.Address)
if err != nil {
logger.Debug("unable to find old batch for reference", "error", err)
logger.Error(nil, "unable to find old batch for reference")
jsonhttp.NotFound(w, "unable to find old batch for reference, provide new batch id")
return
}
} else {
batchID = headers.BatchID
}
batchID = headers.BatchID
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
stamper, save, err := s.getStamper(batchID)
if err != nil {
switch {
Expand Down
1 change: 1 addition & 0 deletions pkg/api/stewardship_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestStewardship(t *testing.T) {
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
jsonhttptest.WithRequestHeader("Swarm-Postage-Batch-Id", "aa"),
)
if !stewardMock.LastAddress().Equal(addr) {
t.Fatalf("\nhave address: %q\nwant address: %q", stewardMock.LastAddress().String(), addr.String())
Expand Down
72 changes: 69 additions & 3 deletions pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ func (m *mockPutter) store(cnt int) error {
}

// nolint:thelper
func TestJoinerRedundancy(t *testing.T) {
func TestJoinerRedundancy_FLAKY(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
rLevel redundancy.Level
Expand Down Expand Up @@ -1229,8 +1229,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
t.Parallel()
test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, size int) {
t.Helper()
store := mockstorer.NewForgettingStore(inmemchunkstore.New())
testutil.CleanupCloser(t, store)
store := mockstorer.NewForgettingStore(newChunkStore())
seed, err := pseudorand.NewSeed()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1367,3 +1366,70 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
})
}
}

type chunkStore struct {
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
mu sync.Mutex
chunks map[string]swarm.Chunk
}

func newChunkStore() *chunkStore {
return &chunkStore{
chunks: make(map[string]swarm.Chunk),
}
}

func (c *chunkStore) Get(_ context.Context, addr swarm.Address) (swarm.Chunk, error) {
c.mu.Lock()
defer c.mu.Unlock()

chunk, ok := c.chunks[addr.ByteString()]
if !ok {
return nil, storage.ErrNotFound
}
return chunk, nil
}

func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error {
c.mu.Lock()
defer c.mu.Unlock()
c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp())
return nil
}

func (c *chunkStore) Has(_ context.Context, addr swarm.Address) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()

_, exists := c.chunks[addr.ByteString()]

return exists, nil
}

func (c *chunkStore) Delete(_ context.Context, addr swarm.Address) error {
c.mu.Lock()
defer c.mu.Unlock()

delete(c.chunks, addr.ByteString())
return nil
}

func (c *chunkStore) Iterate(_ context.Context, fn storage.IterateChunkFn) error {
c.mu.Lock()
defer c.mu.Unlock()

for _, c := range c.chunks {
stop, err := fn(c)
if err != nil {
return err
}
if stop {
return nil
}
}

return nil
}

func (c *chunkStore) Close() error {
return nil
}
8 changes: 0 additions & 8 deletions pkg/node/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/ethersphere/bee/v2/pkg/pricing"
"github.com/ethersphere/bee/v2/pkg/retrieval"
"github.com/ethersphere/bee/v2/pkg/settlement/pseudosettle"
"github.com/ethersphere/bee/v2/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/v2/pkg/spinlock"
"github.com/ethersphere/bee/v2/pkg/storage"
storer "github.com/ethersphere/bee/v2/pkg/storer"
Expand All @@ -42,7 +41,6 @@ import (
"github.com/ethersphere/bee/v2/pkg/topology/kademlia"
"github.com/ethersphere/bee/v2/pkg/topology/lightnode"
"github.com/ethersphere/bee/v2/pkg/tracing"
"github.com/ethersphere/bee/v2/pkg/transaction"
"github.com/hashicorp/go-multierror"
ma "github.com/multiformats/go-multiaddr"
)
Expand All @@ -65,15 +63,9 @@ func bootstrapNode(
addr string,
swarmAddress swarm.Address,
nonce []byte,
chainID int64,
overlayEthAddress common.Address,
addressbook addressbook.Interface,
bootnodes []ma.Multiaddr,
lightNodes *lightnode.Container,
chequebookService chequebook.Service,
chequeStore chequebook.ChequeStore,
cashoutService chequebook.CashoutService,
transactionService transaction.Service,
stateStore storage.StateStorer,
signer crypto.Signer,
networkID uint64,
Expand Down
Loading
Loading