Skip to content

Commit

Permalink
swarm: emit PeerConnectedness event from swarm instead of from hosts (#…
Browse files Browse the repository at this point in the history
…1574)

* pass an event bus to the swarm constructor

* make the eventbus parameter a required swarm constructor parameter

* emit Connectedness notifications from the swarm

* remove peer connectedness watchers from hosts

* swarm: emit connectedness events when holding the mutex
  • Loading branch information
marten-seemann authored Feb 25, 2023
1 parent c1cfe6c commit 581a015
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 270 deletions.
12 changes: 8 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
Expand Down Expand Up @@ -123,7 +125,7 @@ type Config struct {
PrometheusRegisterer prometheus.Registerer
}

func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) {
func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
if cfg.Peerstore == nil {
return nil, fmt.Errorf("no peerstore specified")
}
Expand Down Expand Up @@ -176,7 +178,7 @@ func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) {
swarm.WithMetricsTracer(swarm.NewMetricsTracer(swarm.WithRegisterer(cfg.PrometheusRegisterer))))
}
// TODO: Make the swarm implementation configurable.
return swarm.NewSwarm(pid, cfg.Peerstore, opts...)
return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...)
}

func (cfg *Config) addTransports(h host.Host) error {
Expand Down Expand Up @@ -284,12 +286,14 @@ func (cfg *Config) addTransports(h host.Host) error {
//
// This function consumes the config. Do not reuse it (really!).
func (cfg *Config) NewNode() (host.Host, error) {
swrm, err := cfg.makeSwarm(!cfg.DisableMetrics)
eventBus := eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer))))
swrm, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
if err != nil {
return nil, err
}

h, err := bhost.NewHost(swrm, &bhost.HostOpts{
EventBus: eventBus,
ConnManager: cfg.ConnManager,
AddrsFactory: cfg.AddrsFactory,
NATManager: cfg.NATManager,
Expand Down Expand Up @@ -397,7 +401,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
Peerstore: ps,
}

dialer, err := autoNatCfg.makeSwarm(false)
dialer, err := autoNatCfg.makeSwarm(eventbus.NewBus(), false)
if err != nil {
h.Close()
return nil, err
Expand Down
22 changes: 7 additions & 15 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/libp2p/go-netroute"

logging "github.com/ipfs/go-log/v2"

ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr/net"
Expand Down Expand Up @@ -108,6 +107,9 @@ var _ host.Host = (*BasicHost)(nil)
// HostOpts holds options that can be passed to NewHost in order to
// customize construction of the *BasicHost.
type HostOpts struct {
// EventBus sets the event bus. Will construct a new event bus if omitted.
EventBus event.Bus

// MultistreamMuxer is essential for the *BasicHost and will use a sensible default value if omitted.
MultistreamMuxer *msmux.MultistreamMuxer[protocol.ID]

Expand Down Expand Up @@ -164,16 +166,11 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
if opts == nil {
opts = &HostOpts{}
}

var eventBus event.Bus
if opts.EnableMetrics {
eventBus = eventbus.NewBus(
eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(opts.PrometheusRegisterer))))
} else {
eventBus = eventbus.NewBus()
if opts.EventBus == nil {
opts.EventBus = eventbus.NewBus()
}

psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), eventBus)
psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), opts.EventBus)
if err != nil {
return nil, err
}
Expand All @@ -186,7 +183,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
negtimeout: DefaultNegotiationTimeout,
AddrsFactory: DefaultAddrsFactory,
maResolver: madns.DefaultResolver,
eventbus: eventBus,
eventbus: opts.EventBus,
addrChangeChan: make(chan struct{}, 1),
ctx: hostCtx,
ctxCancel: cancel,
Expand All @@ -201,11 +198,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
return nil, err
}
evtPeerConnectednessChanged, err := h.eventbus.Emitter(&event.EvtPeerConnectednessChanged{})
if err != nil {
return nil, err
}
h.Network().Notify(newPeerConnectWatcher(evtPeerConnectednessChanged))

if !h.disableSignedPeerRecord {
cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore())
Expand Down
71 changes: 0 additions & 71 deletions p2p/host/basic/peer_connectedness.go

This file was deleted.

48 changes: 0 additions & 48 deletions p2p/host/basic/peer_connectedness_test.go

This file was deleted.

5 changes: 0 additions & 5 deletions p2p/host/blank/blank.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil
}
evtPeerConnectednessChanged, err := bh.eventbus.Emitter(&event.EvtPeerConnectednessChanged{})
if err != nil {
return nil
}
n.Notify(newPeerConnectWatcher(evtPeerConnectednessChanged))

n.SetStreamHandler(bh.newStreamHandler)

Expand Down
71 changes: 0 additions & 71 deletions p2p/host/blank/peer_connectedness.go

This file was deleted.

46 changes: 0 additions & 46 deletions p2p/host/blank/peer_connectedness_test.go

This file was deleted.

3 changes: 2 additions & 1 deletion p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p/core/sec"
"github.com/libp2p/go-libp2p/core/sec/insecure"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
Expand Down Expand Up @@ -43,7 +44,7 @@ func makeSwarm(t *testing.T) *Swarm {
ps.AddPrivKey(id, priv)
t.Cleanup(func() { ps.Close() })

s, err := NewSwarm(id, ps, WithDialTimeout(time.Second))
s, err := NewSwarm(id, ps, eventbus.NewBus(), WithDialTimeout(time.Second))
require.NoError(t, err)

upgrader := makeUpgrader(t, s)
Expand Down
Loading

0 comments on commit 581a015

Please sign in to comment.