From a0ea3f952304ccc8dc3f0fd1a21261ef2ee3f787 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Tue, 25 Oct 2022 17:58:49 +0200 Subject: [PATCH 01/16] feat(nodebuilder/p2p): Define p2p.Module and implement it --- api/rpc/client/client.go | 5 + go.mod | 3 +- nodebuilder/p2p/host.go | 6 +- nodebuilder/p2p/module.go | 6 + nodebuilder/p2p/p2p.go | 212 +++++++++++++++++++++++++++++++ nodebuilder/p2p/p2p_test.go | 175 +++++++++++++++++++++++++ nodebuilder/p2p/resource.go | 10 ++ nodebuilder/rpc/rpc.go | 5 +- nodebuilder/tests/rpc_test.go | 24 ++++ nodebuilder/tests/swamp/swamp.go | 1 + 10 files changed, 444 insertions(+), 3 deletions(-) create mode 100644 nodebuilder/p2p/p2p.go create mode 100644 nodebuilder/p2p/p2p_test.go create mode 100644 nodebuilder/p2p/resource.go create mode 100644 nodebuilder/tests/rpc_test.go diff --git a/api/rpc/client/client.go b/api/rpc/client/client.go index 8e328f600c..ad4dc3d029 100644 --- a/api/rpc/client/client.go +++ b/api/rpc/client/client.go @@ -8,6 +8,7 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/das" "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/header" + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" "github.com/celestiaorg/celestia-node/nodebuilder/share" "github.com/celestiaorg/celestia-node/nodebuilder/state" ) @@ -18,6 +19,7 @@ type API interface { state.Module share.Module das.Module + p2p.Module } type Client struct { @@ -26,6 +28,7 @@ type Client struct { State state.API Share share.API DAS das.API + P2P p2p.API closer multiClientCloser } @@ -64,7 +67,9 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { "header": &client.Header, "fraud": &client.Fraud, "das": &client.DAS, + "p2p": &client.P2P, } + // TODO @distractedm1nd @renaynay: how does client know if daser is nil? for name, module := range modules { closer, err := jsonrpc.NewClient(ctx, addr, name, module, nil) if err != nil { diff --git a/go.mod b/go.mod index 0d1cb3c960..21b9edf55d 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,6 @@ require ( github.com/filecoin-project/go-jsonrpc v0.1.8 github.com/gammazero/workerpool v1.1.3 github.com/gogo/protobuf v1.3.3 - github.com/golang/mock v1.6.0 github.com/gorilla/mux v1.8.0 github.com/hashicorp/go-retryablehttp v0.7.1-0.20211018174820-ff6d014e72d9 github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d @@ -69,6 +68,8 @@ require ( google.golang.org/grpc v1.49.0 ) +require github.com/golang/mock v1.6.0 + require ( cloud.google.com/go v0.100.2 // indirect cloud.google.com/go/compute v1.6.1 // indirect diff --git a/nodebuilder/p2p/host.go b/nodebuilder/p2p/host.go index 23dfce9845..f75b16931a 100644 --- a/nodebuilder/p2p/host.go +++ b/nodebuilder/p2p/host.go @@ -8,6 +8,8 @@ import ( "github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/metrics" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" @@ -24,7 +26,7 @@ func RoutedHost(base HostBase, r routing.PeerRouting) host.Host { } // Host returns constructor for Host. -func Host(cfg Config, params hostParams) (HostBase, error) { +func Host(cfg Config, params hostParams, bw *metrics.BandwidthCounter, rm network.ResourceManager) (HostBase, error) { opts := []libp2p.Option{ libp2p.NoListenAddrs, // do not listen automatically libp2p.AddrsFactory(params.AddrF), @@ -35,6 +37,8 @@ func Host(cfg Config, params hostParams) (HostBase, error) { libp2p.UserAgent(fmt.Sprintf("celestia-%s", params.Net)), libp2p.NATPortMap(), // enables upnp libp2p.DisableRelay(), + libp2p.BandwidthReporter(bw), + libp2p.ResourceManager(rm), // to clearly define what defaults we rely upon libp2p.DefaultSecurity, libp2p.DefaultTransports, diff --git a/nodebuilder/p2p/module.go b/nodebuilder/p2p/module.go index f02c9a0bf1..253f0d73c7 100644 --- a/nodebuilder/p2p/module.go +++ b/nodebuilder/p2p/module.go @@ -2,6 +2,8 @@ package p2p import ( logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-core/metrics" + "github.com/libp2p/go-libp2p/p2p/host/autonat" "go.uber.org/fx" "github.com/celestiaorg/celestia-node/nodebuilder/node" @@ -30,6 +32,10 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Provide(PeerRouting), fx.Provide(ContentRouting), fx.Provide(AddrsFactory(cfg.AnnounceAddresses, cfg.NoAnnounceAddresses)), + fx.Provide(metrics.NewBandwidthCounter), + fx.Provide(autonat.New), + fx.Provide(ResourceManager), + fx.Provide(newManager), fx.Invoke(Listen(cfg.ListenAddresses)), ) diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go new file mode 100644 index 0000000000..a1fa197c98 --- /dev/null +++ b/nodebuilder/p2p/p2p.go @@ -0,0 +1,212 @@ +package p2p + +import ( + "context" + "fmt" + ma "github.com/multiformats/go-multiaddr" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/metrics" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + pubsub "github.com/libp2p/go-libp2p-pubsub" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" + "github.com/libp2p/go-libp2p/p2p/host/autonat" + "github.com/libp2p/go-libp2p/p2p/net/conngater" +) + +type API struct { + Info func() Info + Peers func() peer.IDSlice + PeerInfo func(id peer.ID) peer.AddrInfo + Connect func(ctx context.Context, pi peer.AddrInfo) error + ClosePeer func(id peer.ID) error + Connectedness func(id peer.ID) network.Connectedness + NATStatus func() network.Reachability + BlockPeer func(p peer.ID) error + UnblockPeer func(p peer.ID) error + ListBlockedPeers func() []peer.ID + MutualAdd func(id peer.ID, tag string) + MutualRm func(id peer.ID, tag string) bool + IsMutual func(id peer.ID, tag string) bool + BandwidthStats func() metrics.Stats + BandwidthForPeer func(id peer.ID) metrics.Stats + BandwidthForProtocol func(proto protocol.ID) metrics.Stats + ResourceState func(rcmgr.ResourceManagerStat, error) + PubSubPeers func(topic string) []peer.ID +} + +type Module interface { + // Info returns basic information about the node's p2p host/operations. + Info() Info + // Peers returns all peer IDs used across all inner stores. + Peers() peer.IDSlice + // PeerInfo returns a small slice of information Peerstore has on the + // given peer. + PeerInfo(id peer.ID) peer.AddrInfo + + // Connect ensures there is a connection between this host and the peer with + // given peer.ID. + Connect(ctx context.Context, pi peer.AddrInfo) error + // ClosePeer closes the connection to a given peer. + ClosePeer(id peer.ID) error + // Connectedness returns a state signaling connection capabilities. + Connectedness(id peer.ID) network.Connectedness + // NATStatus returns the current NAT status. + NATStatus() network.Reachability + + // BlockPeer adds a peer to the set of blocked peers. // TODO should we wrap BlockPeer so + // that it 1. disconnects from peer, then 2. adds to blocklist? cc @Wondertan + BlockPeer(p peer.ID) error + // UnblockPeer removes a peer from the set of blocked peers. + UnblockPeer(p peer.ID) error + // ListBlockedPeers returns a list of blocked peers. + ListBlockedPeers() []peer.ID + // MutualAdd adds a peer to the list of peers who have a bidirectional + // peering agreement that they are protected from being trimmed, dropped + // or negatively scored. + MutualAdd(id peer.ID, tag string) + // MutualRm removes a peer from the list of peers who have a bidirectional + // peering agreement that they are protected from being trimmed, dropped + // or negatively scored, returning a bool representing whether the given + // peer is protected or not. + MutualRm(id peer.ID, tag string) bool + // IsMutual returns whether the given peer is a mutual peer. + IsMutual(id peer.ID, tag string) bool + + // BandwidthStats returns a Stats struct with bandwidth metrics for all + // data sent/received by the local peer, regardless of protocol or remote + // peer IDs. + BandwidthStats() metrics.Stats + // BandwidthForPeer returns a Stats struct with bandwidth metrics associated with the given peer.ID. + // The metrics returned include all traffic sent / received for the peer, regardless of protocol. + BandwidthForPeer(id peer.ID) metrics.Stats + // BandwidthForProtocol returns a Stats struct with bandwidth metrics associated with the given protocol.ID. + BandwidthForProtocol(proto protocol.ID) metrics.Stats + + // ResourceState returns the state of the resource manager. + ResourceState() (rcmgr.ResourceManagerStat, error) + + // PubSubPeers returns the peer IDs of the peers joined on + // the given topic. + PubSubPeers(topic string) []peer.ID +} + +// manager contains all components necessary to access information and +// perform actions related to the node's p2p Host / operations. +type manager struct { // TODO @renaynay: rename ? + host HostBase + ps *pubsub.PubSub + nat autonat.AutoNAT + connGater *conngater.BasicConnectionGater + bw *metrics.BandwidthCounter + rm network.ResourceManager +} + +func newManager( + host host.Host, + ps *pubsub.PubSub, + nat autonat.AutoNAT, + cg *conngater.BasicConnectionGater, + bw *metrics.BandwidthCounter, + rm network.ResourceManager, +) Module { + return &manager{ + host: host, + ps: ps, + nat: nat, + connGater: cg, + bw: bw, + rm: rm, + } +} + +// Info contains basic information about the node's p2p host/operations. +type Info struct { + // ID is the node's peer ID + ID peer.ID `json:"id"` + // Addrs is the node's + Addrs []ma.Multiaddr `json:"addrs"` +} + +func (m *manager) Info() Info { + m.host.Network(). + return Info{ + ID: m.host.ID(), + Addrs: m.host.Addrs(), + } +} + +func (m *manager) Peers() peer.IDSlice { + return m.host.Peerstore().Peers() +} + +func (m *manager) PeerInfo(id peer.ID) peer.AddrInfo { + return m.host.Peerstore().PeerInfo(id) +} + +func (m *manager) Connect(ctx context.Context, pi peer.AddrInfo) error { + return m.host.Connect(ctx, pi) +} + +func (m *manager) ClosePeer(id peer.ID) error { + return m.host.Network().ClosePeer(id) +} + +func (m *manager) Connectedness(id peer.ID) network.Connectedness { + return m.host.Network().Connectedness(id) +} + +func (m *manager) NATStatus() network.Reachability { + return m.nat.Status() +} + +func (m *manager) BlockPeer(p peer.ID) error { + return m.connGater.BlockPeer(p) +} + +func (m *manager) UnblockPeer(p peer.ID) error { + return m.connGater.UnblockPeer(p) +} + +func (m *manager) ListBlockedPeers() []peer.ID { + return m.connGater.ListBlockedPeers() +} + +func (m *manager) MutualAdd(id peer.ID, tag string) { + m.host.ConnManager().Protect(id, tag) +} + +func (m *manager) MutualRm(id peer.ID, tag string) bool { + return m.host.ConnManager().Unprotect(id, tag) +} + +func (m *manager) IsMutual(id peer.ID, tag string) bool { + return m.host.ConnManager().IsProtected(id, tag) +} + +func (m *manager) BandwidthStats() metrics.Stats { + return m.bw.GetBandwidthTotals() +} + +func (m *manager) BandwidthForPeer(id peer.ID) metrics.Stats { + return m.bw.GetBandwidthForPeer(id) +} + +func (m *manager) BandwidthForProtocol(proto protocol.ID) metrics.Stats { + return m.bw.GetBandwidthForProtocol(proto) +} + +func (m *manager) ResourceState() (rcmgr.ResourceManagerStat, error) { + rms, ok := m.rm.(rcmgr.ResourceManagerState) + if !ok { + return rcmgr.ResourceManagerStat{}, fmt.Errorf("network.ResourceManager does not implement " + + "rcmgr.ResourceManagerState") // TODO err msg? + } + return rms.Stat(), nil +} + +func (m *manager) PubSubPeers(topic string) []peer.ID { + return m.ps.ListPeers(topic) +} diff --git a/nodebuilder/p2p/p2p_test.go b/nodebuilder/p2p/p2p_test.go new file mode 100644 index 0000000000..c231f891c5 --- /dev/null +++ b/nodebuilder/p2p/p2p_test.go @@ -0,0 +1,175 @@ +package p2p + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p" + libhost "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/metrics" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/protocol" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/p2p/host/autonat" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/require" +) + +// TestP2PModule_methodsOnHost TODO +func TestP2PModule_methodsOnHost(t *testing.T) { + net, err := mocknet.FullMeshConnected(2) + require.NoError(t, err) + host, peer := net.Hosts()[0], net.Hosts()[1] + + mgr := newManager(host, nil, nil, nil, nil, nil) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // test all methods on `manager.host` + require.Equal(t, host.ID(), mgr.Info().ID) + require.Equal(t, host.Peerstore().Peers(), mgr.Peers()) + require.Equal(t, libhost.InfoFromHost(peer).ID, mgr.PeerInfo(peer.ID()).ID) + + require.Equal(t, host.Network().Connectedness(peer.ID()), mgr.Connectedness(peer.ID())) + // now disconnect using manager and check for connectedness match again + require.NoError(t, mgr.ClosePeer(peer.ID())) + require.Equal(t, host.Network().Connectedness(peer.ID()), mgr.Connectedness(peer.ID())) + // reconnect using manager + require.NoError(t, mgr.Connect(ctx, *libhost.InfoFromHost(peer))) + mgr.MutualAdd(peer.ID(), "test") + isMutual := mgr.IsMutual(peer.ID(), "test") // TODO why fails? + // require.True(t, isMutual) // tODO why fails? + require.Equal(t, host.ConnManager().IsProtected(peer.ID(), "test"), isMutual) +} + +// TestP2PModule_methodsOnAutonat TODO +func TestP2PModule_methodsOnAutonat(t *testing.T) { + net, err := mocknet.FullMeshConnected(2) + require.NoError(t, err) + host := net.Hosts()[0] + + nat, err := autonat.New(host) + require.NoError(t, err) + + mgr := newManager(host, nil, nat, nil, nil, nil) + + // test all methods on `manager.nat` + require.Equal(t, nat.Status(), mgr.NATStatus()) +} + +// TestP2PModule_methodsOnBandwidth // TODO +func TestP2PModule_methodsOnBandwidth(t *testing.T) { + bw := metrics.NewBandwidthCounter() + host, err := libp2p.New(libp2p.BandwidthReporter(bw)) + require.NoError(t, err) + + protoID := protocol.ID("test") + + // define a buf size, so we know how many bytes + // to read + writeSize := 10000 + + // create a peer to connect to + peer, err := libp2p.New() + require.NoError(t, err) + peer.SetStreamHandler(protoID, func(stream network.Stream) { + buf := make([]byte, writeSize) + _, err := stream.Read(buf) + require.NoError(t, err) + t.Log("HITTING!") // TODO remove + }) + + mgr := newManager(host, nil, nil, nil, bw, nil) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // connect to the peer + err = mgr.Connect(ctx, *libhost.InfoFromHost(peer)) + require.NoError(t, err) + // check to ensure they're actually connected + require.Equal(t, network.Connected, mgr.Connectedness(peer.ID())) + + // open stream with peer (have to do it on the host as there's + // no public method to do so via the p2p Module) + stream, err := host.NewStream(ctx, peer.ID(), protoID) + require.NoError(t, err) + + // read from stream to increase bandwidth usage get some substantive + // data to read from the bandwidth counter + buf := make([]byte, writeSize) + _, err = stream.Write(buf) + require.NoError(t, err) + + stats := mgr.BandwidthStats() // TODO why 0? + t.Log(stats) // TODO assert + peerStat := mgr.BandwidthForPeer(peer.ID()) + t.Log(peerStat) + protoStat := mgr.BandwidthForProtocol(protoID) + t.Log(protoStat) // TODO +} + +func TestP2PModule_methodsOnPubsub(t *testing.T) { + net, err := mocknet.FullMeshConnected(5) + require.NoError(t, err) + + host := net.Hosts()[0] + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + gs, err := pubsub.NewGossipSub(ctx, host) + require.NoError(t, err) + + mgr := newManager(host, gs, nil, nil, nil, nil) + + topicStr := "test-topic" + + topic, err := gs.Join(topicStr) + require.NoError(t, err) + + // also join all peers on mocknet to topic + for _, p := range net.Hosts()[1:] { + newGs, err := pubsub.NewGossipSub(ctx, p) + require.NoError(t, err) + + tp, err := newGs.Join(topicStr) + require.NoError(t, err) + _, err = tp.Subscribe() + require.NoError(t, err) + } + + err = topic.Publish(ctx, []byte("test")) + require.NoError(t, err) + + // give for some peers to properly join the topic + time.Sleep(1 * time.Second) + + require.Equal(t, len(topic.ListPeers()), len(mgr.PubSubPeers(topicStr))) +} + +// TestP2PModule_methodsOnConnGater // TODO doc +func TestP2PModule_methodsOnConnGater(t *testing.T) { + gater, err := ConnectionGater(datastore.NewMapDatastore()) + require.NoError(t, err) + + mgr := newManager(nil, nil, nil, gater, nil, nil) + + require.NoError(t, mgr.BlockPeer("badpeer")) + require.Len(t, mgr.ListBlockedPeers(), 1) + require.NoError(t, mgr.UnblockPeer("badpeer")) + require.Len(t, mgr.ListBlockedPeers(), 0) +} + +// TestP2PModule_methodsOnResourceManager // TODO doc +func TestP2PModule_methodsOnResourceManager(t *testing.T) { + rm, err := ResourceManager() + require.NoError(t, err) + + mgr := newManager(nil, nil, nil, nil, nil, rm) + state, err := mgr.ResourceState() + require.NoError(t, err) + require.NotNil(t, state) +} diff --git a/nodebuilder/p2p/resource.go b/nodebuilder/p2p/resource.go new file mode 100644 index 0000000000..b850d53112 --- /dev/null +++ b/nodebuilder/p2p/resource.go @@ -0,0 +1,10 @@ +package p2p + +import ( + "github.com/libp2p/go-libp2p-core/network" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" +) + +func ResourceManager() (network.ResourceManager, error) { + return rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale())) +} diff --git a/nodebuilder/rpc/rpc.go b/nodebuilder/rpc/rpc.go index 8a1b0e1f85..2695016f69 100644 --- a/nodebuilder/rpc/rpc.go +++ b/nodebuilder/rpc/rpc.go @@ -5,6 +5,7 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/das" "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/header" + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" "github.com/celestiaorg/celestia-node/nodebuilder/share" "github.com/celestiaorg/celestia-node/nodebuilder/state" ) @@ -16,13 +17,15 @@ func RegisterEndpoints( fraud fraud.Module, header header.Module, daser das.Module, + p2p p2p.Module, serv *rpc.Server, ) { serv.RegisterService("state", state) serv.RegisterService("share", share) - serv.RegisterService("fraud", fraud) serv.RegisterService("header", header) serv.RegisterService("das", daser) + serv.RegisterService("fraud", fraud) + serv.RegisterService("p2p", p2p) } func Server(cfg *Config) *rpc.Server { diff --git a/nodebuilder/tests/rpc_test.go b/nodebuilder/tests/rpc_test.go new file mode 100644 index 0000000000..43a602d99e --- /dev/null +++ b/nodebuilder/tests/rpc_test.go @@ -0,0 +1,24 @@ +package tests + +/* +func TestRPC(t *testing.T) { + sw := swamp.NewSwamp(t) + cfg := nodebuilder.DefaultConfig(node.Bridge) + bridge := sw.NewNodeWithConfig(node.Bridge, cfg) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + err := bridge.Start(ctx) + require.NoError(t, err) + la := fmt.Sprintf("http://%s", bridge.RPCServer.ListenAddr()) + + client, closer, err := client.NewClient(ctx, la) + require.NoError(t, err) + t.Cleanup(closer.CloseAll) + + info := client.P2P.Peers() + t.Log(info) +} + +*/ diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index 6351c8f2c8..59325c1d69 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -18,6 +18,7 @@ import ( "go.uber.org/fx" "github.com/celestiaorg/celestia-app/testutil/testnode" + "github.com/celestiaorg/celestia-node/libs/keystore" "github.com/celestiaorg/celestia-node/logs" "github.com/celestiaorg/celestia-node/nodebuilder" From d2f8d660d8708f85cecbc87ab71a5aecb2341099 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Mon, 31 Oct 2022 09:46:14 +0100 Subject: [PATCH 02/16] doc(nodebuilder/p2p): add module doc --- nodebuilder/p2p/p2p.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index a1fa197c98..4d26af1507 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -37,6 +37,8 @@ type API struct { PubSubPeers func(topic string) []peer.ID } +// Module represents all accessible methods related to the node's p2p +// host / operations. type Module interface { // Info returns basic information about the node's p2p host/operations. Info() Info From 43437b94b081434e6bae56ca7c25abf1b7b7f762 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Mon, 31 Oct 2022 09:56:17 +0100 Subject: [PATCH 03/16] test(nodebuilder/p2p): Implement all tests for p2p module --- go.mod | 6 +- nodebuilder/p2p/p2p.go | 3 +- nodebuilder/p2p/p2p_test.go | 129 ++++++++++++++++++++++------------ nodebuilder/tests/rpc_test.go | 24 ------- 4 files changed, 90 insertions(+), 72 deletions(-) delete mode 100644 nodebuilder/tests/rpc_test.go diff --git a/go.mod b/go.mod index 21b9edf55d..daa38b43d8 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,10 @@ require ( google.golang.org/grpc v1.49.0 ) -require github.com/golang/mock v1.6.0 +require ( + github.com/golang/mock v1.6.0 + github.com/libp2p/go-libp2p-resource-manager v0.5.1 +) require ( cloud.google.com/go v0.100.2 // indirect @@ -209,7 +212,6 @@ require ( github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect github.com/libp2p/go-libp2p-kbucket v0.4.7 // indirect github.com/libp2p/go-libp2p-loggables v0.1.0 // indirect - github.com/libp2p/go-libp2p-resource-manager v0.5.1 // indirect github.com/libp2p/go-msgio v0.2.0 // indirect github.com/libp2p/go-nat v0.1.0 // indirect github.com/libp2p/go-netroute v0.2.0 // indirect diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index 4d26af1507..6c41d90d46 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -3,7 +3,6 @@ package p2p import ( "context" "fmt" - ma "github.com/multiformats/go-multiaddr" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/metrics" @@ -14,6 +13,7 @@ import ( rcmgr "github.com/libp2p/go-libp2p-resource-manager" "github.com/libp2p/go-libp2p/p2p/host/autonat" "github.com/libp2p/go-libp2p/p2p/net/conngater" + ma "github.com/multiformats/go-multiaddr" ) type API struct { @@ -133,7 +133,6 @@ type Info struct { } func (m *manager) Info() Info { - m.host.Network(). return Info{ ID: m.host.ID(), Addrs: m.host.Addrs(), diff --git a/nodebuilder/p2p/p2p_test.go b/nodebuilder/p2p/p2p_test.go index c231f891c5..d2e2bc3eb6 100644 --- a/nodebuilder/p2p/p2p_test.go +++ b/nodebuilder/p2p/p2p_test.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "math/rand" "testing" "time" @@ -14,37 +15,58 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/p2p/host/autonat" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// TestP2PModule_methodsOnHost TODO +// TestP2PModule_methodsOnHost tests P2P Module methods on +// the instance of Host. func TestP2PModule_methodsOnHost(t *testing.T) { net, err := mocknet.FullMeshConnected(2) require.NoError(t, err) host, peer := net.Hosts()[0], net.Hosts()[1] mgr := newManager(host, nil, nil, nil, nil, nil) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) // test all methods on `manager.host` - require.Equal(t, host.ID(), mgr.Info().ID) - require.Equal(t, host.Peerstore().Peers(), mgr.Peers()) - require.Equal(t, libhost.InfoFromHost(peer).ID, mgr.PeerInfo(peer.ID()).ID) + assert.Equal(t, host.ID(), mgr.Info().ID) + assert.Equal(t, host.Peerstore().Peers(), mgr.Peers()) + assert.Equal(t, libhost.InfoFromHost(peer).ID, mgr.PeerInfo(peer.ID()).ID) - require.Equal(t, host.Network().Connectedness(peer.ID()), mgr.Connectedness(peer.ID())) + assert.Equal(t, host.Network().Connectedness(peer.ID()), mgr.Connectedness(peer.ID())) // now disconnect using manager and check for connectedness match again - require.NoError(t, mgr.ClosePeer(peer.ID())) - require.Equal(t, host.Network().Connectedness(peer.ID()), mgr.Connectedness(peer.ID())) - // reconnect using manager - require.NoError(t, mgr.Connect(ctx, *libhost.InfoFromHost(peer))) + assert.NoError(t, mgr.ClosePeer(peer.ID())) + assert.Equal(t, host.Network().Connectedness(peer.ID()), mgr.Connectedness(peer.ID())) +} + +// TestP2PModule_methodsOnHostConnManager tests P2P Module methods on +// the Host's ConnManager. Note that this test is constructed differently +// than the one above because mocknet does not provide a ConnManager to its +// mock peers. +func TestP2PModule_methodsOnHostConnManager(t *testing.T) { + // make two full peers and connect them + host, err := libp2p.New() + require.NoError(t, err) + + peer, err := libp2p.New() + require.NoError(t, err) + + mgr := newManager(host, nil, nil, nil, nil, nil) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + err = mgr.Connect(ctx, *libhost.InfoFromHost(peer)) + require.NoError(t, err) + mgr.MutualAdd(peer.ID(), "test") - isMutual := mgr.IsMutual(peer.ID(), "test") // TODO why fails? - // require.True(t, isMutual) // tODO why fails? - require.Equal(t, host.ConnManager().IsProtected(peer.ID(), "test"), isMutual) + assert.True(t, mgr.IsMutual(peer.ID(), "test")) + mgr.MutualRm(peer.ID(), "test") + assert.False(t, mgr.IsMutual(peer.ID(), "test")) } -// TestP2PModule_methodsOnAutonat TODO +// TestP2PModule_methodsOnAutonat tests P2P Module methods on +// the node's instance of AutoNAT. func TestP2PModule_methodsOnAutonat(t *testing.T) { net, err := mocknet.FullMeshConnected(2) require.NoError(t, err) @@ -55,30 +77,32 @@ func TestP2PModule_methodsOnAutonat(t *testing.T) { mgr := newManager(host, nil, nat, nil, nil, nil) - // test all methods on `manager.nat` - require.Equal(t, nat.Status(), mgr.NATStatus()) + assert.Equal(t, nat.Status(), mgr.NATStatus()) } -// TestP2PModule_methodsOnBandwidth // TODO +// TestP2PModule_methodsOnBandwidth tests P2P Module methods on +// the Host's bandwidth reporter. func TestP2PModule_methodsOnBandwidth(t *testing.T) { bw := metrics.NewBandwidthCounter() host, err := libp2p.New(libp2p.BandwidthReporter(bw)) require.NoError(t, err) protoID := protocol.ID("test") - - // define a buf size, so we know how many bytes - // to read - writeSize := 10000 + // define a buf size, so we know how many bytes to read + bufSize := 1000 // create a peer to connect to - peer, err := libp2p.New() + peer, err := libp2p.New(libp2p.BandwidthReporter(bw)) require.NoError(t, err) - peer.SetStreamHandler(protoID, func(stream network.Stream) { - buf := make([]byte, writeSize) + + // set stream handler on the host + host.SetStreamHandler(protoID, func(stream network.Stream) { + buf := make([]byte, bufSize) _, err := stream.Read(buf) require.NoError(t, err) - t.Log("HITTING!") // TODO remove + + _, err = stream.Write(buf) + require.NoError(t, err) }) mgr := newManager(host, nil, nil, nil, bw, nil) @@ -92,25 +116,37 @@ func TestP2PModule_methodsOnBandwidth(t *testing.T) { // check to ensure they're actually connected require.Equal(t, network.Connected, mgr.Connectedness(peer.ID())) - // open stream with peer (have to do it on the host as there's - // no public method to do so via the p2p Module) - stream, err := host.NewStream(ctx, peer.ID(), protoID) + // open stream with host + stream, err := peer.NewStream(ctx, mgr.Info().ID, protoID) require.NoError(t, err) - // read from stream to increase bandwidth usage get some substantive + // write to stream to increase bandwidth usage get some substantive // data to read from the bandwidth counter - buf := make([]byte, writeSize) + buf := make([]byte, bufSize) + _, err = rand.Read(buf) + require.NoError(t, err) _, err = stream.Write(buf) require.NoError(t, err) - stats := mgr.BandwidthStats() // TODO why 0? - t.Log(stats) // TODO assert + _, err = stream.Read(buf) + require.NoError(t, err) + + // has to be a ~second for the metrics reporter to collect the stats + // in the background process + time.Sleep(time.Second) + + stats := mgr.BandwidthStats() + assert.NotNil(t, stats) peerStat := mgr.BandwidthForPeer(peer.ID()) - t.Log(peerStat) + assert.NotZero(t, peerStat.TotalIn) + assert.Greater(t, int(peerStat.TotalIn), bufSize) // should be slightly more than buf size due negotiations, etc protoStat := mgr.BandwidthForProtocol(protoID) - t.Log(protoStat) // TODO + assert.NotZero(t, protoStat.TotalIn) + assert.Greater(t, int(protoStat.TotalIn), bufSize) // should be slightly more than buf size due negotiations, etc } +// TestP2PModule_methodsOnPubsub tests P2P Module methods on +// the instance of pubsub. func TestP2PModule_methodsOnPubsub(t *testing.T) { net, err := mocknet.FullMeshConnected(5) require.NoError(t, err) @@ -144,32 +180,37 @@ func TestP2PModule_methodsOnPubsub(t *testing.T) { err = topic.Publish(ctx, []byte("test")) require.NoError(t, err) - // give for some peers to properly join the topic + // give for some peers to properly join the topic (this is necessary + // anywhere where gossipsub is used in tests) time.Sleep(1 * time.Second) - require.Equal(t, len(topic.ListPeers()), len(mgr.PubSubPeers(topicStr))) + assert.Equal(t, len(topic.ListPeers()), len(mgr.PubSubPeers(topicStr))) } -// TestP2PModule_methodsOnConnGater // TODO doc +// TestP2PModule_methodsOnConnGater tests P2P Module methods on +// the instance of ConnectionGater. func TestP2PModule_methodsOnConnGater(t *testing.T) { gater, err := ConnectionGater(datastore.NewMapDatastore()) require.NoError(t, err) mgr := newManager(nil, nil, nil, gater, nil, nil) - require.NoError(t, mgr.BlockPeer("badpeer")) - require.Len(t, mgr.ListBlockedPeers(), 1) - require.NoError(t, mgr.UnblockPeer("badpeer")) - require.Len(t, mgr.ListBlockedPeers(), 0) + assert.NoError(t, mgr.BlockPeer("badpeer")) + assert.Len(t, mgr.ListBlockedPeers(), 1) + assert.NoError(t, mgr.UnblockPeer("badpeer")) + assert.Len(t, mgr.ListBlockedPeers(), 0) } -// TestP2PModule_methodsOnResourceManager // TODO doc +// TestP2PModule_methodsOnResourceManager tests P2P Module methods on +// the ResourceManager. func TestP2PModule_methodsOnResourceManager(t *testing.T) { rm, err := ResourceManager() require.NoError(t, err) mgr := newManager(nil, nil, nil, nil, nil, rm) + state, err := mgr.ResourceState() require.NoError(t, err) - require.NotNil(t, state) + + assert.NotNil(t, state) } diff --git a/nodebuilder/tests/rpc_test.go b/nodebuilder/tests/rpc_test.go deleted file mode 100644 index 43a602d99e..0000000000 --- a/nodebuilder/tests/rpc_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package tests - -/* -func TestRPC(t *testing.T) { - sw := swamp.NewSwamp(t) - cfg := nodebuilder.DefaultConfig(node.Bridge) - bridge := sw.NewNodeWithConfig(node.Bridge, cfg) - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - err := bridge.Start(ctx) - require.NoError(t, err) - la := fmt.Sprintf("http://%s", bridge.RPCServer.ListenAddr()) - - client, closer, err := client.NewClient(ctx, la) - require.NoError(t, err) - t.Cleanup(closer.CloseAll) - - info := client.P2P.Peers() - t.Log(info) -} - -*/ From a1575bb1153e020f7a789ded7a0c02aac202333b Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Fri, 11 Nov 2022 10:04:49 +0100 Subject: [PATCH 04/16] chore: resolve TODOs --- api/rpc/client/client.go | 1 - nodebuilder/p2p/p2p.go | 7 +++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/api/rpc/client/client.go b/api/rpc/client/client.go index ad4dc3d029..c22d1beca1 100644 --- a/api/rpc/client/client.go +++ b/api/rpc/client/client.go @@ -69,7 +69,6 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { "das": &client.DAS, "p2p": &client.P2P, } - // TODO @distractedm1nd @renaynay: how does client know if daser is nil? for name, module := range modules { closer, err := jsonrpc.NewClient(ctx, addr, name, module, nil) if err != nil { diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index 6c41d90d46..a81d0d0de3 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -58,8 +58,7 @@ type Module interface { // NATStatus returns the current NAT status. NATStatus() network.Reachability - // BlockPeer adds a peer to the set of blocked peers. // TODO should we wrap BlockPeer so - // that it 1. disconnects from peer, then 2. adds to blocklist? cc @Wondertan + // BlockPeer adds a peer to the set of blocked peers. BlockPeer(p peer.ID) error // UnblockPeer removes a peer from the set of blocked peers. UnblockPeer(p peer.ID) error @@ -97,7 +96,7 @@ type Module interface { // manager contains all components necessary to access information and // perform actions related to the node's p2p Host / operations. -type manager struct { // TODO @renaynay: rename ? +type manager struct { host HostBase ps *pubsub.PubSub nat autonat.AutoNAT @@ -203,7 +202,7 @@ func (m *manager) ResourceState() (rcmgr.ResourceManagerStat, error) { rms, ok := m.rm.(rcmgr.ResourceManagerState) if !ok { return rcmgr.ResourceManagerStat{}, fmt.Errorf("network.ResourceManager does not implement " + - "rcmgr.ResourceManagerState") // TODO err msg? + "rcmgr.ResourceManagerState") } return rms.Stat(), nil } From 2f4f577804f6301858bdb9c52eb8fe17391389d9 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Fri, 11 Nov 2022 10:16:53 +0100 Subject: [PATCH 05/16] chore: Add back TODO for consistency --- nodebuilder/p2p/p2p.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index a81d0d0de3..e76a6d1f8a 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -16,6 +16,8 @@ import ( ma "github.com/multiformats/go-multiaddr" ) +// API is a wrapper around Module for the RPC. +// TODO(@distractedm1nd): These structs need to be autogenerated. type API struct { Info func() Info Peers func() peer.IDSlice From 2616a7acb7dc26742665c9cd2ce6e354471f8a83 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Fri, 11 Nov 2022 12:35:02 +0100 Subject: [PATCH 06/16] chore: add nolint directive to api + module --- nodebuilder/p2p/p2p.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index e76a6d1f8a..a420fa9bd3 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -18,6 +18,8 @@ import ( // API is a wrapper around Module for the RPC. // TODO(@distractedm1nd): These structs need to be autogenerated. +// +//nolint:dupl type API struct { Info func() Info Peers func() peer.IDSlice @@ -35,12 +37,14 @@ type API struct { BandwidthStats func() metrics.Stats BandwidthForPeer func(id peer.ID) metrics.Stats BandwidthForProtocol func(proto protocol.ID) metrics.Stats - ResourceState func(rcmgr.ResourceManagerStat, error) + ResourceState func() (rcmgr.ResourceManagerStat, error) PubSubPeers func(topic string) []peer.ID } // Module represents all accessible methods related to the node's p2p // host / operations. +// +//nolint:dupl type Module interface { // Info returns basic information about the node's p2p host/operations. Info() Info From e9f4f54df3586aa626b0adab3b3a9a16001e103e Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Fri, 11 Nov 2022 13:10:06 +0100 Subject: [PATCH 07/16] chore: refactorings from ryans suggestions --- nodebuilder/p2p/p2p_test.go | 28 ++++++++++++++-------------- nodebuilder/rpc/rpc.go | 2 +- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/nodebuilder/p2p/p2p_test.go b/nodebuilder/p2p/p2p_test.go index d2e2bc3eb6..d4877913be 100644 --- a/nodebuilder/p2p/p2p_test.go +++ b/nodebuilder/p2p/p2p_test.go @@ -19,9 +19,9 @@ import ( "github.com/stretchr/testify/require" ) -// TestP2PModule_methodsOnHost tests P2P Module methods on +// TestP2PModule_Host tests P2P Module methods on // the instance of Host. -func TestP2PModule_methodsOnHost(t *testing.T) { +func TestP2PModule_Host(t *testing.T) { net, err := mocknet.FullMeshConnected(2) require.NoError(t, err) host, peer := net.Hosts()[0], net.Hosts()[1] @@ -39,11 +39,11 @@ func TestP2PModule_methodsOnHost(t *testing.T) { assert.Equal(t, host.Network().Connectedness(peer.ID()), mgr.Connectedness(peer.ID())) } -// TestP2PModule_methodsOnHostConnManager tests P2P Module methods on +// TestP2PModule_ConnManager tests P2P Module methods on // the Host's ConnManager. Note that this test is constructed differently // than the one above because mocknet does not provide a ConnManager to its // mock peers. -func TestP2PModule_methodsOnHostConnManager(t *testing.T) { +func TestP2PModule_ConnManager(t *testing.T) { // make two full peers and connect them host, err := libp2p.New() require.NoError(t, err) @@ -65,9 +65,9 @@ func TestP2PModule_methodsOnHostConnManager(t *testing.T) { assert.False(t, mgr.IsMutual(peer.ID(), "test")) } -// TestP2PModule_methodsOnAutonat tests P2P Module methods on +// TestP2PModule_Autonat tests P2P Module methods on // the node's instance of AutoNAT. -func TestP2PModule_methodsOnAutonat(t *testing.T) { +func TestP2PModule_Autonat(t *testing.T) { net, err := mocknet.FullMeshConnected(2) require.NoError(t, err) host := net.Hosts()[0] @@ -80,9 +80,9 @@ func TestP2PModule_methodsOnAutonat(t *testing.T) { assert.Equal(t, nat.Status(), mgr.NATStatus()) } -// TestP2PModule_methodsOnBandwidth tests P2P Module methods on +// TestP2PModule_Bandwidth tests P2P Module methods on // the Host's bandwidth reporter. -func TestP2PModule_methodsOnBandwidth(t *testing.T) { +func TestP2PModule_Bandwidth(t *testing.T) { bw := metrics.NewBandwidthCounter() host, err := libp2p.New(libp2p.BandwidthReporter(bw)) require.NoError(t, err) @@ -145,9 +145,9 @@ func TestP2PModule_methodsOnBandwidth(t *testing.T) { assert.Greater(t, int(protoStat.TotalIn), bufSize) // should be slightly more than buf size due negotiations, etc } -// TestP2PModule_methodsOnPubsub tests P2P Module methods on +// TestP2PModule_Pubsub tests P2P Module methods on // the instance of pubsub. -func TestP2PModule_methodsOnPubsub(t *testing.T) { +func TestP2PModule_Pubsub(t *testing.T) { net, err := mocknet.FullMeshConnected(5) require.NoError(t, err) @@ -187,9 +187,9 @@ func TestP2PModule_methodsOnPubsub(t *testing.T) { assert.Equal(t, len(topic.ListPeers()), len(mgr.PubSubPeers(topicStr))) } -// TestP2PModule_methodsOnConnGater tests P2P Module methods on +// TestP2PModule_ConnGater tests P2P Module methods on // the instance of ConnectionGater. -func TestP2PModule_methodsOnConnGater(t *testing.T) { +func TestP2PModule_ConnGater(t *testing.T) { gater, err := ConnectionGater(datastore.NewMapDatastore()) require.NoError(t, err) @@ -201,9 +201,9 @@ func TestP2PModule_methodsOnConnGater(t *testing.T) { assert.Len(t, mgr.ListBlockedPeers(), 0) } -// TestP2PModule_methodsOnResourceManager tests P2P Module methods on +// TestP2PModule_ResourceManager tests P2P Module methods on // the ResourceManager. -func TestP2PModule_methodsOnResourceManager(t *testing.T) { +func TestP2PModule_ResourceManager(t *testing.T) { rm, err := ResourceManager() require.NoError(t, err) diff --git a/nodebuilder/rpc/rpc.go b/nodebuilder/rpc/rpc.go index 2695016f69..7b46e39c8f 100644 --- a/nodebuilder/rpc/rpc.go +++ b/nodebuilder/rpc/rpc.go @@ -22,9 +22,9 @@ func RegisterEndpoints( ) { serv.RegisterService("state", state) serv.RegisterService("share", share) + serv.RegisterService("fraud", fraud) serv.RegisterService("header", header) serv.RegisterService("das", daser) - serv.RegisterService("fraud", fraud) serv.RegisterService("p2p", p2p) } From 46ba201497190aed6189452d42bc9ff82b3e97a8 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Sat, 12 Nov 2022 09:13:41 +0100 Subject: [PATCH 08/16] doc(nodebuilder/p2p): Fix comment for Connect method --- nodebuilder/p2p/p2p.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index a420fa9bd3..4ffec90a30 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -55,7 +55,7 @@ type Module interface { PeerInfo(id peer.ID) peer.AddrInfo // Connect ensures there is a connection between this host and the peer with - // given peer.ID. + // given peer. Connect(ctx context.Context, pi peer.AddrInfo) error // ClosePeer closes the connection to a given peer. ClosePeer(id peer.ID) error From ad0fc59694aae5709770e570a13d6a09662ad6f1 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Wed, 16 Nov 2022 15:23:44 +0100 Subject: [PATCH 09/16] refactor(nodebuilder/p2p): Refactor NATStatus implementation --- nodebuilder/p2p/host.go | 9 ++++++--- nodebuilder/p2p/module.go | 2 -- nodebuilder/p2p/p2p.go | 26 ++++++++++++++++---------- nodebuilder/p2p/p2p_test.go | 26 ++++++++++++-------------- 4 files changed, 34 insertions(+), 29 deletions(-) diff --git a/nodebuilder/p2p/host.go b/nodebuilder/p2p/host.go index f75b16931a..3c4f4bda15 100644 --- a/nodebuilder/p2p/host.go +++ b/nodebuilder/p2p/host.go @@ -17,6 +17,8 @@ import ( routedhost "github.com/libp2p/go-libp2p/p2p/host/routed" "github.com/libp2p/go-libp2p/p2p/net/conngater" "go.uber.org/fx" + + "github.com/celestiaorg/celestia-node/nodebuilder/node" ) // RoutedHost constructs a wrapped Host that may fallback to address discovery, @@ -45,9 +47,8 @@ func Host(cfg Config, params hostParams, bw *metrics.BandwidthCounter, rm networ libp2p.DefaultMuxers, } - // TODO(@Wondertan): Other, non Celestia bootstrapper may also enable NATService to contribute the - // network. - if cfg.Bootstrapper { + // All node types except light (bridge, full) will enable NATService + if params.Tp != node.Light { opts = append(opts, libp2p.EnableNATService()) } @@ -76,4 +77,6 @@ type hostParams struct { PStore peerstore.Peerstore ConnMngr connmgr.ConnManager ConnGater *conngater.BasicConnectionGater + + Tp node.Type } diff --git a/nodebuilder/p2p/module.go b/nodebuilder/p2p/module.go index 253f0d73c7..afa3c1746c 100644 --- a/nodebuilder/p2p/module.go +++ b/nodebuilder/p2p/module.go @@ -3,7 +3,6 @@ package p2p import ( logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/metrics" - "github.com/libp2p/go-libp2p/p2p/host/autonat" "go.uber.org/fx" "github.com/celestiaorg/celestia-node/nodebuilder/node" @@ -33,7 +32,6 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Provide(ContentRouting), fx.Provide(AddrsFactory(cfg.AnnounceAddresses, cfg.NoAnnounceAddresses)), fx.Provide(metrics.NewBandwidthCounter), - fx.Provide(autonat.New), fx.Provide(ResourceManager), fx.Provide(newManager), fx.Invoke(Listen(cfg.ListenAddresses)), diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index 4ffec90a30..fb0914e578 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -4,14 +4,13 @@ import ( "context" "fmt" - "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/metrics" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" pubsub "github.com/libp2p/go-libp2p-pubsub" rcmgr "github.com/libp2p/go-libp2p-resource-manager" - "github.com/libp2p/go-libp2p/p2p/host/autonat" + basichost "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/net/conngater" ma "github.com/multiformats/go-multiaddr" ) @@ -27,7 +26,7 @@ type API struct { Connect func(ctx context.Context, pi peer.AddrInfo) error ClosePeer func(id peer.ID) error Connectedness func(id peer.ID) network.Connectedness - NATStatus func() network.Reachability + NATStatus func() (network.Reachability, error) BlockPeer func(p peer.ID) error UnblockPeer func(p peer.ID) error ListBlockedPeers func() []peer.ID @@ -62,7 +61,7 @@ type Module interface { // Connectedness returns a state signaling connection capabilities. Connectedness(id peer.ID) network.Connectedness // NATStatus returns the current NAT status. - NATStatus() network.Reachability + NATStatus() (network.Reachability, error) // BlockPeer adds a peer to the set of blocked peers. BlockPeer(p peer.ID) error @@ -105,16 +104,14 @@ type Module interface { type manager struct { host HostBase ps *pubsub.PubSub - nat autonat.AutoNAT connGater *conngater.BasicConnectionGater bw *metrics.BandwidthCounter rm network.ResourceManager } func newManager( - host host.Host, + host HostBase, ps *pubsub.PubSub, - nat autonat.AutoNAT, cg *conngater.BasicConnectionGater, bw *metrics.BandwidthCounter, rm network.ResourceManager, @@ -122,7 +119,6 @@ func newManager( return &manager{ host: host, ps: ps, - nat: nat, connGater: cg, bw: bw, rm: rm, @@ -164,8 +160,18 @@ func (m *manager) Connectedness(id peer.ID) network.Connectedness { return m.host.Network().Connectedness(id) } -func (m *manager) NATStatus() network.Reachability { - return m.nat.Status() +func (m *manager) NATStatus() (network.Reachability, error) { + basic, ok := m.host.(*basichost.BasicHost) + if !ok { + return 0, fmt.Errorf("does not impl") // todo + } + // return a nice error if autonat is not enabled (e.g. light nodes + // do not have autonat enabled) + // TODO does this even happen where autonat is nil? + if basic.GetAutoNat() == nil { + return 0, fmt.Errorf("does not have") // todo + } + return basic.GetAutoNat().Status(), nil } func (m *manager) BlockPeer(p peer.ID) error { diff --git a/nodebuilder/p2p/p2p_test.go b/nodebuilder/p2p/p2p_test.go index d4877913be..29e9ce7685 100644 --- a/nodebuilder/p2p/p2p_test.go +++ b/nodebuilder/p2p/p2p_test.go @@ -13,7 +13,6 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/protocol" pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/p2p/host/autonat" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -26,7 +25,7 @@ func TestP2PModule_Host(t *testing.T) { require.NoError(t, err) host, peer := net.Hosts()[0], net.Hosts()[1] - mgr := newManager(host, nil, nil, nil, nil, nil) + mgr := newManager(host, nil, nil, nil, nil) // test all methods on `manager.host` assert.Equal(t, host.ID(), mgr.Info().ID) @@ -51,7 +50,7 @@ func TestP2PModule_ConnManager(t *testing.T) { peer, err := libp2p.New() require.NoError(t, err) - mgr := newManager(host, nil, nil, nil, nil, nil) + mgr := newManager(host, nil, nil, nil, nil) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -68,16 +67,15 @@ func TestP2PModule_ConnManager(t *testing.T) { // TestP2PModule_Autonat tests P2P Module methods on // the node's instance of AutoNAT. func TestP2PModule_Autonat(t *testing.T) { - net, err := mocknet.FullMeshConnected(2) - require.NoError(t, err) - host := net.Hosts()[0] - - nat, err := autonat.New(host) + host, err := libp2p.New(libp2p.EnableNATService()) require.NoError(t, err) - mgr := newManager(host, nil, nat, nil, nil, nil) + mgr := newManager(host, nil, nil, nil, nil) - assert.Equal(t, nat.Status(), mgr.NATStatus()) + status, err := mgr.NATStatus() + assert.NoError(t, err) + // TODO ?????? + t.Log(status) } // TestP2PModule_Bandwidth tests P2P Module methods on @@ -105,7 +103,7 @@ func TestP2PModule_Bandwidth(t *testing.T) { require.NoError(t, err) }) - mgr := newManager(host, nil, nil, nil, bw, nil) + mgr := newManager(host, nil, nil, bw, nil) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -159,7 +157,7 @@ func TestP2PModule_Pubsub(t *testing.T) { gs, err := pubsub.NewGossipSub(ctx, host) require.NoError(t, err) - mgr := newManager(host, gs, nil, nil, nil, nil) + mgr := newManager(host, gs, nil, nil, nil) topicStr := "test-topic" @@ -193,7 +191,7 @@ func TestP2PModule_ConnGater(t *testing.T) { gater, err := ConnectionGater(datastore.NewMapDatastore()) require.NoError(t, err) - mgr := newManager(nil, nil, nil, gater, nil, nil) + mgr := newManager(nil, nil, gater, nil, nil) assert.NoError(t, mgr.BlockPeer("badpeer")) assert.Len(t, mgr.ListBlockedPeers(), 1) @@ -207,7 +205,7 @@ func TestP2PModule_ResourceManager(t *testing.T) { rm, err := ResourceManager() require.NoError(t, err) - mgr := newManager(nil, nil, nil, nil, nil, rm) + mgr := newManager(nil, nil, nil, nil, rm) state, err := mgr.ResourceState() require.NoError(t, err) From ef49c6bbb7388a170411ca53b4d34782579ef42f Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Wed, 16 Nov 2022 16:10:02 +0100 Subject: [PATCH 10/16] refactor(nodebuilder/p2p): Better error messages for NATStatus --- nodebuilder/p2p/p2p.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index fb0914e578..398b5cafaa 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -3,6 +3,7 @@ package p2p import ( "context" "fmt" + "reflect" "github.com/libp2p/go-libp2p-core/metrics" "github.com/libp2p/go-libp2p-core/network" @@ -163,13 +164,12 @@ func (m *manager) Connectedness(id peer.ID) network.Connectedness { func (m *manager) NATStatus() (network.Reachability, error) { basic, ok := m.host.(*basichost.BasicHost) if !ok { - return 0, fmt.Errorf("does not impl") // todo + return 0, fmt.Errorf("unexpected implementation of host.Host, expected %s, got %T", + reflect.TypeOf(&basichost.BasicHost{}).String(), m.host) } - // return a nice error if autonat is not enabled (e.g. light nodes - // do not have autonat enabled) - // TODO does this even happen where autonat is nil? + // light nodes do not provide AutoNAT services by default if basic.GetAutoNat() == nil { - return 0, fmt.Errorf("does not have") // todo + return 0, fmt.Errorf("host does not provide AutoNAT services") } return basic.GetAutoNat().Status(), nil } From 6016729ef29a3ae8df9acffb1131083c065cd823 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Thu, 17 Nov 2022 16:30:01 +0100 Subject: [PATCH 11/16] refactor(nodebuilder/p2p): manager --> module --- nodebuilder/p2p/flags.go | 2 +- nodebuilder/p2p/module.go | 2 +- nodebuilder/p2p/p2p.go | 44 ++++++++++++++++++------------------- nodebuilder/p2p/p2p_test.go | 14 ++++++------ 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/nodebuilder/p2p/flags.go b/nodebuilder/p2p/flags.go index 82d03da697..f7a3d9c463 100644 --- a/nodebuilder/p2p/flags.go +++ b/nodebuilder/p2p/flags.go @@ -26,7 +26,7 @@ func Flags() *flag.FlagSet { mutualFlag, nil, `Comma-separated multiaddresses of mutual peers to keep a prioritized connection with. -Such connection is immune to peer scoring slashing and connection manager trimming. +Such connection is immune to peer scoring slashing and connection module trimming. Peers must bidirectionally point to each other. (Format: multiformats.io/multiaddr) `, ) diff --git a/nodebuilder/p2p/module.go b/nodebuilder/p2p/module.go index afa3c1746c..9570f02821 100644 --- a/nodebuilder/p2p/module.go +++ b/nodebuilder/p2p/module.go @@ -33,7 +33,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Provide(AddrsFactory(cfg.AnnounceAddresses, cfg.NoAnnounceAddresses)), fx.Provide(metrics.NewBandwidthCounter), fx.Provide(ResourceManager), - fx.Provide(newManager), + fx.Provide(newModule), fx.Invoke(Listen(cfg.ListenAddresses)), ) diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index 398b5cafaa..1abd62712b 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -100,9 +100,9 @@ type Module interface { PubSubPeers(topic string) []peer.ID } -// manager contains all components necessary to access information and +// module contains all components necessary to access information and // perform actions related to the node's p2p Host / operations. -type manager struct { +type module struct { host HostBase ps *pubsub.PubSub connGater *conngater.BasicConnectionGater @@ -110,14 +110,14 @@ type manager struct { rm network.ResourceManager } -func newManager( +func newModule( host HostBase, ps *pubsub.PubSub, cg *conngater.BasicConnectionGater, bw *metrics.BandwidthCounter, rm network.ResourceManager, ) Module { - return &manager{ + return &module{ host: host, ps: ps, connGater: cg, @@ -134,34 +134,34 @@ type Info struct { Addrs []ma.Multiaddr `json:"addrs"` } -func (m *manager) Info() Info { +func (m *module) Info() Info { return Info{ ID: m.host.ID(), Addrs: m.host.Addrs(), } } -func (m *manager) Peers() peer.IDSlice { +func (m *module) Peers() peer.IDSlice { return m.host.Peerstore().Peers() } -func (m *manager) PeerInfo(id peer.ID) peer.AddrInfo { +func (m *module) PeerInfo(id peer.ID) peer.AddrInfo { return m.host.Peerstore().PeerInfo(id) } -func (m *manager) Connect(ctx context.Context, pi peer.AddrInfo) error { +func (m *module) Connect(ctx context.Context, pi peer.AddrInfo) error { return m.host.Connect(ctx, pi) } -func (m *manager) ClosePeer(id peer.ID) error { +func (m *module) ClosePeer(id peer.ID) error { return m.host.Network().ClosePeer(id) } -func (m *manager) Connectedness(id peer.ID) network.Connectedness { +func (m *module) Connectedness(id peer.ID) network.Connectedness { return m.host.Network().Connectedness(id) } -func (m *manager) NATStatus() (network.Reachability, error) { +func (m *module) NATStatus() (network.Reachability, error) { basic, ok := m.host.(*basichost.BasicHost) if !ok { return 0, fmt.Errorf("unexpected implementation of host.Host, expected %s, got %T", @@ -174,43 +174,43 @@ func (m *manager) NATStatus() (network.Reachability, error) { return basic.GetAutoNat().Status(), nil } -func (m *manager) BlockPeer(p peer.ID) error { +func (m *module) BlockPeer(p peer.ID) error { return m.connGater.BlockPeer(p) } -func (m *manager) UnblockPeer(p peer.ID) error { +func (m *module) UnblockPeer(p peer.ID) error { return m.connGater.UnblockPeer(p) } -func (m *manager) ListBlockedPeers() []peer.ID { +func (m *module) ListBlockedPeers() []peer.ID { return m.connGater.ListBlockedPeers() } -func (m *manager) MutualAdd(id peer.ID, tag string) { +func (m *module) MutualAdd(id peer.ID, tag string) { m.host.ConnManager().Protect(id, tag) } -func (m *manager) MutualRm(id peer.ID, tag string) bool { +func (m *module) MutualRm(id peer.ID, tag string) bool { return m.host.ConnManager().Unprotect(id, tag) } -func (m *manager) IsMutual(id peer.ID, tag string) bool { +func (m *module) IsMutual(id peer.ID, tag string) bool { return m.host.ConnManager().IsProtected(id, tag) } -func (m *manager) BandwidthStats() metrics.Stats { +func (m *module) BandwidthStats() metrics.Stats { return m.bw.GetBandwidthTotals() } -func (m *manager) BandwidthForPeer(id peer.ID) metrics.Stats { +func (m *module) BandwidthForPeer(id peer.ID) metrics.Stats { return m.bw.GetBandwidthForPeer(id) } -func (m *manager) BandwidthForProtocol(proto protocol.ID) metrics.Stats { +func (m *module) BandwidthForProtocol(proto protocol.ID) metrics.Stats { return m.bw.GetBandwidthForProtocol(proto) } -func (m *manager) ResourceState() (rcmgr.ResourceManagerStat, error) { +func (m *module) ResourceState() (rcmgr.ResourceManagerStat, error) { rms, ok := m.rm.(rcmgr.ResourceManagerState) if !ok { return rcmgr.ResourceManagerStat{}, fmt.Errorf("network.ResourceManager does not implement " + @@ -219,6 +219,6 @@ func (m *manager) ResourceState() (rcmgr.ResourceManagerStat, error) { return rms.Stat(), nil } -func (m *manager) PubSubPeers(topic string) []peer.ID { +func (m *module) PubSubPeers(topic string) []peer.ID { return m.ps.ListPeers(topic) } diff --git a/nodebuilder/p2p/p2p_test.go b/nodebuilder/p2p/p2p_test.go index 29e9ce7685..64b56f5023 100644 --- a/nodebuilder/p2p/p2p_test.go +++ b/nodebuilder/p2p/p2p_test.go @@ -25,7 +25,7 @@ func TestP2PModule_Host(t *testing.T) { require.NoError(t, err) host, peer := net.Hosts()[0], net.Hosts()[1] - mgr := newManager(host, nil, nil, nil, nil) + mgr := newModule(host, nil, nil, nil, nil) // test all methods on `manager.host` assert.Equal(t, host.ID(), mgr.Info().ID) @@ -50,7 +50,7 @@ func TestP2PModule_ConnManager(t *testing.T) { peer, err := libp2p.New() require.NoError(t, err) - mgr := newManager(host, nil, nil, nil, nil) + mgr := newModule(host, nil, nil, nil, nil) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -70,7 +70,7 @@ func TestP2PModule_Autonat(t *testing.T) { host, err := libp2p.New(libp2p.EnableNATService()) require.NoError(t, err) - mgr := newManager(host, nil, nil, nil, nil) + mgr := newModule(host, nil, nil, nil, nil) status, err := mgr.NATStatus() assert.NoError(t, err) @@ -103,7 +103,7 @@ func TestP2PModule_Bandwidth(t *testing.T) { require.NoError(t, err) }) - mgr := newManager(host, nil, nil, bw, nil) + mgr := newModule(host, nil, nil, bw, nil) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -157,7 +157,7 @@ func TestP2PModule_Pubsub(t *testing.T) { gs, err := pubsub.NewGossipSub(ctx, host) require.NoError(t, err) - mgr := newManager(host, gs, nil, nil, nil) + mgr := newModule(host, gs, nil, nil, nil) topicStr := "test-topic" @@ -191,7 +191,7 @@ func TestP2PModule_ConnGater(t *testing.T) { gater, err := ConnectionGater(datastore.NewMapDatastore()) require.NoError(t, err) - mgr := newManager(nil, nil, gater, nil, nil) + mgr := newModule(nil, nil, gater, nil, nil) assert.NoError(t, mgr.BlockPeer("badpeer")) assert.Len(t, mgr.ListBlockedPeers(), 1) @@ -205,7 +205,7 @@ func TestP2PModule_ResourceManager(t *testing.T) { rm, err := ResourceManager() require.NoError(t, err) - mgr := newManager(nil, nil, nil, nil, rm) + mgr := newModule(nil, nil, nil, nil, rm) state, err := mgr.ResourceState() require.NoError(t, err) From 1ac01ec7c3c3d500b90dc31842105b7d2e7304b6 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Thu, 17 Nov 2022 17:09:48 +0100 Subject: [PATCH 12/16] refactor(nodebuilder/p2p): Address remainder of @wondertan s comments --- nodebuilder/p2p/module.go | 2 +- nodebuilder/p2p/p2p.go | 40 ++++++++++++++++++------------------- nodebuilder/p2p/p2p_test.go | 20 +++++++++---------- nodebuilder/p2p/resource.go | 2 +- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/nodebuilder/p2p/module.go b/nodebuilder/p2p/module.go index 9570f02821..97cb4eb037 100644 --- a/nodebuilder/p2p/module.go +++ b/nodebuilder/p2p/module.go @@ -32,7 +32,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Provide(ContentRouting), fx.Provide(AddrsFactory(cfg.AnnounceAddresses, cfg.NoAnnounceAddresses)), fx.Provide(metrics.NewBandwidthCounter), - fx.Provide(ResourceManager), + fx.Provide(resourceManager), fx.Provide(newModule), fx.Invoke(Listen(cfg.ListenAddresses)), ) diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index 1abd62712b..51ce2250d5 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" + libhost "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/metrics" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -13,7 +14,6 @@ import ( rcmgr "github.com/libp2p/go-libp2p-resource-manager" basichost "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/net/conngater" - ma "github.com/multiformats/go-multiaddr" ) // API is a wrapper around Module for the RPC. @@ -22,7 +22,7 @@ import ( //nolint:dupl type API struct { Info func() Info - Peers func() peer.IDSlice + Peers func() []peer.ID PeerInfo func(id peer.ID) peer.AddrInfo Connect func(ctx context.Context, pi peer.AddrInfo) error ClosePeer func(id peer.ID) error @@ -31,9 +31,9 @@ type API struct { BlockPeer func(p peer.ID) error UnblockPeer func(p peer.ID) error ListBlockedPeers func() []peer.ID - MutualAdd func(id peer.ID, tag string) - MutualRm func(id peer.ID, tag string) bool - IsMutual func(id peer.ID, tag string) bool + Protect func(id peer.ID, tag string) + Unprotect func(id peer.ID, tag string) bool + IsProtected func(id peer.ID, tag string) bool BandwidthStats func() metrics.Stats BandwidthForPeer func(id peer.ID) metrics.Stats BandwidthForProtocol func(proto protocol.ID) metrics.Stats @@ -49,7 +49,7 @@ type Module interface { // Info returns basic information about the node's p2p host/operations. Info() Info // Peers returns all peer IDs used across all inner stores. - Peers() peer.IDSlice + Peers() []peer.ID // PeerInfo returns a small slice of information Peerstore has on the // given peer. PeerInfo(id peer.ID) peer.AddrInfo @@ -70,17 +70,17 @@ type Module interface { UnblockPeer(p peer.ID) error // ListBlockedPeers returns a list of blocked peers. ListBlockedPeers() []peer.ID - // MutualAdd adds a peer to the list of peers who have a bidirectional + // Protect adds a peer to the list of peers who have a bidirectional // peering agreement that they are protected from being trimmed, dropped // or negatively scored. - MutualAdd(id peer.ID, tag string) - // MutualRm removes a peer from the list of peers who have a bidirectional + Protect(id peer.ID, tag string) + // Unprotect removes a peer from the list of peers who have a bidirectional // peering agreement that they are protected from being trimmed, dropped // or negatively scored, returning a bool representing whether the given // peer is protected or not. - MutualRm(id peer.ID, tag string) bool - // IsMutual returns whether the given peer is a mutual peer. - IsMutual(id peer.ID, tag string) bool + Unprotect(id peer.ID, tag string) bool + // IsProtected returns whether the given peer is protected. + IsProtected(id peer.ID, tag string) bool // BandwidthStats returns a Stats struct with bandwidth metrics for all // data sent/received by the local peer, regardless of protocol or remote @@ -130,18 +130,18 @@ func newModule( type Info struct { // ID is the node's peer ID ID peer.ID `json:"id"` - // Addrs is the node's - Addrs []ma.Multiaddr `json:"addrs"` + // AddrInfo is the node's + AddrInfo peer.AddrInfo `json:"addr_info"` } func (m *module) Info() Info { return Info{ - ID: m.host.ID(), - Addrs: m.host.Addrs(), + ID: m.host.ID(), + AddrInfo: *libhost.InfoFromHost(m.host), } } -func (m *module) Peers() peer.IDSlice { +func (m *module) Peers() []peer.ID { return m.host.Peerstore().Peers() } @@ -186,15 +186,15 @@ func (m *module) ListBlockedPeers() []peer.ID { return m.connGater.ListBlockedPeers() } -func (m *module) MutualAdd(id peer.ID, tag string) { +func (m *module) Protect(id peer.ID, tag string) { m.host.ConnManager().Protect(id, tag) } -func (m *module) MutualRm(id peer.ID, tag string) bool { +func (m *module) Unprotect(id peer.ID, tag string) bool { return m.host.ConnManager().Unprotect(id, tag) } -func (m *module) IsMutual(id peer.ID, tag string) bool { +func (m *module) IsProtected(id peer.ID, tag string) bool { return m.host.ConnManager().IsProtected(id, tag) } diff --git a/nodebuilder/p2p/p2p_test.go b/nodebuilder/p2p/p2p_test.go index 64b56f5023..449ae3f38a 100644 --- a/nodebuilder/p2p/p2p_test.go +++ b/nodebuilder/p2p/p2p_test.go @@ -11,6 +11,7 @@ import ( libhost "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/metrics" "github.com/libp2p/go-libp2p-core/network" + libpeer "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" pubsub "github.com/libp2p/go-libp2p-pubsub" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -29,7 +30,7 @@ func TestP2PModule_Host(t *testing.T) { // test all methods on `manager.host` assert.Equal(t, host.ID(), mgr.Info().ID) - assert.Equal(t, host.Peerstore().Peers(), mgr.Peers()) + assert.Equal(t, []libpeer.ID(host.Peerstore().Peers()), mgr.Peers()) assert.Equal(t, libhost.InfoFromHost(peer).ID, mgr.PeerInfo(peer.ID()).ID) assert.Equal(t, host.Network().Connectedness(peer.ID()), mgr.Connectedness(peer.ID())) @@ -58,10 +59,10 @@ func TestP2PModule_ConnManager(t *testing.T) { err = mgr.Connect(ctx, *libhost.InfoFromHost(peer)) require.NoError(t, err) - mgr.MutualAdd(peer.ID(), "test") - assert.True(t, mgr.IsMutual(peer.ID(), "test")) - mgr.MutualRm(peer.ID(), "test") - assert.False(t, mgr.IsMutual(peer.ID(), "test")) + mgr.Protect(peer.ID(), "test") + assert.True(t, mgr.IsProtected(peer.ID(), "test")) + mgr.Unprotect(peer.ID(), "test") + assert.False(t, mgr.IsProtected(peer.ID(), "test")) } // TestP2PModule_Autonat tests P2P Module methods on @@ -74,8 +75,7 @@ func TestP2PModule_Autonat(t *testing.T) { status, err := mgr.NATStatus() assert.NoError(t, err) - // TODO ?????? - t.Log(status) + assert.Equal(t, network.ReachabilityUnknown, status) } // TestP2PModule_Bandwidth tests P2P Module methods on @@ -129,9 +129,9 @@ func TestP2PModule_Bandwidth(t *testing.T) { _, err = stream.Read(buf) require.NoError(t, err) - // has to be a ~second for the metrics reporter to collect the stats + // has to be ~2 seconds for the metrics reporter to collect the stats // in the background process - time.Sleep(time.Second) + time.Sleep(time.Second * 2) stats := mgr.BandwidthStats() assert.NotNil(t, stats) @@ -202,7 +202,7 @@ func TestP2PModule_ConnGater(t *testing.T) { // TestP2PModule_ResourceManager tests P2P Module methods on // the ResourceManager. func TestP2PModule_ResourceManager(t *testing.T) { - rm, err := ResourceManager() + rm, err := resourceManager() require.NoError(t, err) mgr := newModule(nil, nil, nil, nil, rm) diff --git a/nodebuilder/p2p/resource.go b/nodebuilder/p2p/resource.go index b850d53112..d17bb4063f 100644 --- a/nodebuilder/p2p/resource.go +++ b/nodebuilder/p2p/resource.go @@ -5,6 +5,6 @@ import ( rcmgr "github.com/libp2p/go-libp2p-resource-manager" ) -func ResourceManager() (network.ResourceManager, error) { +func resourceManager() (network.ResourceManager, error) { return rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale())) } From ca0c917ddd041b82c828efefc980c4f5f52f698a Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Sat, 19 Nov 2022 12:08:30 +0100 Subject: [PATCH 13/16] doc(docs/adr): Update api adr with new func sig --- docs/adr/adr-009-public-api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/adr/adr-009-public-api.md b/docs/adr/adr-009-public-api.md index 575ad3b9fd..c1cad95838 100644 --- a/docs/adr/adr-009-public-api.md +++ b/docs/adr/adr-009-public-api.md @@ -143,7 +143,7 @@ SyncHead(ctx context.Context) (*header.ExtendedHeader, error) // Info returns basic information about the node's p2p host/operations. Info() p2p.Info // Peers returns all peer IDs used across all inner stores. - Peers() peer.IDSlice + Peers() []peer.ID // PeerInfo returns a small slice of information Peerstore has on the // given peer. PeerInfo(id peer.ID) peer.AddrInfo From 8b5f2f6c6ee3f08f1234e52cfcafa8570d537b7d Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Wed, 23 Nov 2022 18:10:56 +0100 Subject: [PATCH 14/16] refactor(nodebuilder/p2p): address wondernits --- go.mod | 7 ++----- nodebuilder/p2p/module.go | 6 +++++- nodebuilder/p2p/p2p.go | 4 ---- nodebuilder/p2p/p2p_test.go | 3 ++- nodebuilder/p2p/resource.go | 10 ---------- 5 files changed, 9 insertions(+), 21 deletions(-) delete mode 100644 nodebuilder/p2p/resource.go diff --git a/go.mod b/go.mod index daa38b43d8..b3a8abd6d9 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/filecoin-project/go-jsonrpc v0.1.8 github.com/gammazero/workerpool v1.1.3 github.com/gogo/protobuf v1.3.3 + github.com/golang/mock v1.6.0 github.com/gorilla/mux v1.8.0 github.com/hashicorp/go-retryablehttp v0.7.1-0.20211018174820-ff6d014e72d9 github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d @@ -42,6 +43,7 @@ require ( github.com/libp2p/go-libp2p-peerstore v0.7.1 github.com/libp2p/go-libp2p-pubsub v0.7.0 github.com/libp2p/go-libp2p-record v0.1.3 + github.com/libp2p/go-libp2p-resource-manager v0.5.1 github.com/libp2p/go-libp2p-routing-helpers v0.2.3 github.com/minio/sha256-simd v1.0.0 github.com/mitchellh/go-homedir v1.1.0 @@ -68,11 +70,6 @@ require ( google.golang.org/grpc v1.49.0 ) -require ( - github.com/golang/mock v1.6.0 - github.com/libp2p/go-libp2p-resource-manager v0.5.1 -) - require ( cloud.google.com/go v0.100.2 // indirect cloud.google.com/go/compute v1.6.1 // indirect diff --git a/nodebuilder/p2p/module.go b/nodebuilder/p2p/module.go index 97cb4eb037..524e4d44b7 100644 --- a/nodebuilder/p2p/module.go +++ b/nodebuilder/p2p/module.go @@ -3,6 +3,8 @@ package p2p import ( logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/metrics" + "github.com/libp2p/go-libp2p-core/network" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" "go.uber.org/fx" "github.com/celestiaorg/celestia-node/nodebuilder/node" @@ -32,7 +34,9 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Provide(ContentRouting), fx.Provide(AddrsFactory(cfg.AnnounceAddresses, cfg.NoAnnounceAddresses)), fx.Provide(metrics.NewBandwidthCounter), - fx.Provide(resourceManager), + fx.Provide(func() (network.ResourceManager, error) { + return rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale())) + }), fx.Provide(newModule), fx.Invoke(Listen(cfg.ListenAddresses)), ) diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index 51ce2250d5..dd84e78352 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -167,10 +167,6 @@ func (m *module) NATStatus() (network.Reachability, error) { return 0, fmt.Errorf("unexpected implementation of host.Host, expected %s, got %T", reflect.TypeOf(&basichost.BasicHost{}).String(), m.host) } - // light nodes do not provide AutoNAT services by default - if basic.GetAutoNat() == nil { - return 0, fmt.Errorf("host does not provide AutoNAT services") - } return basic.GetAutoNat().Status(), nil } diff --git a/nodebuilder/p2p/p2p_test.go b/nodebuilder/p2p/p2p_test.go index 449ae3f38a..b79e8bc63d 100644 --- a/nodebuilder/p2p/p2p_test.go +++ b/nodebuilder/p2p/p2p_test.go @@ -14,6 +14,7 @@ import ( libpeer "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" pubsub "github.com/libp2p/go-libp2p-pubsub" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -202,7 +203,7 @@ func TestP2PModule_ConnGater(t *testing.T) { // TestP2PModule_ResourceManager tests P2P Module methods on // the ResourceManager. func TestP2PModule_ResourceManager(t *testing.T) { - rm, err := resourceManager() + rm, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale())) require.NoError(t, err) mgr := newModule(nil, nil, nil, nil, rm) diff --git a/nodebuilder/p2p/resource.go b/nodebuilder/p2p/resource.go deleted file mode 100644 index d17bb4063f..0000000000 --- a/nodebuilder/p2p/resource.go +++ /dev/null @@ -1,10 +0,0 @@ -package p2p - -import ( - "github.com/libp2p/go-libp2p-core/network" - rcmgr "github.com/libp2p/go-libp2p-resource-manager" -) - -func resourceManager() (network.ResourceManager, error) { - return rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale())) -} From f0a9a7a17e2ff531dcf6be0e4d1bc70dfafd90e0 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Wed, 23 Nov 2022 18:24:03 +0100 Subject: [PATCH 15/16] refactor(nodebuilder/p2p): more wondernits --- nodebuilder/p2p/p2p.go | 53 +++++++++++++++++-------------------- nodebuilder/p2p/p2p_test.go | 3 +-- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index dd84e78352..39ec2fc28f 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -16,31 +16,6 @@ import ( "github.com/libp2p/go-libp2p/p2p/net/conngater" ) -// API is a wrapper around Module for the RPC. -// TODO(@distractedm1nd): These structs need to be autogenerated. -// -//nolint:dupl -type API struct { - Info func() Info - Peers func() []peer.ID - PeerInfo func(id peer.ID) peer.AddrInfo - Connect func(ctx context.Context, pi peer.AddrInfo) error - ClosePeer func(id peer.ID) error - Connectedness func(id peer.ID) network.Connectedness - NATStatus func() (network.Reachability, error) - BlockPeer func(p peer.ID) error - UnblockPeer func(p peer.ID) error - ListBlockedPeers func() []peer.ID - Protect func(id peer.ID, tag string) - Unprotect func(id peer.ID, tag string) bool - IsProtected func(id peer.ID, tag string) bool - BandwidthStats func() metrics.Stats - BandwidthForPeer func(id peer.ID) metrics.Stats - BandwidthForProtocol func(proto protocol.ID) metrics.Stats - ResourceState func() (rcmgr.ResourceManagerStat, error) - PubSubPeers func(topic string) []peer.ID -} - // Module represents all accessible methods related to the node's p2p // host / operations. // @@ -128,15 +103,12 @@ func newModule( // Info contains basic information about the node's p2p host/operations. type Info struct { - // ID is the node's peer ID - ID peer.ID `json:"id"` // AddrInfo is the node's AddrInfo peer.AddrInfo `json:"addr_info"` } func (m *module) Info() Info { return Info{ - ID: m.host.ID(), AddrInfo: *libhost.InfoFromHost(m.host), } } @@ -218,3 +190,28 @@ func (m *module) ResourceState() (rcmgr.ResourceManagerStat, error) { func (m *module) PubSubPeers(topic string) []peer.ID { return m.ps.ListPeers(topic) } + +// API is a wrapper around Module for the RPC. +// TODO(@distractedm1nd): These structs need to be autogenerated. +// +//nolint:dupl +type API struct { + Info func() Info + Peers func() []peer.ID + PeerInfo func(id peer.ID) peer.AddrInfo + Connect func(ctx context.Context, pi peer.AddrInfo) error + ClosePeer func(id peer.ID) error + Connectedness func(id peer.ID) network.Connectedness + NATStatus func() (network.Reachability, error) + BlockPeer func(p peer.ID) error + UnblockPeer func(p peer.ID) error + ListBlockedPeers func() []peer.ID + Protect func(id peer.ID, tag string) + Unprotect func(id peer.ID, tag string) bool + IsProtected func(id peer.ID, tag string) bool + BandwidthStats func() metrics.Stats + BandwidthForPeer func(id peer.ID) metrics.Stats + BandwidthForProtocol func(proto protocol.ID) metrics.Stats + ResourceState func() (rcmgr.ResourceManagerStat, error) + PubSubPeers func(topic string) []peer.ID +} diff --git a/nodebuilder/p2p/p2p_test.go b/nodebuilder/p2p/p2p_test.go index b79e8bc63d..ed8a7e6a21 100644 --- a/nodebuilder/p2p/p2p_test.go +++ b/nodebuilder/p2p/p2p_test.go @@ -30,7 +30,6 @@ func TestP2PModule_Host(t *testing.T) { mgr := newModule(host, nil, nil, nil, nil) // test all methods on `manager.host` - assert.Equal(t, host.ID(), mgr.Info().ID) assert.Equal(t, []libpeer.ID(host.Peerstore().Peers()), mgr.Peers()) assert.Equal(t, libhost.InfoFromHost(peer).ID, mgr.PeerInfo(peer.ID()).ID) @@ -116,7 +115,7 @@ func TestP2PModule_Bandwidth(t *testing.T) { require.Equal(t, network.Connected, mgr.Connectedness(peer.ID())) // open stream with host - stream, err := peer.NewStream(ctx, mgr.Info().ID, protoID) + stream, err := peer.NewStream(ctx, mgr.Info().AddrInfo.ID, protoID) require.NoError(t, err) // write to stream to increase bandwidth usage get some substantive From 90b6152698e1940be54a318636f4b189fd5b8689 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Wed, 23 Nov 2022 18:32:31 +0100 Subject: [PATCH 16/16] refactor(nodebuilder/p2p): change Info() method --- docs/adr/adr-009-public-api.md | 4 ++-- nodebuilder/p2p/p2p.go | 18 +++++------------- nodebuilder/p2p/p2p_test.go | 2 +- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/docs/adr/adr-009-public-api.md b/docs/adr/adr-009-public-api.md index c1cad95838..56ae47a14e 100644 --- a/docs/adr/adr-009-public-api.md +++ b/docs/adr/adr-009-public-api.md @@ -140,8 +140,8 @@ SyncHead(ctx context.Context) (*header.ExtendedHeader, error) ```go type P2PModule interface { - // Info returns basic information about the node's p2p host/operations. - Info() p2p.Info + // Info returns address information about the host. + Info() peer.AddrInfo // Peers returns all peer IDs used across all inner stores. Peers() []peer.ID // PeerInfo returns a small slice of information Peerstore has on the diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index 39ec2fc28f..e27e30034a 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -21,8 +21,8 @@ import ( // //nolint:dupl type Module interface { - // Info returns basic information about the node's p2p host/operations. - Info() Info + // Info returns address information about the host. + Info() peer.AddrInfo // Peers returns all peer IDs used across all inner stores. Peers() []peer.ID // PeerInfo returns a small slice of information Peerstore has on the @@ -101,16 +101,8 @@ func newModule( } } -// Info contains basic information about the node's p2p host/operations. -type Info struct { - // AddrInfo is the node's - AddrInfo peer.AddrInfo `json:"addr_info"` -} - -func (m *module) Info() Info { - return Info{ - AddrInfo: *libhost.InfoFromHost(m.host), - } +func (m *module) Info() peer.AddrInfo { + return *libhost.InfoFromHost(m.host) } func (m *module) Peers() []peer.ID { @@ -196,7 +188,7 @@ func (m *module) PubSubPeers(topic string) []peer.ID { // //nolint:dupl type API struct { - Info func() Info + Info func() peer.AddrInfo Peers func() []peer.ID PeerInfo func(id peer.ID) peer.AddrInfo Connect func(ctx context.Context, pi peer.AddrInfo) error diff --git a/nodebuilder/p2p/p2p_test.go b/nodebuilder/p2p/p2p_test.go index ed8a7e6a21..da77ff4562 100644 --- a/nodebuilder/p2p/p2p_test.go +++ b/nodebuilder/p2p/p2p_test.go @@ -115,7 +115,7 @@ func TestP2PModule_Bandwidth(t *testing.T) { require.Equal(t, network.Connected, mgr.Connectedness(peer.ID())) // open stream with host - stream, err := peer.NewStream(ctx, mgr.Info().AddrInfo.ID, protoID) + stream, err := peer.NewStream(ctx, mgr.Info().ID, protoID) require.NoError(t, err) // write to stream to increase bandwidth usage get some substantive