Skip to content

Commit

Permalink
unsubscribe non-part nodes from TX traffic
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Feb 15, 2024
1 parent 4959d39 commit d05816c
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 39 deletions.
7 changes: 6 additions & 1 deletion network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ import (
"github.com/multiformats/go-multiaddr"
)

type SubNextCancellable interface {

Check failure on line 46 in network/p2p/p2p.go

View workflow job for this annotation

GitHub Actions / reviewdog-errors

[Lint Errors] reported by reviewdog 🐶 exported: exported type SubNextCancellable should have comment or be unexported (revive) Raw Output: network/p2p/p2p.go:46:6: exported: exported type SubNextCancellable should have comment or be unexported (revive) type SubNextCancellable interface { ^
Next(ctx context.Context) (*pubsub.Message, error)
Cancel()
}

// Service defines the interface used by the network integrating with underlying p2p implementation
type Service interface {
Start() error
Expand All @@ -57,7 +62,7 @@ type Service interface {

Conns() []network.Conn
ListPeersForTopic(topic string) []peer.ID
Subscribe(topic string, val pubsub.ValidatorEx) (*pubsub.Subscription, error)
Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error)
Publish(ctx context.Context, topic string, data []byte) error

GetStream(peer.ID) (network.Stream, bool)
Expand Down
2 changes: 1 addition & 1 deletion network/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (s *serviceImpl) getOrCreateTopic(topicName string) (*pubsub.Topic, error)
}

// Subscribe returns a subscription to the given topic
func (s *serviceImpl) Subscribe(topic string, val pubsub.ValidatorEx) (*pubsub.Subscription, error) {
func (s *serviceImpl) Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error) {
if err := s.pubsub.RegisterTopicValidator(topic, val); err != nil {
return nil, err
}
Expand Down
43 changes: 35 additions & 8 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type P2PNetwork struct {
wsPeersConnectivityCheckTicker *time.Ticker

relayMessages bool // True if we should relay messages from other nodes (nominally true for relays, false otherwise)
wantTXGossip atomic.Bool

capabilitiesDiscovery *p2p.CapabilitiesDiscovery

Expand Down Expand Up @@ -160,6 +161,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
return nil, err
}

relayMessages := cfg.IsGossipServer() || cfg.ForceRelayMessages
net := &P2PNetwork{
log: log,
config: cfg,
Expand All @@ -171,8 +173,10 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
peerStats: make(map[peer.ID]*p2pPeerStats),
nodeInfo: node,
pstore: pstore,
relayMessages: cfg.IsGossipServer() || cfg.ForceRelayMessages,
relayMessages: relayMessages,
}
net.wantTXGossip.Store(relayMessages || cfg.ForceFetchTransactions || node.IsParticipating())

net.ctx, net.ctxCancel = context.WithCancel(context.Background())
net.handler = msgHandler{
ctx: net.ctx,
Expand Down Expand Up @@ -244,13 +248,15 @@ func (n *P2PNetwork) PeerIDSigner() identityChallengeSigner {

// Start threads, listen on sockets.
func (n *P2PNetwork) Start() error {
n.wg.Add(1)
n.bootstrapper.start()
err := n.service.Start()
if err != nil {
return err
}
go n.txTopicHandleLoop()
if n.wantTXGossip.Load() {
n.wg.Add(1)
go n.txTopicHandleLoop()
}

if n.wsPeersConnectivityCheckTicker != nil {
n.wsPeersConnectivityCheckTicker.Stop()
Expand All @@ -267,7 +273,6 @@ func (n *P2PNetwork) Start() error {

n.wg.Add(1)
go n.broadcaster.broadcastThread(&n.wg, n)
// n.service.DialPeersUntilTargetCount(n.config.GossipFanout)

n.wg.Add(1)
go n.meshThread()
Expand Down Expand Up @@ -320,8 +325,9 @@ func (n *P2PNetwork) innerStop() {

func (n *P2PNetwork) meshThread() {
defer n.wg.Done()
timer := time.NewTicker(meshThreadInterval)
timer := time.NewTicker(1) // start immediately and reset after
defer timer.Stop()
var resetTimer bool
for {
// get some relay nodes
var peers []peer.AddrInfo
Expand All @@ -339,11 +345,13 @@ func (n *P2PNetwork) meshThread() {
n.pstore.ReplacePeerList(replace, string(n.networkID), phonebook.PhoneBookEntryRelayRole)
}
}
// call immediately and then every interval
n.service.DialPeersUntilTargetCount(n.config.GossipFanout)
select {
case <-timer.C:
n.service.DialPeersUntilTargetCount(n.config.GossipFanout)
if !resetTimer {
timer.Reset(meshThreadInterval)
resetTimer = true
}
case <-n.ctx.Done():
return
}
Expand Down Expand Up @@ -585,7 +593,19 @@ func (n *P2PNetwork) GetHTTPClient(address string) (*http.Client, error) {
// this is the only indication that we have that we haven't formed a clique, where all incoming messages
// arrive very quickly, but might be missing some votes. The usage of this call is expected to have similar
// characteristics as with a watchdog timer.
func (n *P2PNetwork) OnNetworkAdvance() {}
func (n *P2PNetwork) OnNetworkAdvance() {
if n.nodeInfo != nil {
old := n.wantTXGossip.Load()
new := n.nodeInfo.IsParticipating()
if old != new {
n.wantTXGossip.Store(new)
if new {
n.wg.Add(1)
go n.txTopicHandleLoop()
}
}
}
}

// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same
// request that was provided to the http handler ( or provide a fallback Context() to that )
Expand Down Expand Up @@ -760,6 +780,13 @@ func (n *P2PNetwork) txTopicHandleLoop() {
// from gossipsub's point of view, it's just waiting to hear back from the validator,
// and txHandler does all its work in the validator, so we don't need to do anything here
_ = msg

// participation or configuration change, cancel subscription and quit
if !n.wantTXGossip.Load() {
n.log.Debugf("Canceling subscription to topic %s", p2p.TXTopicName)
sub.Cancel()
return
}
}
}

Expand Down
Loading

0 comments on commit d05816c

Please sign in to comment.