diff --git a/api/rpc/client/client.go b/api/rpc/client/client.go index 8e328f600c..c22d1beca1 100644 --- a/api/rpc/client/client.go +++ b/api/rpc/client/client.go @@ -8,6 +8,7 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/das" "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/header" + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" "github.com/celestiaorg/celestia-node/nodebuilder/share" "github.com/celestiaorg/celestia-node/nodebuilder/state" ) @@ -18,6 +19,7 @@ type API interface { state.Module share.Module das.Module + p2p.Module } type Client struct { @@ -26,6 +28,7 @@ type Client struct { State state.API Share share.API DAS das.API + P2P p2p.API closer multiClientCloser } @@ -64,6 +67,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { "header": &client.Header, "fraud": &client.Fraud, "das": &client.DAS, + "p2p": &client.P2P, } for name, module := range modules { closer, err := jsonrpc.NewClient(ctx, addr, name, module, nil) diff --git a/docs/adr/adr-009-public-api.md b/docs/adr/adr-009-public-api.md index 575ad3b9fd..56ae47a14e 100644 --- a/docs/adr/adr-009-public-api.md +++ b/docs/adr/adr-009-public-api.md @@ -140,10 +140,10 @@ SyncHead(ctx context.Context) (*header.ExtendedHeader, error) ```go type P2PModule interface { - // Info returns basic information about the node's p2p host/operations. - Info() p2p.Info + // Info returns address information about the host. + Info() peer.AddrInfo // Peers returns all peer IDs used across all inner stores. - Peers() peer.IDSlice + Peers() []peer.ID // PeerInfo returns a small slice of information Peerstore has on the // given peer. PeerInfo(id peer.ID) peer.AddrInfo diff --git a/go.mod b/go.mod index 0d1cb3c960..b3a8abd6d9 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/libp2p/go-libp2p-peerstore v0.7.1 github.com/libp2p/go-libp2p-pubsub v0.7.0 github.com/libp2p/go-libp2p-record v0.1.3 + github.com/libp2p/go-libp2p-resource-manager v0.5.1 github.com/libp2p/go-libp2p-routing-helpers v0.2.3 github.com/minio/sha256-simd v1.0.0 github.com/mitchellh/go-homedir v1.1.0 @@ -208,7 +209,6 @@ require ( github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect github.com/libp2p/go-libp2p-kbucket v0.4.7 // indirect github.com/libp2p/go-libp2p-loggables v0.1.0 // indirect - github.com/libp2p/go-libp2p-resource-manager v0.5.1 // indirect github.com/libp2p/go-msgio v0.2.0 // indirect github.com/libp2p/go-nat v0.1.0 // indirect github.com/libp2p/go-netroute v0.2.0 // indirect diff --git a/nodebuilder/p2p/flags.go b/nodebuilder/p2p/flags.go index 82d03da697..f7a3d9c463 100644 --- a/nodebuilder/p2p/flags.go +++ b/nodebuilder/p2p/flags.go @@ -26,7 +26,7 @@ func Flags() *flag.FlagSet { mutualFlag, nil, `Comma-separated multiaddresses of mutual peers to keep a prioritized connection with. -Such connection is immune to peer scoring slashing and connection manager trimming. +Such connection is immune to peer scoring slashing and connection module trimming. Peers must bidirectionally point to each other. (Format: multiformats.io/multiaddr) `, ) diff --git a/nodebuilder/p2p/host.go b/nodebuilder/p2p/host.go index 23dfce9845..3c4f4bda15 100644 --- a/nodebuilder/p2p/host.go +++ b/nodebuilder/p2p/host.go @@ -8,6 +8,8 @@ import ( "github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/metrics" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" @@ -15,6 +17,8 @@ import ( routedhost "github.com/libp2p/go-libp2p/p2p/host/routed" "github.com/libp2p/go-libp2p/p2p/net/conngater" "go.uber.org/fx" + + "github.com/celestiaorg/celestia-node/nodebuilder/node" ) // RoutedHost constructs a wrapped Host that may fallback to address discovery, @@ -24,7 +28,7 @@ func RoutedHost(base HostBase, r routing.PeerRouting) host.Host { } // Host returns constructor for Host. -func Host(cfg Config, params hostParams) (HostBase, error) { +func Host(cfg Config, params hostParams, bw *metrics.BandwidthCounter, rm network.ResourceManager) (HostBase, error) { opts := []libp2p.Option{ libp2p.NoListenAddrs, // do not listen automatically libp2p.AddrsFactory(params.AddrF), @@ -35,15 +39,16 @@ func Host(cfg Config, params hostParams) (HostBase, error) { libp2p.UserAgent(fmt.Sprintf("celestia-%s", params.Net)), libp2p.NATPortMap(), // enables upnp libp2p.DisableRelay(), + libp2p.BandwidthReporter(bw), + libp2p.ResourceManager(rm), // to clearly define what defaults we rely upon libp2p.DefaultSecurity, libp2p.DefaultTransports, libp2p.DefaultMuxers, } - // TODO(@Wondertan): Other, non Celestia bootstrapper may also enable NATService to contribute the - // network. - if cfg.Bootstrapper { + // All node types except light (bridge, full) will enable NATService + if params.Tp != node.Light { opts = append(opts, libp2p.EnableNATService()) } @@ -72,4 +77,6 @@ type hostParams struct { PStore peerstore.Peerstore ConnMngr connmgr.ConnManager ConnGater *conngater.BasicConnectionGater + + Tp node.Type } diff --git a/nodebuilder/p2p/module.go b/nodebuilder/p2p/module.go index f02c9a0bf1..524e4d44b7 100644 --- a/nodebuilder/p2p/module.go +++ b/nodebuilder/p2p/module.go @@ -2,6 +2,9 @@ package p2p import ( logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-core/metrics" + "github.com/libp2p/go-libp2p-core/network" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" "go.uber.org/fx" "github.com/celestiaorg/celestia-node/nodebuilder/node" @@ -30,6 +33,11 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Provide(PeerRouting), fx.Provide(ContentRouting), fx.Provide(AddrsFactory(cfg.AnnounceAddresses, cfg.NoAnnounceAddresses)), + fx.Provide(metrics.NewBandwidthCounter), + fx.Provide(func() (network.ResourceManager, error) { + return rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale())) + }), + fx.Provide(newModule), fx.Invoke(Listen(cfg.ListenAddresses)), ) diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go new file mode 100644 index 0000000000..e27e30034a --- /dev/null +++ b/nodebuilder/p2p/p2p.go @@ -0,0 +1,209 @@ +package p2p + +import ( + "context" + "fmt" + "reflect" + + libhost "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/metrics" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + pubsub "github.com/libp2p/go-libp2p-pubsub" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" + basichost "github.com/libp2p/go-libp2p/p2p/host/basic" + "github.com/libp2p/go-libp2p/p2p/net/conngater" +) + +// Module represents all accessible methods related to the node's p2p +// host / operations. +// +//nolint:dupl +type Module interface { + // Info returns address information about the host. + Info() peer.AddrInfo + // Peers returns all peer IDs used across all inner stores. + Peers() []peer.ID + // PeerInfo returns a small slice of information Peerstore has on the + // given peer. + PeerInfo(id peer.ID) peer.AddrInfo + + // Connect ensures there is a connection between this host and the peer with + // given peer. + Connect(ctx context.Context, pi peer.AddrInfo) error + // ClosePeer closes the connection to a given peer. + ClosePeer(id peer.ID) error + // Connectedness returns a state signaling connection capabilities. + Connectedness(id peer.ID) network.Connectedness + // NATStatus returns the current NAT status. + NATStatus() (network.Reachability, error) + + // BlockPeer adds a peer to the set of blocked peers. + BlockPeer(p peer.ID) error + // UnblockPeer removes a peer from the set of blocked peers. + UnblockPeer(p peer.ID) error + // ListBlockedPeers returns a list of blocked peers. + ListBlockedPeers() []peer.ID + // Protect adds a peer to the list of peers who have a bidirectional + // peering agreement that they are protected from being trimmed, dropped + // or negatively scored. + Protect(id peer.ID, tag string) + // Unprotect removes a peer from the list of peers who have a bidirectional + // peering agreement that they are protected from being trimmed, dropped + // or negatively scored, returning a bool representing whether the given + // peer is protected or not. + Unprotect(id peer.ID, tag string) bool + // IsProtected returns whether the given peer is protected. + IsProtected(id peer.ID, tag string) bool + + // BandwidthStats returns a Stats struct with bandwidth metrics for all + // data sent/received by the local peer, regardless of protocol or remote + // peer IDs. + BandwidthStats() metrics.Stats + // BandwidthForPeer returns a Stats struct with bandwidth metrics associated with the given peer.ID. + // The metrics returned include all traffic sent / received for the peer, regardless of protocol. + BandwidthForPeer(id peer.ID) metrics.Stats + // BandwidthForProtocol returns a Stats struct with bandwidth metrics associated with the given protocol.ID. + BandwidthForProtocol(proto protocol.ID) metrics.Stats + + // ResourceState returns the state of the resource manager. + ResourceState() (rcmgr.ResourceManagerStat, error) + + // PubSubPeers returns the peer IDs of the peers joined on + // the given topic. + PubSubPeers(topic string) []peer.ID +} + +// module contains all components necessary to access information and +// perform actions related to the node's p2p Host / operations. +type module struct { + host HostBase + ps *pubsub.PubSub + connGater *conngater.BasicConnectionGater + bw *metrics.BandwidthCounter + rm network.ResourceManager +} + +func newModule( + host HostBase, + ps *pubsub.PubSub, + cg *conngater.BasicConnectionGater, + bw *metrics.BandwidthCounter, + rm network.ResourceManager, +) Module { + return &module{ + host: host, + ps: ps, + connGater: cg, + bw: bw, + rm: rm, + } +} + +func (m *module) Info() peer.AddrInfo { + return *libhost.InfoFromHost(m.host) +} + +func (m *module) Peers() []peer.ID { + return m.host.Peerstore().Peers() +} + +func (m *module) PeerInfo(id peer.ID) peer.AddrInfo { + return m.host.Peerstore().PeerInfo(id) +} + +func (m *module) Connect(ctx context.Context, pi peer.AddrInfo) error { + return m.host.Connect(ctx, pi) +} + +func (m *module) ClosePeer(id peer.ID) error { + return m.host.Network().ClosePeer(id) +} + +func (m *module) Connectedness(id peer.ID) network.Connectedness { + return m.host.Network().Connectedness(id) +} + +func (m *module) NATStatus() (network.Reachability, error) { + basic, ok := m.host.(*basichost.BasicHost) + if !ok { + return 0, fmt.Errorf("unexpected implementation of host.Host, expected %s, got %T", + reflect.TypeOf(&basichost.BasicHost{}).String(), m.host) + } + return basic.GetAutoNat().Status(), nil +} + +func (m *module) BlockPeer(p peer.ID) error { + return m.connGater.BlockPeer(p) +} + +func (m *module) UnblockPeer(p peer.ID) error { + return m.connGater.UnblockPeer(p) +} + +func (m *module) ListBlockedPeers() []peer.ID { + return m.connGater.ListBlockedPeers() +} + +func (m *module) Protect(id peer.ID, tag string) { + m.host.ConnManager().Protect(id, tag) +} + +func (m *module) Unprotect(id peer.ID, tag string) bool { + return m.host.ConnManager().Unprotect(id, tag) +} + +func (m *module) IsProtected(id peer.ID, tag string) bool { + return m.host.ConnManager().IsProtected(id, tag) +} + +func (m *module) BandwidthStats() metrics.Stats { + return m.bw.GetBandwidthTotals() +} + +func (m *module) BandwidthForPeer(id peer.ID) metrics.Stats { + return m.bw.GetBandwidthForPeer(id) +} + +func (m *module) BandwidthForProtocol(proto protocol.ID) metrics.Stats { + return m.bw.GetBandwidthForProtocol(proto) +} + +func (m *module) ResourceState() (rcmgr.ResourceManagerStat, error) { + rms, ok := m.rm.(rcmgr.ResourceManagerState) + if !ok { + return rcmgr.ResourceManagerStat{}, fmt.Errorf("network.ResourceManager does not implement " + + "rcmgr.ResourceManagerState") + } + return rms.Stat(), nil +} + +func (m *module) PubSubPeers(topic string) []peer.ID { + return m.ps.ListPeers(topic) +} + +// API is a wrapper around Module for the RPC. +// TODO(@distractedm1nd): These structs need to be autogenerated. +// +//nolint:dupl +type API struct { + Info func() peer.AddrInfo + Peers func() []peer.ID + PeerInfo func(id peer.ID) peer.AddrInfo + Connect func(ctx context.Context, pi peer.AddrInfo) error + ClosePeer func(id peer.ID) error + Connectedness func(id peer.ID) network.Connectedness + NATStatus func() (network.Reachability, error) + BlockPeer func(p peer.ID) error + UnblockPeer func(p peer.ID) error + ListBlockedPeers func() []peer.ID + Protect func(id peer.ID, tag string) + Unprotect func(id peer.ID, tag string) bool + IsProtected func(id peer.ID, tag string) bool + BandwidthStats func() metrics.Stats + BandwidthForPeer func(id peer.ID) metrics.Stats + BandwidthForProtocol func(proto protocol.ID) metrics.Stats + ResourceState func() (rcmgr.ResourceManagerStat, error) + PubSubPeers func(topic string) []peer.ID +} diff --git a/nodebuilder/p2p/p2p_test.go b/nodebuilder/p2p/p2p_test.go new file mode 100644 index 0000000000..da77ff4562 --- /dev/null +++ b/nodebuilder/p2p/p2p_test.go @@ -0,0 +1,214 @@ +package p2p + +import ( + "context" + "math/rand" + "testing" + "time" + + "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p" + libhost "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/metrics" + "github.com/libp2p/go-libp2p-core/network" + libpeer "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + pubsub "github.com/libp2p/go-libp2p-pubsub" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestP2PModule_Host tests P2P Module methods on +// the instance of Host. +func TestP2PModule_Host(t *testing.T) { + net, err := mocknet.FullMeshConnected(2) + require.NoError(t, err) + host, peer := net.Hosts()[0], net.Hosts()[1] + + mgr := newModule(host, nil, nil, nil, nil) + + // test all methods on `manager.host` + assert.Equal(t, []libpeer.ID(host.Peerstore().Peers()), mgr.Peers()) + assert.Equal(t, libhost.InfoFromHost(peer).ID, mgr.PeerInfo(peer.ID()).ID) + + assert.Equal(t, host.Network().Connectedness(peer.ID()), mgr.Connectedness(peer.ID())) + // now disconnect using manager and check for connectedness match again + assert.NoError(t, mgr.ClosePeer(peer.ID())) + assert.Equal(t, host.Network().Connectedness(peer.ID()), mgr.Connectedness(peer.ID())) +} + +// TestP2PModule_ConnManager tests P2P Module methods on +// the Host's ConnManager. Note that this test is constructed differently +// than the one above because mocknet does not provide a ConnManager to its +// mock peers. +func TestP2PModule_ConnManager(t *testing.T) { + // make two full peers and connect them + host, err := libp2p.New() + require.NoError(t, err) + + peer, err := libp2p.New() + require.NoError(t, err) + + mgr := newModule(host, nil, nil, nil, nil) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + err = mgr.Connect(ctx, *libhost.InfoFromHost(peer)) + require.NoError(t, err) + + mgr.Protect(peer.ID(), "test") + assert.True(t, mgr.IsProtected(peer.ID(), "test")) + mgr.Unprotect(peer.ID(), "test") + assert.False(t, mgr.IsProtected(peer.ID(), "test")) +} + +// TestP2PModule_Autonat tests P2P Module methods on +// the node's instance of AutoNAT. +func TestP2PModule_Autonat(t *testing.T) { + host, err := libp2p.New(libp2p.EnableNATService()) + require.NoError(t, err) + + mgr := newModule(host, nil, nil, nil, nil) + + status, err := mgr.NATStatus() + assert.NoError(t, err) + assert.Equal(t, network.ReachabilityUnknown, status) +} + +// TestP2PModule_Bandwidth tests P2P Module methods on +// the Host's bandwidth reporter. +func TestP2PModule_Bandwidth(t *testing.T) { + bw := metrics.NewBandwidthCounter() + host, err := libp2p.New(libp2p.BandwidthReporter(bw)) + require.NoError(t, err) + + protoID := protocol.ID("test") + // define a buf size, so we know how many bytes to read + bufSize := 1000 + + // create a peer to connect to + peer, err := libp2p.New(libp2p.BandwidthReporter(bw)) + require.NoError(t, err) + + // set stream handler on the host + host.SetStreamHandler(protoID, func(stream network.Stream) { + buf := make([]byte, bufSize) + _, err := stream.Read(buf) + require.NoError(t, err) + + _, err = stream.Write(buf) + require.NoError(t, err) + }) + + mgr := newModule(host, nil, nil, bw, nil) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // connect to the peer + err = mgr.Connect(ctx, *libhost.InfoFromHost(peer)) + require.NoError(t, err) + // check to ensure they're actually connected + require.Equal(t, network.Connected, mgr.Connectedness(peer.ID())) + + // open stream with host + stream, err := peer.NewStream(ctx, mgr.Info().ID, protoID) + require.NoError(t, err) + + // write to stream to increase bandwidth usage get some substantive + // data to read from the bandwidth counter + buf := make([]byte, bufSize) + _, err = rand.Read(buf) + require.NoError(t, err) + _, err = stream.Write(buf) + require.NoError(t, err) + + _, err = stream.Read(buf) + require.NoError(t, err) + + // has to be ~2 seconds for the metrics reporter to collect the stats + // in the background process + time.Sleep(time.Second * 2) + + stats := mgr.BandwidthStats() + assert.NotNil(t, stats) + peerStat := mgr.BandwidthForPeer(peer.ID()) + assert.NotZero(t, peerStat.TotalIn) + assert.Greater(t, int(peerStat.TotalIn), bufSize) // should be slightly more than buf size due negotiations, etc + protoStat := mgr.BandwidthForProtocol(protoID) + assert.NotZero(t, protoStat.TotalIn) + assert.Greater(t, int(protoStat.TotalIn), bufSize) // should be slightly more than buf size due negotiations, etc +} + +// TestP2PModule_Pubsub tests P2P Module methods on +// the instance of pubsub. +func TestP2PModule_Pubsub(t *testing.T) { + net, err := mocknet.FullMeshConnected(5) + require.NoError(t, err) + + host := net.Hosts()[0] + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + gs, err := pubsub.NewGossipSub(ctx, host) + require.NoError(t, err) + + mgr := newModule(host, gs, nil, nil, nil) + + topicStr := "test-topic" + + topic, err := gs.Join(topicStr) + require.NoError(t, err) + + // also join all peers on mocknet to topic + for _, p := range net.Hosts()[1:] { + newGs, err := pubsub.NewGossipSub(ctx, p) + require.NoError(t, err) + + tp, err := newGs.Join(topicStr) + require.NoError(t, err) + _, err = tp.Subscribe() + require.NoError(t, err) + } + + err = topic.Publish(ctx, []byte("test")) + require.NoError(t, err) + + // give for some peers to properly join the topic (this is necessary + // anywhere where gossipsub is used in tests) + time.Sleep(1 * time.Second) + + assert.Equal(t, len(topic.ListPeers()), len(mgr.PubSubPeers(topicStr))) +} + +// TestP2PModule_ConnGater tests P2P Module methods on +// the instance of ConnectionGater. +func TestP2PModule_ConnGater(t *testing.T) { + gater, err := ConnectionGater(datastore.NewMapDatastore()) + require.NoError(t, err) + + mgr := newModule(nil, nil, gater, nil, nil) + + assert.NoError(t, mgr.BlockPeer("badpeer")) + assert.Len(t, mgr.ListBlockedPeers(), 1) + assert.NoError(t, mgr.UnblockPeer("badpeer")) + assert.Len(t, mgr.ListBlockedPeers(), 0) +} + +// TestP2PModule_ResourceManager tests P2P Module methods on +// the ResourceManager. +func TestP2PModule_ResourceManager(t *testing.T) { + rm, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale())) + require.NoError(t, err) + + mgr := newModule(nil, nil, nil, nil, rm) + + state, err := mgr.ResourceState() + require.NoError(t, err) + + assert.NotNil(t, state) +} diff --git a/nodebuilder/rpc/rpc.go b/nodebuilder/rpc/rpc.go index 8a1b0e1f85..7b46e39c8f 100644 --- a/nodebuilder/rpc/rpc.go +++ b/nodebuilder/rpc/rpc.go @@ -5,6 +5,7 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/das" "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/header" + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" "github.com/celestiaorg/celestia-node/nodebuilder/share" "github.com/celestiaorg/celestia-node/nodebuilder/state" ) @@ -16,6 +17,7 @@ func RegisterEndpoints( fraud fraud.Module, header header.Module, daser das.Module, + p2p p2p.Module, serv *rpc.Server, ) { serv.RegisterService("state", state) @@ -23,6 +25,7 @@ func RegisterEndpoints( serv.RegisterService("fraud", fraud) serv.RegisterService("header", header) serv.RegisterService("das", daser) + serv.RegisterService("p2p", p2p) } func Server(cfg *Config) *rpc.Server { diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index 6351c8f2c8..59325c1d69 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -18,6 +18,7 @@ import ( "go.uber.org/fx" "github.com/celestiaorg/celestia-app/testutil/testnode" + "github.com/celestiaorg/celestia-node/libs/keystore" "github.com/celestiaorg/celestia-node/logs" "github.com/celestiaorg/celestia-node/nodebuilder"