Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: only consider backbone nodes for core protocols #2565

Merged
merged 57 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
3072998
feat: reacher service
istae Oct 4, 2021
09f67c8
fix: unit test
istae Oct 5, 2021
b02b734
fix: metric names
istae Oct 5, 2021
a3d7652
feat: update reachability state of node
aloknerurkar Oct 5, 2021
900ea85
fix: elads comments
notanatol Oct 5, 2021
4057d07
fix: wire-in
notanatol Oct 5, 2021
67eb113
fix: clean-up
notanatol Oct 5, 2021
a9e634f
fix: move to internal, revert puller
notanatol Oct 5, 2021
4a3c701
fix: ping worker return too early fix
istae Oct 6, 2021
281698c
fix: reacher unit test
istae Oct 6, 2021
07e116e
fix: more reverts
notanatol Oct 6, 2021
213cf2a
fix: move blocker fix to new PR
notanatol Oct 6, 2021
770bf31
feat: protocols use only reachable peers
istae Oct 11, 2021
ac9bcea
feat(kademlia): use only reachable peers for depth calculation
aloknerurkar Oct 11, 2021
87522e7
fix: reacher unit test
istae Oct 13, 2021
1e35178
fix: integration tests
acud Oct 13, 2021
3edb00e
refactor: move JSON reachability status field above bins
Oct 13, 2021
073a1ad
fix: panic before notifier set
acud Oct 13, 2021
8cc4074
revert: fix integration tests (7fa23be)
acud Oct 13, 2021
997b624
fix: announce uses reachable peers only
istae Oct 13, 2021
22a0b79
feat: add flag that overrides libp2p announced reachability status
Oct 13, 2021
0955085
feat: add kademlia metrics for reachability status
Oct 14, 2021
c7ac1e3
fix: reacher unique queue
istae Oct 14, 2021
6ca1ac7
fix: reachability metrics counting
Oct 15, 2021
063b0ae
feat: set the kademlia node reachability initial status to unknown
mrekucci Oct 18, 2021
a61d204
fix: rename funcs
istae Oct 20, 2021
7b7c3b2
fix: disable linter
istae Oct 20, 2021
713bf37
feat: reacher exponential backoff
istae Oct 26, 2021
1487543
chore: bump version
istae Oct 26, 2021
67046ef
fix: retrieval unit test
istae Oct 26, 2021
3d99759
fix: reacher options and unit test
istae Oct 28, 2021
d636217
fix: see if settlements flake
istae Oct 28, 2021
fd194c5
chore: libp2p log
istae Oct 28, 2021
0b71ae6
fix: artifact logs
istae Oct 28, 2021
2f55fe6
fix: artifact logs
istae Oct 28, 2021
76d5bfc
fix: artifact logs
istae Oct 28, 2021
b40a269
fix: artifact logs
istae Oct 28, 2021
ba22fbd
fix: artifact logs
istae Oct 28, 2021
b4e148e
fix: artifact logs
istae Oct 28, 2021
228f9e1
fix: reduce beekepeer verbosity
istae Nov 1, 2021
035a32c
fix: extra test
istae Nov 1, 2021
204c084
fix: changed infof to errorf
istae Nov 1, 2021
0c7697c
fix: skip test temporarily
istae Nov 1, 2021
1e639a9
fix: unskip test
istae Nov 1, 2021
1c0b3cf
fix: flakey unit test
istae Nov 1, 2021
9abf8e2
fix: skip failing test
istae Nov 1, 2021
e0b8ca0
fix: retest settlements
istae Nov 1, 2021
df01d25
fix: reduced test by one
istae Nov 1, 2021
75ef507
fix: added public override to reachability flag
istae Nov 1, 2021
0b0263a
fix: renable tests
istae Nov 1, 2021
e9da8df
fix: changes beekeeper branch
istae Nov 1, 2021
c022a5e
chore: reachability log in kademlia
istae Nov 2, 2021
89706b7
fix: comment
istae Nov 2, 2021
2a7623d
perf: parallel pings
istae Nov 2, 2021
f75f1ce
revert: parallel pings
istae Nov 2, 2021
9cb1eda
fix: moved now out of lock
istae Nov 2, 2021
3e0ade3
fix: makefile missing char
istae Nov 4, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/beekeeper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ jobs:
SETUP_CONTRACT_IMAGE_TAG: "0.2.0"
BEEKEEPER_BRANCH: "master"
BEEKEEPER_METRICS_ENABLED: false
REACHABILITY_OVERRIDE_PUBLIC: true
runs-on: ubuntu-latest
steps:
- name: Setup Go
Expand Down Expand Up @@ -130,6 +131,7 @@ jobs:
SETUP_CONTRACT_IMAGE_TAG: "0.2.0"
BEEKEEPER_BRANCH: "master"
BEEKEEPER_METRICS_ENABLED: false
REACHABILITY_OVERRIDE_PUBLIC: true
runs-on: ubuntu-latest
steps:
- name: Setup Go
Expand Down Expand Up @@ -214,6 +216,7 @@ jobs:
SETUP_CONTRACT_IMAGE_TAG: "0.2.0"
BEEKEEPER_BRANCH: "master"
BEEKEEPER_METRICS_ENABLED: false
REACHABILITY_OVERRIDE_PUBLIC: true
runs-on: ubuntu-latest
steps:
- name: Setup Go
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ BEEKEEPER_USE_SUDO ?= false
BEEKEEPER_CLUSTER ?= local
BEELOCAL_BRANCH ?= main
BEEKEEPER_BRANCH ?= master
REACHABILITY_OVERRIDE_PUBLIC ?= false

