From ca79d9a5a1643238d0372767fffd61d865cf6e77 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 13 Dec 2023 03:04:15 +0300 Subject: [PATCH 1/2] feat(puller): limiter --- pkg/puller/puller.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index dc0965aa868..df69b66c698 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -24,6 +24,7 @@ import ( "github.com/ethersphere/bee/pkg/storer" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/topology" + ratelimit "golang.org/x/time/rate" ) // loggerName is the tree path name of the logger for this package. @@ -65,6 +66,8 @@ type Puller struct { rate *rate.Rate // rate of historical syncing start sync.Once + + limiter *ratelimit.Limiter } func New( @@ -92,6 +95,7 @@ func New( blockLister: blockLister, rate: rate.New(DefaultHistRateWindow), cancel: func() { /* Noop, since the context is initialized in the Start(). */ }, + limiter: ratelimit.NewLimiter(ratelimit.Every(time.Second), 64), // allows for 1 sync per second, and 64 bursts } return p @@ -301,6 +305,8 @@ func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8, for { + _ = p.limiter.Wait(ctx) + select { case <-ctx.Done(): loggerV2.Debug("syncWorker context cancelled", "peer_address", peer, "bin", bin) @@ -308,8 +314,6 @@ func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8, default: } - p.metrics.SyncWorkerIterCounter.Inc() - s, _, _, err := p.nextPeerInterval(peer, bin) if err != nil { p.metrics.SyncWorkerErrCounter.Inc() From 4ed71490528e57c8654055e83b0fbd999b4aebb7 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 13 Dec 2023 13:25:33 +0300 Subject: [PATCH 2/2] fix: limiter --- pkg/puller/puller.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index df69b66c698..b25acdc3dc2 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -95,7 +95,7 @@ func New( blockLister: blockLister, rate: rate.New(DefaultHistRateWindow), cancel: func() { /* Noop, since the context is initialized in the Start(). */ }, - limiter: ratelimit.NewLimiter(ratelimit.Every(time.Second), 64), // allows for 1 sync per second, and 64 bursts + limiter: ratelimit.NewLimiter(ratelimit.Every(time.Second), int(swarm.MaxBins)), // allows for 1 sync per second, max bins bursts } return p @@ -305,7 +305,10 @@ func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8, for { - _ = p.limiter.Wait(ctx) + // rate limit within neighborhood + if bin >= p.radius.StorageRadius() { + _ = p.limiter.Wait(ctx) + } select { case <-ctx.Done():