-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Libp2p] Add Libp2p module (part 4) #545
Changes from all commits
e493a67
b2a9b21
7353f9f
2d5a36b
2bac55a
4ca52eb
229355d
de8dc86
89126b4
42f8e74
8442aa4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,374 @@ | ||||||||||
/* | ||||||||||
TECHDEBT: This module currently imports types from the "legacy" P2P module. | ||||||||||
|
||||||||||
Migration path: | ||||||||||
1. Redefine P2P concrete types in terms of interfaces | ||||||||||
- PeersManager (raintree/peersManager) | ||||||||||
- Peer (p2p/types/NetworkPeer) | ||||||||||
- AddrBook (p2p/types/AddrBook) | ||||||||||
- AddrBookMap (p2p/types/NetworkPeer) | ||||||||||
- rainTreeNetwork doesn't depend on any concrete p2p types | ||||||||||
2. Simplify libp2p module implementation | ||||||||||
- Transport likely reduces to nothing | ||||||||||
- Network interface can be simplified | ||||||||||
- Consider renaming network as it functions more like a "router" | ||||||||||
(NB: could be replaced in future iterations with a "raintree pubsub router") | ||||||||||
3. Remove "legacy" P2P module & rename libp2p module directory (possibly object names as well) | ||||||||||
- P2PModule interface can be simplified | ||||||||||
- Clean up TECHDEBT introduced in debug CLI and node startup | ||||||||||
*/ | ||||||||||
package libp2p | ||||||||||
|
||||||||||
import ( | ||||||||||
"context" | ||||||||||
"fmt" | ||||||||||
"io" | ||||||||||
"time" | ||||||||||
|
||||||||||
"github.com/libp2p/go-libp2p" | ||||||||||
pubsub "github.com/libp2p/go-libp2p-pubsub" | ||||||||||
"github.com/libp2p/go-libp2p/core/host" | ||||||||||
libp2pNetwork "github.com/libp2p/go-libp2p/core/network" | ||||||||||
"github.com/multiformats/go-multiaddr" | ||||||||||
"google.golang.org/protobuf/proto" | ||||||||||
"google.golang.org/protobuf/types/known/anypb" | ||||||||||
|
||||||||||
"github.com/pokt-network/pocket/libp2p/network" | ||||||||||
"github.com/pokt-network/pocket/libp2p/protocol" | ||||||||||
"github.com/pokt-network/pocket/logger" | ||||||||||
typesP2P "github.com/pokt-network/pocket/p2p/types" | ||||||||||
"github.com/pokt-network/pocket/runtime/configs" | ||||||||||
"github.com/pokt-network/pocket/runtime/configs/types" | ||||||||||
"github.com/pokt-network/pocket/shared/crypto" | ||||||||||
"github.com/pokt-network/pocket/shared/messaging" | ||||||||||
"github.com/pokt-network/pocket/shared/modules" | ||||||||||
"github.com/pokt-network/pocket/shared/modules/base_modules" | ||||||||||
) | ||||||||||
|
||||||||||
var _ modules.P2PModule = &libp2pModule{} | ||||||||||
|
||||||||||
type libp2pModule struct { | ||||||||||
base_modules.IntegratableModule | ||||||||||
|
||||||||||
logger *modules.Logger | ||||||||||
cfg *configs.P2PConfig | ||||||||||
identity libp2p.Option | ||||||||||
listenAddrs libp2p.Option | ||||||||||
// host encapsulates libp2p peerstore & connection manager | ||||||||||
host host.Host | ||||||||||
// pubsub is used for broadcast communication | ||||||||||
// (i.e. multiple, unidentified receivers) | ||||||||||
pubsub *pubsub.PubSub | ||||||||||
// topic similar to pubsub but received messages are filtered by a "topic" string. | ||||||||||
// Published messages are also given the respective topic before broadcast. | ||||||||||
topic *pubsub.Topic | ||||||||||
// subscription provides an interface to continuously read messages from. | ||||||||||
subscription *pubsub.Subscription | ||||||||||
network typesP2P.Network | ||||||||||
} | ||||||||||
|
||||||||||
var ( | ||||||||||
// TECHDEBT: configure timeouts. Consider security exposure vs. real-world conditions). | ||||||||||
// TECHDEBT: parameterize and expose via config. | ||||||||||
// readStreamTimeout is the duration to wait for a read operation on a | ||||||||||
// stream to complete, after which the stream is closed ("timed out"). | ||||||||||
readStreamTimeoutDuration = time.Second * 10 | ||||||||||
) | ||||||||||
|
||||||||||
func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) { | ||||||||||
return new(libp2pModule).Create(bus, options...) | ||||||||||
} | ||||||||||
|
||||||||||
func (mod *libp2pModule) GetModuleName() string { | ||||||||||
return modules.P2PModuleName | ||||||||||
} | ||||||||||
|
||||||||||
func (_ *libp2pModule) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) { | ||||||||||
logger.Global.Debug().Msg("Creating libp2p-backed network module") | ||||||||||
mod := &libp2pModule{ | ||||||||||
cfg: bus.GetRuntimeMgr().GetConfig().P2P, | ||||||||||
logger: logger.Global.CreateLoggerForModule(modules.P2PModuleName), | ||||||||||
} | ||||||||||
|
||||||||||
// MUST call before referencing mod.bus to ensure != nil. | ||||||||||
bus.RegisterModule(mod) | ||||||||||
|
||||||||||
for _, option := range options { | ||||||||||
option(mod) | ||||||||||
} | ||||||||||
|
||||||||||
// TECHDEBT: investigate any unnecessary | ||||||||||
// key exposure / duplication in memory | ||||||||||
privateKey, err := crypto.NewLibP2PPrivateKey(mod.cfg.PrivateKey) | ||||||||||
if err != nil { | ||||||||||
return nil, fmt.Errorf("loading private key: %w", err) | ||||||||||
} | ||||||||||
|
||||||||||
mod.identity = libp2p.Identity(privateKey) | ||||||||||
|
||||||||||
// INCOMPLETE: support RainTree network | ||||||||||
if mod.cfg.UseRainTree { | ||||||||||
return nil, fmt.Errorf("%s", "raintree is not yet compatible with libp2p") | ||||||||||
} | ||||||||||
|
||||||||||
switch mod.cfg.ConnectionType { | ||||||||||
case types.ConnectionType_TCPConnection: | ||||||||||
addr, err := mod.getMultiaddr() | ||||||||||
if err != nil { | ||||||||||
return nil, fmt.Errorf("parsing multiaddr from config: %w", err) | ||||||||||
} | ||||||||||
mod.listenAddrs = libp2p.ListenAddrs(addr) | ||||||||||
case types.ConnectionType_EmptyConnection: | ||||||||||
mod.listenAddrs = libp2p.NoListenAddrs | ||||||||||
default: | ||||||||||
return nil, fmt.Errorf( | ||||||||||
// DISCUSS: rename to "transport protocol" instead. | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's do it |
||||||||||
"unsupported connection type: %s: %w", | ||||||||||
mod.cfg.ConnectionType, | ||||||||||
err, | ||||||||||
) | ||||||||||
} | ||||||||||
|
||||||||||
return mod, nil | ||||||||||
} | ||||||||||
|
||||||||||
func (mod *libp2pModule) Start() error { | ||||||||||
// IMPROVE: receive context in interface methods. | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💯 |
||||||||||
ctx := context.Background() | ||||||||||
|
||||||||||
// TECHDEBT: metrics integration. | ||||||||||
var err error | ||||||||||
opts := []libp2p.Option{ | ||||||||||
mod.identity, | ||||||||||
// INCOMPLETE(#544): add transport security! | ||||||||||
} | ||||||||||
|
||||||||||
// Disable unused libp2p relay and ping services in client debug mode. | ||||||||||
// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#DisableRelay | ||||||||||
// and https://pkg.go.dev/github.com/libp2p/go-libp2p#Ping) | ||||||||||
if mod.isClientDebugMode() { | ||||||||||
opts = append(opts, | ||||||||||
libp2p.DisableRelay(), | ||||||||||
libp2p.Ping(false), | ||||||||||
libp2p.NoListenAddrs, | ||||||||||
) | ||||||||||
} else { | ||||||||||
opts = append(opts, mod.listenAddrs) | ||||||||||
} | ||||||||||
|
||||||||||
// Represents a libp2p network node, `libp2p.New` configures | ||||||||||
// and starts listening according to options. | ||||||||||
// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme) | ||||||||||
mod.host, err = libp2p.New(opts...) | ||||||||||
if err != nil { | ||||||||||
return fmt.Errorf("unable to create libp2p host: %w", err) | ||||||||||
} | ||||||||||
|
||||||||||
listenAddrLogEvent := mod.logger.Info() | ||||||||||
for i, addr := range host.InfoFromHost(mod.host).Addrs { | ||||||||||
listenAddrLogEvent.Str(fmt.Sprintf("listen_addr_%d", i), addr.String()) | ||||||||||
} | ||||||||||
listenAddrLogEvent.Msg("Listening for incoming connections...") | ||||||||||
|
||||||||||
// TECHDEBT: use RandomSub or GossipSub once we're on more stable ground. | ||||||||||
// IMPROVE: consider supporting multiple router types via config. | ||||||||||
mod.pubsub, err = pubsub.NewFloodSub(ctx, mod.host) | ||||||||||
if err != nil { | ||||||||||
return fmt.Errorf("unable to create pubsub: %w", err) | ||||||||||
} | ||||||||||
|
||||||||||
// Topic is used to `#Publish` messages. | ||||||||||
mod.topic, err = mod.pubsub.Join(protocol.DefaultTopicStr) | ||||||||||
if err != nil { | ||||||||||
return fmt.Errorf("unable to join pubsub topic: %w", err) | ||||||||||
} | ||||||||||
|
||||||||||
// Subscription is notified when a new message is received on the topic. | ||||||||||
mod.subscription, err = mod.topic.Subscribe() | ||||||||||
if err != nil { | ||||||||||
return fmt.Errorf("subscribing to pubsub topic: %w", err) | ||||||||||
} | ||||||||||
|
||||||||||
mod.network, err = network.NewLibp2pNetwork(mod.GetBus(), mod.logger, mod.host, mod.topic) | ||||||||||
if err != nil { | ||||||||||
return fmt.Errorf("creating network: %w", err) | ||||||||||
} | ||||||||||
|
||||||||||
// Don't handle streams or read from the subscription in client debug mode. | ||||||||||
if !mod.isClientDebugMode() { | ||||||||||
mod.host.SetStreamHandler(protocol.PoktProtocolID, mod.handleStream) | ||||||||||
go mod.readFromSubscription(ctx) | ||||||||||
} | ||||||||||
return nil | ||||||||||
} | ||||||||||
|
||||||||||
func (mod *libp2pModule) Stop() error { | ||||||||||
return mod.host.Close() | ||||||||||
} | ||||||||||
|
||||||||||
func (mod *libp2pModule) Broadcast(msg *anypb.Any) error { | ||||||||||
c := &messaging.PocketEnvelope{ | ||||||||||
Content: msg, | ||||||||||
} | ||||||||||
//TECHDEBT: use shared/codec for marshalling | ||||||||||
data, err := proto.MarshalOptions{Deterministic: true}.Marshal(c) | ||||||||||
bryanchriswhite marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
if err != nil { | ||||||||||
return err | ||||||||||
} | ||||||||||
mod.logger.Info().Msg("broadcasting message to network") | ||||||||||
|
||||||||||
return mod.network.NetworkBroadcast(data) | ||||||||||
} | ||||||||||
|
||||||||||
func (mod *libp2pModule) Send(addr crypto.Address, msg *anypb.Any) error { | ||||||||||
c := &messaging.PocketEnvelope{ | ||||||||||
Content: msg, | ||||||||||
} | ||||||||||
//TECHDEBT: use shared/codec for marshalling | ||||||||||
data, err := proto.MarshalOptions{Deterministic: true}.Marshal(c) | ||||||||||
if err != nil { | ||||||||||
return err | ||||||||||
} | ||||||||||
|
||||||||||
return mod.network.NetworkSend(data, addr) | ||||||||||
} | ||||||||||
|
||||||||||
func (mod *libp2pModule) GetAddress() (crypto.Address, error) { | ||||||||||
privateKey, err := crypto.NewPrivateKey(mod.cfg.PrivateKey) | ||||||||||
if err != nil { | ||||||||||
return nil, err | ||||||||||
} | ||||||||||
|
||||||||||
return privateKey.Address(), nil | ||||||||||
} | ||||||||||
|
||||||||||
// HandleEvent implements the respective `modules.Module` interface method. | ||||||||||
func (mod *libp2pModule) HandleEvent(msg *anypb.Any) error { | ||||||||||
return nil | ||||||||||
} | ||||||||||
|
||||||||||
func (mod *libp2pModule) isClientDebugMode() bool { | ||||||||||
return mod.GetBus().GetRuntimeMgr().GetConfig().ClientDebugMode | ||||||||||
} | ||||||||||
|
||||||||||
// handleStream is called each time a peer establishes a new stream with this | ||||||||||
// module's libp2p `host.Host`. | ||||||||||
func (mod *libp2pModule) handleStream(stream libp2pNetwork.Stream) { | ||||||||||
peer, err := network.PeerFromLibp2pStream(stream) | ||||||||||
if err != nil { | ||||||||||
mod.logger.Error().Err(err). | ||||||||||
Str("address", peer.Address.String()). | ||||||||||
Msg("parsing remote peer public key") | ||||||||||
|
||||||||||
if err = stream.Close(); err != nil { | ||||||||||
mod.logger.Error().Err(err) | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
if err := mod.network.AddPeerToAddrBook(peer); err != nil { | ||||||||||
mod.logger.Error().Err(err). | ||||||||||
Str("address", peer.Address.String()). | ||||||||||
Msg("adding remote peer to address book") | ||||||||||
} | ||||||||||
|
||||||||||
go mod.readStream(stream) | ||||||||||
} | ||||||||||
|
||||||||||
// readStream is intended to be called in a goroutine. It continuously reads from | ||||||||||
// the given stream for handling at the network level. Used for handling "direct" | ||||||||||
// messages (i.e. one specific target node). | ||||||||||
func (mod *libp2pModule) readStream(stream libp2pNetwork.Stream) { | ||||||||||
closeStream := func() { | ||||||||||
if err := stream.Close(); err != nil { | ||||||||||
mod.logger.Error().Err(err) | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
// NB: time out if no data is sent to free resources. | ||||||||||
if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil { | ||||||||||
mod.logger.Error().Err(err).Msg("setting stream read deadline") | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that we are settling on this pattern (Added the Bool(true)) for this kind of thing to help with 🪵 🔍
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @deblasis can you elaborate on (or link to) the thinking behind this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Of course. We have a bunch of places in our codebase where we log a message and we also attach a As we mature our observability stack, it could be useful to search for these and also to inform the users that something is still in development and the log entries are merely placeholders for future real functionality There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We also have #478 |
||||||||||
// TODO: abort if we can't set a read deadline? | ||||||||||
} | ||||||||||
|
||||||||||
data, err := io.ReadAll(stream) | ||||||||||
if err != nil { | ||||||||||
mod.logger.Error().Err(err).Msg("reading from stream") | ||||||||||
bryanchriswhite marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
closeStream() | ||||||||||
// NB: abort this goroutine | ||||||||||
// TODO: signal this somewhere? | ||||||||||
return | ||||||||||
} | ||||||||||
defer closeStream() | ||||||||||
|
||||||||||
mod.handleNetworkData(data) | ||||||||||
} | ||||||||||
|
||||||||||
// readFromSubscription is intended to be called in a goroutine. It continuously | ||||||||||
// reads from the subscribed topic in preparation for handling at the network level. | ||||||||||
// Used for handling "broadcast" messages (i.e. no specific target node). | ||||||||||
func (mod *libp2pModule) readFromSubscription(ctx context.Context) { | ||||||||||
for { | ||||||||||
select { | ||||||||||
case <-ctx.Done(): | ||||||||||
return | ||||||||||
default: | ||||||||||
msg, err := mod.subscription.Next(ctx) | ||||||||||
if err != nil { | ||||||||||
mod.logger.Error().Err(err). | ||||||||||
Bool("TODO", true). | ||||||||||
Msg("reading from subscription") | ||||||||||
} | ||||||||||
|
||||||||||
// NB: ignore messages from self | ||||||||||
if msg.ReceivedFrom == mod.host.ID() { | ||||||||||
continue | ||||||||||
} | ||||||||||
|
||||||||||
mod.handleNetworkData(msg.Data) | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
func (mod *libp2pModule) handleNetworkData(data []byte) { | ||||||||||
appMsgData, err := mod.network.HandleNetworkData(data) | ||||||||||
if err != nil { | ||||||||||
mod.logger.Error().Err(err).Msg("handling network data") | ||||||||||
return | ||||||||||
} | ||||||||||
|
||||||||||
// There was no error, but we don't need to forward this to the app-specific bus. | ||||||||||
// For example, the message has already been handled by the application. | ||||||||||
if appMsgData == nil { | ||||||||||
return | ||||||||||
} | ||||||||||
|
||||||||||
networkMessage := messaging.PocketEnvelope{} | ||||||||||
if err := proto.Unmarshal(appMsgData, &networkMessage); err != nil { | ||||||||||
mod.logger.Error().Err(err). | ||||||||||
Bool("TODO", true). | ||||||||||
Msg("Error decoding network message") | ||||||||||
return | ||||||||||
} | ||||||||||
|
||||||||||
event := messaging.PocketEnvelope{ | ||||||||||
Content: networkMessage.Content, | ||||||||||
} | ||||||||||
|
||||||||||
mod.GetBus().PublishEventToBus(&event) | ||||||||||
} | ||||||||||
|
||||||||||
// getMultiaddr returns a multiaddr constructed from the `hostname` and `port` | ||||||||||
// in the P2P config which pas provided upon creation. | ||||||||||
func (mod *libp2pModule) getMultiaddr() (multiaddr.Multiaddr, error) { | ||||||||||
// TECHDEBT: as soon as we add support for multiple transports | ||||||||||
// (i.e. not just TCP), we'll need to do something else. | ||||||||||
return network.Libp2pMultiaddrFromServiceURL(fmt.Sprintf( | ||||||||||
"%s:%d", mod.cfg.Hostname, mod.cfg.Port, | ||||||||||
)) | ||||||||||
} | ||||||||||
|
||||||||||
// newReadStreamDeadline returns a future deadline | ||||||||||
// based on the read stream timeout duration. | ||||||||||
func newReadStreamDeadline() time.Time { | ||||||||||
return time.Now().Add(readStreamTimeoutDuration) | ||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice comments on the sattributes