GO_MIN_VERSION ?= "1.17"
GO_BUILD_VERSION ?= "1.17.2"
Expand All @@ -25,7 +26,8 @@ LDFLAGS ?= -s -w \
-X github.com/ethersphere/bee.commitHash="$(COMMIT_HASH)" \
-X github.com/ethersphere/bee.commitTime="$(COMMIT_TIME)" \
-X github.com/ethersphere/bee/pkg/api.Version="$(BEE_API_VERSION)" \
-X github.com/ethersphere/bee/pkg/debugapi.Version="$(BEE_DEBUG_API_VERSION)"
-X github.com/ethersphere/bee/pkg/debugapi.Version="$(BEE_DEBUG_API_VERSION)" \
-X github.com/ethersphere/bee/pkg/p2p/libp2p.reachabilityOverridePublic="$(REACHABILITY_OVERRIDE_PUBLIC)"

.PHONY: all
all: build lint vet test-race binary
Expand Down
2 changes: 1 addition & 1 deletion pkg/chainsyncer/chainsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (c *ChainSyncer) manage() {
atomic.AddInt32(&positives, 1)
}(p)
return false, false, nil
})
}, topology.Filter{Reachable: true})

// wait for all operations to finish
wg.Wait()
Expand Down
123 changes: 111 additions & 12 deletions pkg/p2p/libp2p/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,15 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/swarm/test"
"github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/libp2p/go-eventbus"
libp2pm "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"
)

Expand Down Expand Up @@ -824,6 +831,7 @@ func TestWithBlocklistStreams(t *testing.T) {
}

func TestUserAgentLogging(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -859,6 +867,55 @@ func TestUserAgentLogging(t *testing.T) {
testUserAgentLogLine(t, s2Logs, "(outbound)")
}

func TestReachabilityUpdate(t *testing.T) {
s1, _ := newService(t, 1, libp2pServiceOpts{
libp2pOpts: libp2p.WithHostFactory(
func(ctx context.Context, _ ...libp2pm.Option) (host.Host, error) {
return bhost.NewHost(context.TODO(), swarmt.GenSwarm(t, context.TODO()), &bhost.HostOpts{})
},
),
})
defer s1.Close()

emitReachabilityChanged, _ := s1.Host().EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)

firstUpdate := make(chan struct{})
s1.SetPickyNotifier(mockReachabilityNotifier(func(status p2p.ReachabilityStatus) {
if status == p2p.ReachabilityStatusPublic {
close(firstUpdate)
}
}))

err := emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPublic})
if err != nil {
t.Fatal(err)
}

select {
case <-firstUpdate:
case <-time.After(time.Second):
t.Fatalf("test timed out")
}

secondUpdate := make(chan struct{})
s1.SetPickyNotifier(mockReachabilityNotifier(func(status p2p.ReachabilityStatus) {
if status == p2p.ReachabilityStatusPrivate {
close(secondUpdate)
}
}))

err = emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate})
if err != nil {
t.Fatal(err)
}

select {
case <-secondUpdate:
case <-time.After(time.Second):
t.Fatalf("test timed out")
}
}

