diff --git a/.github/workflows/beekeeper.yml b/.github/workflows/beekeeper.yml index ff534225377..7d77af5195f 100644 --- a/.github/workflows/beekeeper.yml +++ b/.github/workflows/beekeeper.yml @@ -14,7 +14,7 @@ jobs: REPLICA: 3 RUN_TYPE: "PR RUN" SETUP_CONTRACT_IMAGE_TAG: "0.2.0" - BEEKEEPER_BRANCH: "pss-fix" + BEEKEEPER_BRANCH: "master" runs-on: ubuntu-latest steps: - name: Setup Go diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 62756832158..efc8eb17916 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -698,6 +698,8 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. s.metrics.CreatedConnectionCount.Inc() + s.reacher.Connected(overlay, i.BzzAddress.Underlay) + peerUserAgent := appendSpace(s.peerUserAgent(ctx, info.ID)) s.logger.Debugf("successfully connected to peer %s%s%s (outbound)", i.BzzAddress.ShortString(), i.LightString(), peerUserAgent) diff --git a/pkg/reacher/metrics.go b/pkg/reacher/metrics.go new file mode 100644 index 00000000000..ab745943b2c --- /dev/null +++ b/pkg/reacher/metrics.go @@ -0,0 +1,38 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package reacher + +import ( + m "github.com/ethersphere/bee/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +type metrics struct { + Pings prometheus.CounterVec + PingTime prometheus.HistogramVec +} + +func newMetrics() metrics { + subsystem := "reacher" + + return metrics{ + Pings: *prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "pings", + Help: "Ping counter.", + }, []string{"status"}), + PingTime: *prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "ping_timer", + Help: "Ping timer.", + }, []string{"status"}), + } +} + +func (s *reacher) Metrics() []prometheus.Collector { + return m.PrometheusCollectorsFromFields(s.metrics) +} diff --git a/pkg/reacher/reacher.go b/pkg/reacher/reacher.go index b799a30afbe..f555a9d7c42 100644 --- a/pkg/reacher/reacher.go +++ b/pkg/reacher/reacher.go @@ -11,18 +11,16 @@ import ( "sync" "time" - "golang.org/x/sync/semaphore" - "github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/swarm" ma "github.com/multiformats/go-multiaddr" ) const ( - pingTimeoutMin = time.Second * 2 - pingTimeoutMax = time.Second * 5 - pingMaxAttemps = 3 - maxWorkers = 16 + pingTimeoutMin = time.Second * 2 + pingTimeoutMax = time.Second * 5 + pingMaxAttempts = 3 + workers = 8 ) type peer struct { @@ -34,15 +32,16 @@ type reacher struct { mu sync.Mutex queue []peer - ctx context.Context - ctxClose context.CancelFunc - run chan struct{} + ctx context.Context + ctxCancel context.CancelFunc + run chan struct{} pinger p2p.Pinger notifier p2p.ReachableNotifier - wg sync.WaitGroup - sem *semaphore.Weighted + wg sync.WaitGroup + + metrics metrics } func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier) *reacher { @@ -50,17 +49,19 @@ func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier) *reacher { ctx, cancel := context.WithCancel(context.Background()) r := &reacher{ - run: make(chan struct{}, 1), - pinger: streamer, - notifier: notifier, - wg: sync.WaitGroup{}, - sem: semaphore.NewWeighted(maxWorkers), - ctx: ctx, - ctxClose: cancel, + run: make(chan struct{}, 1), + pinger: streamer, + notifier: notifier, + wg: sync.WaitGroup{}, + ctx: ctx, + ctxCancel: cancel, + metrics: newMetrics(), } - r.wg.Add(1) - go r.worker() + r.wg.Add(workers) + for i := 0; i < workers; i++ { + go r.worker() + } return r } @@ -74,18 +75,12 @@ func (r *reacher) worker() { case <-r.ctx.Done(): return case <-r.run: - if err := r.sem.Acquire(r.ctx, 1); err == nil { - r.wg.Add(1) - go r.ping() - } + r.work() } } } -func (r *reacher) ping() { - - defer r.wg.Done() - defer r.sem.Release(1) +func (r *reacher) work() { for { @@ -111,16 +106,23 @@ func (r *reacher) ping() { attempts++ + now := time.Now() + ctxt, cancel := context.WithTimeout(r.ctx, timeout) _, err := r.pinger.Ping(ctxt, p.addr) cancel() if err == nil { + r.metrics.Pings.WithLabelValues("success").Inc() + r.metrics.PingTime.WithLabelValues("success").Observe(time.Since(now).Seconds()) r.notifier.Reachable(p.overlay, true) break } - if attempts == pingMaxAttemps { + r.metrics.Pings.WithLabelValues("failure").Inc() + r.metrics.PingTime.WithLabelValues("failure").Observe(time.Since(now).Seconds()) + + if attempts == pingMaxAttempts { r.notifier.Reachable(p.overlay, false) break } @@ -157,7 +159,7 @@ func (r *reacher) Disconnected(overlay swarm.Address) { } func (r *reacher) Close() error { - r.ctxClose() + r.ctxCancel() r.wg.Wait() return nil }