Skip to content

Commit

Permalink
feat(puller): limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Dec 13, 2023
1 parent 5a867fe commit b92cd56
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ethersphere/bee/pkg/storer"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
ratelimit "golang.org/x/time/rate"
)

// loggerName is the tree path name of the logger for this package.
Expand Down Expand Up @@ -65,6 +66,8 @@ type Puller struct {
rate *rate.Rate // rate of historical syncing

start sync.Once

limiter *ratelimit.Limiter
}

func New(
Expand Down Expand Up @@ -92,6 +95,7 @@ func New(
blockLister: blockLister,
rate: rate.New(DefaultHistRateWindow),
cancel: func() { /* Noop, since the context is initialized in the Start(). */ },
limiter: ratelimit.NewLimiter(ratelimit.Every(time.Second), 64), // allows for 1 sync per second, and 64 bursts
}

return p
Expand Down Expand Up @@ -301,15 +305,15 @@ func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8,

for {

_ = p.limiter.Wait(ctx)

select {
case <-ctx.Done():
loggerV2.Debug("syncWorker context cancelled", "peer_address", peer, "bin", bin)
return
default:
}

p.metrics.SyncWorkerIterCounter.Inc()

s, _, _, err := p.nextPeerInterval(peer, bin)
if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
Expand Down

0 comments on commit b92cd56

Please sign in to comment.