diff --git a/network/p2p/README.md b/network/p2p/README.md index 8490e391b6..8c88f4e70c 100644 --- a/network/p2p/README.md +++ b/network/p2p/README.md @@ -23,7 +23,7 @@ Libp2p also provides an implementation of a message-based gossip protocol, Gossi Algorand's current network protocol sends messages between peers over bidirectional WebSocket connections. Nodes that are configured to enable message-forwarding (including -nodes currently called "relays") validate incoming messages, then selectively forward +nodes currently called "relays") validate incoming messages, then selectively forward messages to other connected peers. This network implementation (`WebsocketNetwork`) sits behind the `GossipNode` interface in the network package. @@ -36,8 +36,8 @@ via peer connections managed by libp2p. The `P2PNetwork` implementation uses and [peer IDs](https://docs.libp2p.io/concepts/fundamentals/peers/#peer-ids-in-multiaddrs) to establish connections and identify peers. -Currently transactions (protocol tag `TX`) are distributed using the GossipSub protocol, -while all other messages are forwarded over a custom message protocol `/algorand-ws/1.0.0` +Currently transactions (protocol tag `TX`) are distributed using the GossipSub protocol (see [pubsub.go](./pubsub.go)), +while all other messages are forwarded over a custom message protocol `/algorand-ws/1.0.0` (see [streams.go](./streams.go)) that uses the same message serialization as the existing `WebsocketNetwork` implementation. These two protocols are multiplexed over a single connection using libp2p streams. @@ -63,3 +63,77 @@ graph LR AW --> WS S --> T ``` + +### Sub-components + +#### DHT and capabilities + +Provides helper methods to construct DHT discovery service using `go-libp2p-kad-dht` library. +High level [CapabilitiesDiscovery](./capabilities.go) class supports retrieving (`PeersForCapability`) +peers by a given capability(-ies) or advertising own capabilities (`AdvertiseCapabilities`). + +Note, by default private and non-routable addresses are filtered (see `AddrsFactory`), +libp2p's `ObservedAddrManager` can track own public address and makes it available +(and so that discoverable with DHT) if it was observed at least 4 times in 30 minutes (as libp2p@v0.33.2). + +```mermaid +graph LR + + subgraph "node" + Cap[Capabilities] + end + + subgraph "P2P Implementation" + P2P[P2PNetwork] + AdvCap[AdvertiseCapabilities] + end + + P2P --> AdvCap + Cap --> P2P + + subgraph "libp2p" + Adv[Advertise] + Addr[Addrs] + OAM[ObservedAddrManager] + AF[AddrFactory] + KAD["/kad/1.0.0"] + end + + OAM -.-> Addr + AF -.-> Addr + AdvCap --> Adv + + subgraph "libp2p-kad-dht" + Pro[Provide] + end + + Addr -.-> Pro + Adv --> Pro + Pro --> KAD +``` + +#### HTTP over libp2p connection + +libp2p@0.33 added ability to multiplex HTTP traffic in p2p connection. +A custom `/algorand-http/1.0.0` stream is utilized to expose HTTP server and allow +network service clients (catchup, catchpoint, txsync) to register its own handlers +similarly to the legacy ws-net implementation. + +#### Peerstore + +In-memory peerstore implements `libp2p.Peerstore` and go-algorand `Phonebook` interfaces. +Peer classes (relays, archival, etc) and persistent peers (i.e. peers from command line or phonebook.json) +are supported. Possible enhancement is to save/load peerstore to/from disk to tolerate bootstrap nodes failures. + +#### Logging + +lip2p uses zap logger as a separate `ipfs/go-log/v2` module. `EnableP2PLogging` helper adds +go-algorand's `logrus` as a custom zap core so that all libp2p logs go through go-algorand logging facility. +Unfortunately `ipfs/go-log/v2` has a primary logging core as module variable that makes impossible +to have custom `logrus` sub-loggers in unit tests. + +#### Metrics + +`go-libp2p` uses Prometheus as a metrics library, `go-libp2p-kad-dht` relies on OpenCensus library. +go-algorand has two collectors (see `util/metrics) for both Prometheus and OpenCensus for +counters and gauges with labels. Other types (summary, histogram, distribution) are not supported at the moment. \ No newline at end of file diff --git a/network/p2p/capabilities.go b/network/p2p/capabilities.go index e5781aa389..7a418767d1 100644 --- a/network/p2p/capabilities.go +++ b/network/p2p/capabilities.go @@ -56,13 +56,13 @@ type CapabilitiesDiscovery struct { wg sync.WaitGroup } -// Advertise implements the discovery.Discovery/discovery.Advertiser interface -func (c *CapabilitiesDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { +// advertise implements the discovery.Discovery/discovery.Advertiser interface +func (c *CapabilitiesDiscovery) advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { return c.disc.Advertise(ctx, ns, opts...) } -// FindPeers implements the discovery.Discovery/discovery.Discoverer interface -func (c *CapabilitiesDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { +// findPeers implements the discovery.Discovery/discovery.Discoverer interface +func (c *CapabilitiesDiscovery) findPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { return c.disc.FindPeers(ctx, ns, opts...) } @@ -78,8 +78,8 @@ func (c *CapabilitiesDiscovery) Host() host.Host { return c.dht.Host() } -// AddPeer adds a given peer.AddrInfo to the Host's Peerstore, and the DHT's routing table -func (c *CapabilitiesDiscovery) AddPeer(p peer.AddrInfo) (bool, error) { +// addPeer adds a given peer.AddrInfo to the Host's Peerstore, and the DHT's routing table +func (c *CapabilitiesDiscovery) addPeer(p peer.AddrInfo) (bool, error) { c.Host().Peerstore().AddAddrs(p.ID, p.Addrs, libpeerstore.AddressTTL) return c.dht.RoutingTable().TryAddPeer(p.ID, true, true) } @@ -93,7 +93,7 @@ func (c *CapabilitiesDiscovery) PeersForCapability(capability Capability, n int) var peers []peer.AddrInfo // +1 because it can include self but we exclude self from the returned list // that might confuse the caller (and tests assertions) - peersChan, err := c.FindPeers(ctx, string(capability), discovery.Limit(n+1)) + peersChan, err := c.findPeers(ctx, string(capability), discovery.Limit(n+1)) if err != nil { return nil, err } @@ -128,7 +128,7 @@ func (c *CapabilitiesDiscovery) AdvertiseCapabilities(capabilities ...Capability var err error advertisementInterval := maxAdvertisementInterval for _, capa := range capabilities { - ttl, err0 := c.Advertise(c.dht.Context(), string(capa)) + ttl, err0 := c.advertise(c.dht.Context(), string(capa)) if err0 != nil { err = err0 c.log.Errorf("failed to advertise for capability %s: %v", capa, err0) diff --git a/network/p2p/capabilities_test.go b/network/p2p/capabilities_test.go index 881860f647..5b41ed70d8 100644 --- a/network/p2p/capabilities_test.go +++ b/network/p2p/capabilities_test.go @@ -62,7 +62,7 @@ func TestCapabilities_Discovery(t *testing.T) { for _, capD := range caps { peersAdded := 0 for _, addr := range addrs { - added, err := capD.AddPeer(addr) + added, err := capD.addPeer(addr) require.NoError(t, err) require.True(t, added) peersAdded++