Skip to content

Commit

Permalink
fix: upload
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Apr 15, 2024
1 parent 6d08b94 commit db4ead4
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ func NewBee(
retrieval := retrieval.New(swarmAddress, waitNetworkRFunc, localStore, p2ps, kad, logger, acc, pricer, tracer, o.RetrievalCaching)
localStore.SetRetrievalService(retrieval)

pusherService := pusher.New(networkID, localStore, waitNetworkRFunc, pushSyncProtocol, validStamp, logger, tracer, warmupTime, pusher.DefaultRetryCount)
pusherService := pusher.New(networkID, localStore, waitNetworkRFunc, pushSyncProtocol, validStamp, logger, warmupTime, pusher.DefaultRetryCount)
b.pusherCloser = pusherService

pusherService.AddFeed(localStore.PusherFeed())
Expand Down
7 changes: 2 additions & 5 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type Service struct {
inflight *inflight
attempts *attempts
smuggler chan OpChan
tracer *tracing.Tracer
}

const (
Expand All @@ -82,7 +81,6 @@ func New(
pushSyncer pushsync.PushSyncer,
validStamp postage.ValidStampFn,
logger log.Logger,
tracer *tracing.Tracer,
warmupTime time.Duration,
retryCount int,
) *Service {
Expand All @@ -99,15 +97,14 @@ func New(
inflight: newInflight(),
attempts: &attempts{retryCount: retryCount, attempts: make(map[string]int)},
smuggler: make(chan OpChan),
tracer: tracer,
}
go p.chunksWorker(warmupTime, tracer)
go p.chunksWorker(warmupTime)
return p
}

// chunksWorker is a loop that keeps looking for chunks that are locally uploaded ( by monitoring pushIndex )
// and pushes them to the closest peer and get a receipt.
func (s *Service) chunksWorker(warmupTime time.Duration, tracer *tracing.Tracer) {
func (s *Service) chunksWorker(warmupTime time.Duration) {
defer close(s.chunksWorkerQuitC)
select {
case <-time.After(warmupTime):
Expand Down
2 changes: 1 addition & 1 deletion pkg/pusher/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func createPusher(

radiusFunc := func() (uint8, error) { return radius, nil }

pusherService := pusher.New(1, storer, radiusFunc, pushSyncService, validStamp, log.Noop, nil, 0, retryCount)
pusherService := pusher.New(1, storer, radiusFunc, pushSyncService, validStamp, log.Noop, 0, retryCount)
testutil.CleanupCloser(t, pusherService)

return pusherService
Expand Down
4 changes: 2 additions & 2 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo

inflight--

ps.measurePushPeer(result.pushTime, result.err, origin)
ps.measurePushPeer(result.pushTime, result.err)

if result.err == nil {
return result.receipt, nil
Expand Down Expand Up @@ -558,7 +558,7 @@ func (ps *PushSync) prepareCredit(ctx context.Context, peer swarm.Address, ch sw
return creditAction, nil
}

func (ps *PushSync) measurePushPeer(t time.Time, err error, origin bool) {
func (ps *PushSync) measurePushPeer(t time.Time, err error) {
var status string
if err != nil {
status = "failure"
Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/internal/upload/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ func ListAllTags(st storage.Reader) ([]TagItem, error) {
return tags, nil
}

func Iterate(ctx context.Context, s transaction.ReadOnlyStore, consumerFn func(chunk swarm.Chunk) (bool, error)) error {
func IteratePending(ctx context.Context, s transaction.ReadOnlyStore, consumerFn func(chunk swarm.Chunk) (bool, error)) error {
return s.IndexStore().Iterate(storage.Query{
Factory: func() storage.Item { return &pushItem{} },
}, func(r storage.Result) (bool, error) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/storer/internal/upload/uploadstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ func TestIterate(t *testing.T) {
ts := newTestStorage(t)

t.Run("on empty storage does not call the callback fn", func(t *testing.T) {
err := upload.Iterate(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) {
err := upload.IteratePending(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) {
t.Fatal("unexpected call")
return false, nil
})
Expand Down Expand Up @@ -938,7 +938,7 @@ func TestIterate(t *testing.T) {

var count int

err = upload.Iterate(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) {
err = upload.IteratePending(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) {
count++
if !chunk.Equal(chunk1) && !chunk.Equal(chunk2) {
return true, fmt.Errorf("unknown chunk %s", chunk.Address())
Expand All @@ -958,7 +958,7 @@ func TestIterate(t *testing.T) {
t.Fatalf("Close(...) error: %v", err)
}

err = upload.Iterate(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) {
err = upload.IteratePending(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) {
count++
if !chunk.Equal(chunk1) && !chunk.Equal(chunk2) {
return true, fmt.Errorf("unknown chunk %s", chunk.Address())
Expand Down Expand Up @@ -1081,7 +1081,7 @@ func TestCleanup(t *testing.T) {
}

count := 0
_ = upload.Iterate(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) {
_ = upload.IteratePending(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) {
count++
return false, nil
})
Expand Down Expand Up @@ -1130,7 +1130,7 @@ func TestCleanup(t *testing.T) {
}

count := 0
_ = upload.Iterate(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) {
_ = upload.IteratePending(context.Background(), ts, func(chunk swarm.Chunk) (bool, error) {
count++
return false, nil
})
Expand Down
3 changes: 1 addition & 2 deletions pkg/storer/subscribe_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/ethersphere/bee/v2/pkg/storer/internal/upload"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

Expand Down Expand Up @@ -37,7 +36,7 @@ func (db *DB) SubscribePush(ctx context.Context) (<-chan swarm.Chunk, func()) {

var count int

err := upload.Iterate(ctx, db.storage, func(chunk swarm.Chunk) (bool, error) {
err := db.IteratePendingUpload(ctx, db.storage, func(chunk swarm.Chunk) (bool, error) {
select {
case chunks <- chunk:
count++
Expand Down
8 changes: 8 additions & 0 deletions pkg/storer/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ import (

const uploadsLock = "pin-upload-store"

// IteratePendingUpload iterates the chunks that are pending to be uploaded to the network.
func (db *DB) IteratePendingUpload(ctx context.Context, s transaction.ReadOnlyStore, consumerFn func(chunk swarm.Chunk) (bool, error)) error {
unlock := db.Lock(uploadsLock)
defer unlock()

return upload.IteratePending(ctx, s, consumerFn)
}

// Report implements the storage.PushReporter by wrapping the internal reporter
// with a transaction.
func (db *DB) Report(ctx context.Context, chunk swarm.Chunk, state storage.ChunkState) error {
Expand Down

0 comments on commit db4ead4

Please sign in to comment.