Skip to content

Commit

Permalink
feat(nodebuilder/p2p): Implement Module (#1285)
Browse files Browse the repository at this point in the history
  • Loading branch information
renaynay authored Nov 23, 2022
1 parent b4a8145 commit 3d8817a
Show file tree
Hide file tree
Showing 10 changed files with 455 additions and 9 deletions.
4 changes: 4 additions & 0 deletions api/rpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/das"
"github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
)
Expand All @@ -18,6 +19,7 @@ type API interface {
state.Module
share.Module
das.Module
p2p.Module
}

type Client struct {
Expand All @@ -26,6 +28,7 @@ type Client struct {
State state.API
Share share.API
DAS das.API
P2P p2p.API

closer multiClientCloser
}
Expand Down Expand Up @@ -64,6 +67,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) {
"header": &client.Header,
"fraud": &client.Fraud,
"das": &client.DAS,
"p2p": &client.P2P,
}
for name, module := range modules {
closer, err := jsonrpc.NewClient(ctx, addr, name, module, nil)
Expand Down
6 changes: 3 additions & 3 deletions docs/adr/adr-009-public-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ SyncHead(ctx context.Context) (*header.ExtendedHeader, error)

```go
type P2PModule interface {
// Info returns basic information about the node's p2p host/operations.
Info() p2p.Info
// Info returns address information about the host.
Info() peer.AddrInfo
// Peers returns all peer IDs used across all inner stores.
Peers() peer.IDSlice
Peers() []peer.ID
// PeerInfo returns a small slice of information Peerstore has on the
// given peer.
PeerInfo(id peer.ID) peer.AddrInfo
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
github.com/libp2p/go-libp2p-peerstore v0.7.1
github.com/libp2p/go-libp2p-pubsub v0.7.0
github.com/libp2p/go-libp2p-record v0.1.3
github.com/libp2p/go-libp2p-resource-manager v0.5.1
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
github.com/minio/sha256-simd v1.0.0
github.com/mitchellh/go-homedir v1.1.0
Expand Down Expand Up @@ -210,7 +211,6 @@ require (
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.4.7 // indirect
github.com/libp2p/go-libp2p-loggables v0.1.0 // indirect
github.com/libp2p/go-libp2p-resource-manager v0.5.1 // indirect
github.com/libp2p/go-msgio v0.2.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/p2p/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func Flags() *flag.FlagSet {
mutualFlag,
nil,
`Comma-separated multiaddresses of mutual peers to keep a prioritized connection with.
Such connection is immune to peer scoring slashing and connection manager trimming.
Such connection is immune to peer scoring slashing and connection module trimming.
Peers must bidirectionally point to each other. (Format: multiformats.io/multiaddr)
`,
)
Expand Down
15 changes: 11 additions & 4 deletions nodebuilder/p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ import (
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
p2pconfig "github.com/libp2p/go-libp2p/config"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

// RoutedHost constructs a wrapped Host that may fallback to address discovery,
Expand All @@ -24,7 +28,7 @@ func RoutedHost(base HostBase, r routing.PeerRouting) host.Host {
}

// Host returns constructor for Host.
func Host(cfg Config, params hostParams) (HostBase, error) {
func Host(cfg Config, params hostParams, bw *metrics.BandwidthCounter, rm network.ResourceManager) (HostBase, error) {
opts := []libp2p.Option{
libp2p.NoListenAddrs, // do not listen automatically
libp2p.AddrsFactory(params.AddrF),
Expand All @@ -35,15 +39,16 @@ func Host(cfg Config, params hostParams) (HostBase, error) {
libp2p.UserAgent(fmt.Sprintf("celestia-%s", params.Net)),
libp2p.NATPortMap(), // enables upnp
libp2p.DisableRelay(),
libp2p.BandwidthReporter(bw),
libp2p.ResourceManager(rm),
// to clearly define what defaults we rely upon
libp2p.DefaultSecurity,
libp2p.DefaultTransports,
libp2p.DefaultMuxers,
}

// TODO(@Wondertan): Other, non Celestia bootstrapper may also enable NATService to contribute the
// network.
if cfg.Bootstrapper {
// All node types except light (bridge, full) will enable NATService
if params.Tp != node.Light {
opts = append(opts, libp2p.EnableNATService())
}

Expand Down Expand Up @@ -72,4 +77,6 @@ type hostParams struct {
PStore peerstore.Peerstore
ConnMngr connmgr.ConnManager
ConnGater *conngater.BasicConnectionGater

Tp node.Type
}
8 changes: 8 additions & 0 deletions nodebuilder/p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package p2p

import (
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/network"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/nodebuilder/node"
Expand Down Expand Up @@ -30,6 +33,11 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Provide(PeerRouting),
fx.Provide(ContentRouting),
fx.Provide(AddrsFactory(cfg.AnnounceAddresses, cfg.NoAnnounceAddresses)),
fx.Provide(metrics.NewBandwidthCounter),
fx.Provide(func() (network.ResourceManager, error) {
return rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale()))
}),
fx.Provide(newModule),
fx.Invoke(Listen(cfg.ListenAddresses)),
)

Expand Down
209 changes: 209 additions & 0 deletions nodebuilder/p2p/p2p.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package p2p

import (
"context"
"fmt"
"reflect"

libhost "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
)

// Module represents all accessible methods related to the node's p2p
// host / operations.
//
//nolint:dupl
type Module interface {
// Info returns address information about the host.
Info() peer.AddrInfo
// Peers returns all peer IDs used across all inner stores.
Peers() []peer.ID
// PeerInfo returns a small slice of information Peerstore has on the
// given peer.
PeerInfo(id peer.ID) peer.AddrInfo

// Connect ensures there is a connection between this host and the peer with
// given peer.
Connect(ctx context.Context, pi peer.AddrInfo) error
// ClosePeer closes the connection to a given peer.
ClosePeer(id peer.ID) error
// Connectedness returns a state signaling connection capabilities.
Connectedness(id peer.ID) network.Connectedness
// NATStatus returns the current NAT status.
NATStatus() (network.Reachability, error)

// BlockPeer adds a peer to the set of blocked peers.
BlockPeer(p peer.ID) error
// UnblockPeer removes a peer from the set of blocked peers.
UnblockPeer(p peer.ID) error
// ListBlockedPeers returns a list of blocked peers.
ListBlockedPeers() []peer.ID
// Protect adds a peer to the list of peers who have a bidirectional
// peering agreement that they are protected from being trimmed, dropped
// or negatively scored.
Protect(id peer.ID, tag string)
// Unprotect removes a peer from the list of peers who have a bidirectional
// peering agreement that they are protected from being trimmed, dropped
// or negatively scored, returning a bool representing whether the given
// peer is protected or not.
Unprotect(id peer.ID, tag string) bool
// IsProtected returns whether the given peer is protected.
IsProtected(id peer.ID, tag string) bool

// BandwidthStats returns a Stats struct with bandwidth metrics for all
// data sent/received by the local peer, regardless of protocol or remote
// peer IDs.
BandwidthStats() metrics.Stats
// BandwidthForPeer returns a Stats struct with bandwidth metrics associated with the given peer.ID.
// The metrics returned include all traffic sent / received for the peer, regardless of protocol.
BandwidthForPeer(id peer.ID) metrics.Stats
// BandwidthForProtocol returns a Stats struct with bandwidth metrics associated with the given protocol.ID.
BandwidthForProtocol(proto protocol.ID) metrics.Stats

// ResourceState returns the state of the resource manager.
ResourceState() (rcmgr.ResourceManagerStat, error)

// PubSubPeers returns the peer IDs of the peers joined on
// the given topic.
PubSubPeers(topic string) []peer.ID
}

// module contains all components necessary to access information and
// perform actions related to the node's p2p Host / operations.
type module struct {
host HostBase
ps *pubsub.PubSub
connGater *conngater.BasicConnectionGater
bw *metrics.BandwidthCounter
rm network.ResourceManager
}

func newModule(
host HostBase,
ps *pubsub.PubSub,
cg *conngater.BasicConnectionGater,
bw *metrics.BandwidthCounter,
rm network.ResourceManager,
) Module {
return &module{
host: host,
ps: ps,
connGater: cg,
bw: bw,
rm: rm,
}
}

func (m *module) Info() peer.AddrInfo {
return *libhost.InfoFromHost(m.host)
}

func (m *module) Peers() []peer.ID {
return m.host.Peerstore().Peers()
}

func (m *module) PeerInfo(id peer.ID) peer.AddrInfo {
return m.host.Peerstore().PeerInfo(id)
}

func (m *module) Connect(ctx context.Context, pi peer.AddrInfo) error {
return m.host.Connect(ctx, pi)
}

func (m *module) ClosePeer(id peer.ID) error {
return m.host.Network().ClosePeer(id)
}

func (m *module) Connectedness(id peer.ID) network.Connectedness {
return m.host.Network().Connectedness(id)
}

func (m *module) NATStatus() (network.Reachability, error) {
basic, ok := m.host.(*basichost.BasicHost)
if !ok {
return 0, fmt.Errorf("unexpected implementation of host.Host, expected %s, got %T",
reflect.TypeOf(&basichost.BasicHost{}).String(), m.host)
}
return basic.GetAutoNat().Status(), nil
}

func (m *module) BlockPeer(p peer.ID) error {
return m.connGater.BlockPeer(p)
}

func (m *module) UnblockPeer(p peer.ID) error {
return m.connGater.UnblockPeer(p)
}

func (m *module) ListBlockedPeers() []peer.ID {
return m.connGater.ListBlockedPeers()
}

func (m *module) Protect(id peer.ID, tag string) {
m.host.ConnManager().Protect(id, tag)
}

func (m *module) Unprotect(id peer.ID, tag string) bool {
return m.host.ConnManager().Unprotect(id, tag)
}

func (m *module) IsProtected(id peer.ID, tag string) bool {
return m.host.ConnManager().IsProtected(id, tag)
}

func (m *module) BandwidthStats() metrics.Stats {
return m.bw.GetBandwidthTotals()
}

func (m *module) BandwidthForPeer(id peer.ID) metrics.Stats {
return m.bw.GetBandwidthForPeer(id)
}

func (m *module) BandwidthForProtocol(proto protocol.ID) metrics.Stats {
return m.bw.GetBandwidthForProtocol(proto)
}

func (m *module) ResourceState() (rcmgr.ResourceManagerStat, error) {
rms, ok := m.rm.(rcmgr.ResourceManagerState)
if !ok {
return rcmgr.ResourceManagerStat{}, fmt.Errorf("network.ResourceManager does not implement " +
"rcmgr.ResourceManagerState")
}
return rms.Stat(), nil
}

func (m *module) PubSubPeers(topic string) []peer.ID {
return m.ps.ListPeers(topic)
}

// API is a wrapper around Module for the RPC.
// TODO(@distractedm1nd): These structs need to be autogenerated.
//
//nolint:dupl
type API struct {
Info func() peer.AddrInfo
Peers func() []peer.ID
PeerInfo func(id peer.ID) peer.AddrInfo
Connect func(ctx context.Context, pi peer.AddrInfo) error
ClosePeer func(id peer.ID) error
Connectedness func(id peer.ID) network.Connectedness
NATStatus func() (network.Reachability, error)
BlockPeer func(p peer.ID) error
UnblockPeer func(p peer.ID) error
ListBlockedPeers func() []peer.ID
Protect func(id peer.ID, tag string)
Unprotect func(id peer.ID, tag string) bool
IsProtected func(id peer.ID, tag string) bool
BandwidthStats func() metrics.Stats
BandwidthForPeer func(id peer.ID) metrics.Stats
BandwidthForProtocol func(proto protocol.ID) metrics.Stats
ResourceState func() (rcmgr.ResourceManagerStat, error)
PubSubPeers func(topic string) []peer.ID
}
Loading

0 comments on commit 3d8817a

Please sign in to comment.