Skip to content

Commit

Permalink
fix: static workers
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Oct 5, 2021
1 parent 8524238 commit 14b0cac
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/beekeeper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
REPLICA: 3
RUN_TYPE: "PR RUN"
SETUP_CONTRACT_IMAGE_TAG: "0.2.0"
BEEKEEPER_BRANCH: "pss-fix"
BEEKEEPER_BRANCH: "master"
runs-on: ubuntu-latest
steps:
- name: Setup Go
Expand Down
2 changes: 2 additions & 0 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,8 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.

s.metrics.CreatedConnectionCount.Inc()

s.reacher.Connected(overlay, i.BzzAddress.Underlay)

peerUserAgent := appendSpace(s.peerUserAgent(ctx, info.ID))

s.logger.Debugf("successfully connected to peer %s%s%s (outbound)", i.BzzAddress.ShortString(), i.LightString(), peerUserAgent)
Expand Down
38 changes: 38 additions & 0 deletions pkg/reacher/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package reacher

import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)

type metrics struct {
Pings prometheus.CounterVec
PingTime prometheus.HistogramVec
}

func newMetrics() metrics {
subsystem := "reacher"

return metrics{
Pings: *prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "pings",
Help: "Ping counter.",
}, []string{"status"}),
PingTime: *prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "ping_timer",
Help: "Ping timer.",
}, []string{"status"}),
}
}

func (s *reacher) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics)
}
62 changes: 32 additions & 30 deletions pkg/reacher/reacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,16 @@ import (
"sync"
"time"

"golang.org/x/sync/semaphore"

"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)

const (
pingTimeoutMin = time.Second * 2
pingTimeoutMax = time.Second * 5
pingMaxAttemps = 3
maxWorkers = 16
pingTimeoutMin = time.Second * 2
pingTimeoutMax = time.Second * 5
pingMaxAttempts = 3
workers = 8
)

type peer struct {
Expand All @@ -34,33 +32,36 @@ type reacher struct {
mu sync.Mutex
queue []peer

ctx context.Context
ctxClose context.CancelFunc
run chan struct{}
ctx context.Context
ctxCancel context.CancelFunc
run chan struct{}

pinger p2p.Pinger
notifier p2p.ReachableNotifier

wg sync.WaitGroup
sem *semaphore.Weighted
wg sync.WaitGroup

metrics metrics
}

func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier) *reacher {

ctx, cancel := context.WithCancel(context.Background())

r := &reacher{
run: make(chan struct{}, 1),
pinger: streamer,
notifier: notifier,
wg: sync.WaitGroup{},
sem: semaphore.NewWeighted(maxWorkers),
ctx: ctx,
ctxClose: cancel,
run: make(chan struct{}, 1),
pinger: streamer,
notifier: notifier,
wg: sync.WaitGroup{},
ctx: ctx,
ctxCancel: cancel,
metrics: newMetrics(),
}

r.wg.Add(1)
go r.worker()
r.wg.Add(workers)
for i := 0; i < workers; i++ {
go r.worker()
}

return r
}
Expand All @@ -74,18 +75,12 @@ func (r *reacher) worker() {
case <-r.ctx.Done():
return
case <-r.run:
if err := r.sem.Acquire(r.ctx, 1); err == nil {
r.wg.Add(1)
go r.ping()
}
r.work()
}
}
}

func (r *reacher) ping() {

defer r.wg.Done()
defer r.sem.Release(1)
func (r *reacher) work() {

for {

Expand All @@ -111,16 +106,23 @@ func (r *reacher) ping() {

attempts++

now := time.Now()

ctxt, cancel := context.WithTimeout(r.ctx, timeout)
_, err := r.pinger.Ping(ctxt, p.addr)
cancel()

if err == nil {
r.metrics.Pings.WithLabelValues("success").Inc()
r.metrics.PingTime.WithLabelValues("success").Observe(time.Since(now).Seconds())
r.notifier.Reachable(p.overlay, true)
break
}

if attempts == pingMaxAttemps {
r.metrics.Pings.WithLabelValues("failure").Inc()
r.metrics.PingTime.WithLabelValues("failure").Observe(time.Since(now).Seconds())

if attempts == pingMaxAttempts {
r.notifier.Reachable(p.overlay, false)
break
}
Expand Down Expand Up @@ -157,7 +159,7 @@ func (r *reacher) Disconnected(overlay swarm.Address) {
}

func (r *reacher) Close() error {
r.ctxClose()
r.ctxCancel()
r.wg.Wait()
return nil
}

0 comments on commit 14b0cac

Please sign in to comment.