func testUserAgentLogLine(t *testing.T, logs *buffer, substring string) {
t.Helper()

Expand Down Expand Up @@ -990,11 +1047,13 @@ func checkAddressbook(t *testing.T, ab addressbook.Getter, overlay swarm.Address
}

type notifiee struct {
connected cFunc
disconnected dFunc
pick bool
announce announceFunc
announceTo announceToFunc
connected cFunc
disconnected dFunc
pick bool
announce announceFunc
announceTo announceToFunc
updateReachability reachabilityFunc
reachable reachableFunc
}

func (n *notifiee) Connected(c context.Context, p p2p.Peer, f bool) error {
Expand All @@ -1005,7 +1064,7 @@ func (n *notifiee) Disconnected(p p2p.Peer) {
n.disconnected(p)
}

func (n *notifiee) Pick(p p2p.Peer) bool {
func (n *notifiee) Pick(p2p.Peer) bool {
return n.pick
}

Expand All @@ -1017,22 +1076,62 @@ func (n *notifiee) AnnounceTo(ctx context.Context, a, b swarm.Address, full bool
return n.announceTo(ctx, a, b, full)
}

func (n *notifiee) UpdateReachability(status p2p.ReachabilityStatus) {
n.updateReachability(status)
}

func (n *notifiee) Reachable(addr swarm.Address, status p2p.ReachabilityStatus) {
n.reachable(addr, status)
}

func mockNotifier(c cFunc, d dFunc, pick bool) p2p.PickyNotifier {
return &notifiee{connected: c, disconnected: d, pick: pick, announce: noopAnnounce, announceTo: noopAnnounceTo}
return &notifiee{
connected: c,
disconnected: d,
pick: pick,
announce: noopAnnounce,
announceTo: noopAnnounceTo,
updateReachability: noopReachability,
reachable: noopReachable,
}
}

func mockAnnouncingNotifier(a announceFunc, at announceToFunc) p2p.PickyNotifier {
return &notifiee{connected: noopCf, disconnected: noopDf, pick: true, announce: a, announceTo: at}
return &notifiee{
connected: noopCf,
disconnected: noopDf,
pick: true,
announce: a,
announceTo: at,
updateReachability: noopReachability,
reachable: noopReachable,
}
}

func mockReachabilityNotifier(r reachabilityFunc) p2p.PickyNotifier {
return &notifiee{
connected: noopCf,
disconnected: noopDf,
pick: true,
announce: noopAnnounce,
announceTo: noopAnnounceTo,
updateReachability: r,
reachable: noopReachable,
}
}

type (
cFunc func(context.Context, p2p.Peer, bool) error
dFunc func(p2p.Peer)
announceFunc func(context.Context, swarm.Address, bool) error
announceToFunc func(context.Context, swarm.Address, swarm.Address, bool) error
cFunc func(context.Context, p2p.Peer, bool) error
dFunc func(p2p.Peer)
announceFunc func(context.Context, swarm.Address, bool) error
announceToFunc func(context.Context, swarm.Address, swarm.Address, bool) error
reachabilityFunc func(p2p.ReachabilityStatus)
reachableFunc func(swarm.Address, p2p.ReachabilityStatus)
)

var noopCf = func(context.Context, p2p.Peer, bool) error { return nil }
var noopDf = func(p2p.Peer) {}
var noopAnnounce = func(context.Context, swarm.Address, bool) error { return nil }
var noopAnnounceTo = func(context.Context, swarm.Address, swarm.Address, bool) error { return nil }
var noopReachability = func(p2p.ReachabilityStatus) {}
var noopReachable = func(swarm.Address, p2p.ReachabilityStatus) {}
4 changes: 4 additions & 0 deletions pkg/p2p/libp2p/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func (s *Service) NewStreamForPeerID(peerID libp2ppeer.ID, protocolName, protoco
return s.newStreamForPeerID(context.Background(), peerID, protocolName, protocolVersion, streamName)
}

func (s *Service) Host() host.Host {
return s.host
}

type StaticAddressResolver = staticAddressResolver

var NewStaticAddressResolver = newStaticAddressResolver
Expand Down
38 changes: 38 additions & 0 deletions pkg/p2p/libp2p/internal/reacher/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package reacher

import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)

type metrics struct {
Pings prometheus.CounterVec
PingTime prometheus.HistogramVec
}

func newMetrics() metrics {
subsystem := "reacher"

return metrics{
Pings: *prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "pings",
Help: "Ping counter.",
}, []string{"status"}),
PingTime: *prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "ping_timer",
Help: "Ping timer.",
}, []string{"status"}),
}
}

func (s *reacher) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics)
}
Loading