diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 1bbe7088ed..ac847128b0 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -43,6 +43,11 @@ import ( "github.com/multiformats/go-multiaddr" ) +type SubNextCancellable interface { + Next(ctx context.Context) (*pubsub.Message, error) + Cancel() +} + // Service defines the interface used by the network integrating with underlying p2p implementation type Service interface { Start() error @@ -57,7 +62,7 @@ type Service interface { Conns() []network.Conn ListPeersForTopic(topic string) []peer.ID - Subscribe(topic string, val pubsub.ValidatorEx) (*pubsub.Subscription, error) + Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error) Publish(ctx context.Context, topic string, data []byte) error GetStream(peer.ID) (network.Stream, bool) diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index 372c9249c8..5d1c144632 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -133,7 +133,7 @@ func (s *serviceImpl) getOrCreateTopic(topicName string) (*pubsub.Topic, error) } // Subscribe returns a subscription to the given topic -func (s *serviceImpl) Subscribe(topic string, val pubsub.ValidatorEx) (*pubsub.Subscription, error) { +func (s *serviceImpl) Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error) { if err := s.pubsub.RegisterTopicValidator(topic, val); err != nil { return nil, err } diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index a3879f938f..8fd5dc9004 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -69,6 +69,7 @@ type P2PNetwork struct { wsPeersConnectivityCheckTicker *time.Ticker relayMessages bool // True if we should relay messages from other nodes (nominally true for relays, false otherwise) + wantTXGossip atomic.Bool capabilitiesDiscovery *p2p.CapabilitiesDiscovery @@ -160,6 +161,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo return nil, err } + relayMessages := 
cfg.IsGossipServer() || cfg.ForceRelayMessages net := &P2PNetwork{ log: log, config: cfg, @@ -171,8 +173,10 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo peerStats: make(map[peer.ID]*p2pPeerStats), nodeInfo: node, pstore: pstore, - relayMessages: cfg.IsGossipServer() || cfg.ForceRelayMessages, + relayMessages: relayMessages, } + net.wantTXGossip.Store(relayMessages || cfg.ForceFetchTransactions || node.IsParticipating()) + net.ctx, net.ctxCancel = context.WithCancel(context.Background()) net.handler = msgHandler{ ctx: net.ctx, @@ -244,13 +248,15 @@ func (n *P2PNetwork) PeerIDSigner() identityChallengeSigner { // Start threads, listen on sockets. func (n *P2PNetwork) Start() error { - n.wg.Add(1) n.bootstrapper.start() err := n.service.Start() if err != nil { return err } - go n.txTopicHandleLoop() + if n.wantTXGossip.Load() { + n.wg.Add(1) + go n.txTopicHandleLoop() + } if n.wsPeersConnectivityCheckTicker != nil { n.wsPeersConnectivityCheckTicker.Stop() @@ -267,7 +273,6 @@ func (n *P2PNetwork) Start() error { n.wg.Add(1) go n.broadcaster.broadcastThread(&n.wg, n) - // n.service.DialPeersUntilTargetCount(n.config.GossipFanout) n.wg.Add(1) go n.meshThread() @@ -320,8 +325,9 @@ func (n *P2PNetwork) innerStop() { func (n *P2PNetwork) meshThread() { defer n.wg.Done() - timer := time.NewTicker(meshThreadInterval) + timer := time.NewTicker(1) // start immediately and reset after defer timer.Stop() + var resetTimer bool for { // get some relay nodes var peers []peer.AddrInfo @@ -339,11 +345,13 @@ func (n *P2PNetwork) meshThread() { n.pstore.ReplacePeerList(replace, string(n.networkID), phonebook.PhoneBookEntryRelayRole) } } - // call immediately and then every interval - n.service.DialPeersUntilTargetCount(n.config.GossipFanout) select { case <-timer.C: n.service.DialPeersUntilTargetCount(n.config.GossipFanout) + if !resetTimer { + timer.Reset(meshThreadInterval) + resetTimer = true + } case <-n.ctx.Done(): return } @@ -585,7 +593,19 
@@ func (n *P2PNetwork) GetHTTPClient(address string) (*http.Client, error) { // this is the only indication that we have that we haven't formed a clique, where all incoming messages // arrive very quickly, but might be missing some votes. The usage of this call is expected to have similar // characteristics as with a watchdog timer. -func (n *P2PNetwork) OnNetworkAdvance() {} +func (n *P2PNetwork) OnNetworkAdvance() { + if n.nodeInfo != nil { + old := n.wantTXGossip.Load() + new := n.nodeInfo.IsParticipating() + if old != new { + n.wantTXGossip.Store(new) + if new { + n.wg.Add(1) + go n.txTopicHandleLoop() + } + } + } +} // GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same // request that was provided to the http handler ( or provide a fallback Context() to that ) @@ -760,6 +780,13 @@ func (n *P2PNetwork) txTopicHandleLoop() { // from gossipsub's point of view, it's just waiting to hear back from the validator, // and txHandler does all its work in the validator, so we don't need to do anything here _ = msg + + // participation or configuration change, cancel subscription and quit + if !n.wantTXGossip.Load() { + n.log.Debugf("Canceling subscription to topic %s", p2p.TXTopicName) + sub.Cancel() + return + } } } diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index c658633184..0467dff04d 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -47,6 +47,7 @@ func TestP2PSubmitTX(t *testing.T) { partitiontest.PartitionTest(t) cfg := config.GetDefaultLocal() + cfg.ForceFetchTransactions = true log := logging.TestingLog(t) netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) require.NoError(t, err) @@ -66,7 +67,6 @@ func TestP2PSubmitTX(t *testing.T) { defer netB.Stop() netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) - require.NoError(t, err) netC.Start() defer 
netC.Stop() @@ -81,7 +81,10 @@ func TestP2PSubmitTX(t *testing.T) { 2*time.Second, 50*time.Millisecond, ) - time.Sleep(time.Second) // give time for peers to connect. + require.Eventually(t, func() bool { + return len(netA.wsPeers) > 0 && len(netB.wsPeers) > 0 && len(netC.wsPeers) > 0 + }, 2*time.Second, 50*time.Millisecond) + // now we should be connected in a line: B <-> A <-> C where both B and C are connected to A but not each other // Since we aren't using the transaction handler in this test, we need to register a pass-through handler @@ -117,6 +120,88 @@ func TestP2PSubmitTX(t *testing.T) { ) } +// TestP2PSubmitTXNoGossip tests that nodes without gossip enabled cannot receive transactions +func TestP2PSubmitTXNoGossip(t *testing.T) { + partitiontest.PartitionTest(t) + + cfg := config.GetDefaultLocal() + cfg.ForceFetchTransactions = true + log := logging.TestingLog(t) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + require.NoError(t, err) + netA.Start() + defer netA.Stop() + + peerInfoA := netA.service.AddrInfo() + addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA) + require.NoError(t, err) + require.NotZero(t, addrsA[0]) + + multiAddrStr := addrsA[0].String() + phoneBookAddresses := []string{multiAddrStr} + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + require.NoError(t, err) + netB.Start() + defer netB.Stop() + + require.Eventually( + t, + func() bool { + return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) == 1 && + len(netB.service.ListPeersForTopic(p2p.TXTopicName)) == 1 + }, + 2*time.Second, + 50*time.Millisecond, + ) + + // run netC in NPN mode (no relay => no gossip subscription => no TX receiving) + cfg.ForceFetchTransactions = false + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + require.NoError(t, err) + netC.Start() + defer netC.Stop() + + require.Eventually(t, func() bool { + 
return len(netA.wsPeers) > 0 && len(netB.wsPeers) > 0 && len(netC.wsPeers) > 0 + }, 2*time.Second, 50*time.Millisecond) + + // ensure netC cannot receive messages + passThroughHandler := []TaggedMessageHandler{ + {Tag: protocol.TxnTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage { + return OutgoingMessage{Action: Broadcast} + })}, + } + + netB.RegisterHandlers(passThroughHandler) + netC.RegisterHandlers(passThroughHandler) + for i := 0; i < 10; i++ { + err = netA.Broadcast(context.Background(), protocol.TxnTag, []byte(fmt.Sprintf("test %d", i)), false, nil) + require.NoError(t, err) + } + + // check netB received the messages + require.Eventually( + t, + func() bool { + netB.peerStatsMu.Lock() + netBpeerStatsA, ok := netB.peerStats[netA.service.ID()] + netB.peerStatsMu.Unlock() + if !ok { + return false + } + return netBpeerStatsA.txReceived.Load() == 10 + }, + 1*time.Second, + 50*time.Millisecond, + ) + + // check netC did not receive the messages + netC.peerStatsMu.Lock() + _, ok := netC.peerStats[netA.service.ID()] + netC.peerStatsMu.Unlock() + require.False(t, ok) +} + func TestP2PSubmitWS(t *testing.T) { partitiontest.PartitionTest(t) @@ -148,17 +233,10 @@ func TestP2PSubmitWS(t *testing.T) { require.NoError(t, err) defer netC.Stop() - require.Eventually( - t, - func() bool { - return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) == 2 && - len(netB.service.ListPeersForTopic(p2p.TXTopicName)) == 1 && - len(netC.service.ListPeersForTopic(p2p.TXTopicName)) == 1 - }, - 2*time.Second, - 50*time.Millisecond, - ) - time.Sleep(time.Second) // XX give time for peers to connect. 
Knowing about them being subscribed to topics is clearly not enough + require.Eventually(t, func() bool { + return len(netA.wsPeers) > 0 && len(netB.wsPeers) > 0 && len(netC.wsPeers) > 0 + }, 2*time.Second, 50*time.Millisecond) + // now we should be connected in a line: B <-> A <-> C where both B and C are connected to A but not each other testTag := protocol.AgreementVoteTag @@ -242,7 +320,7 @@ func (s *mockService) ListPeersForTopic(topic string) []peer.ID { return nil } -func (s *mockService) Subscribe(topic string, val pubsub.ValidatorEx) (*pubsub.Subscription, error) { +func (s *mockService) Subscribe(topic string, val pubsub.ValidatorEx) (p2p.SubNextCancellable, error) { return nil, nil } func (s *mockService) Publish(ctx context.Context, topic string, data []byte) error { @@ -345,7 +423,7 @@ func (r *mockResolver) Resolve(ctx context.Context, _ ma.Multiaddr) ([]ma.Multia return []ma.Multiaddr{maddr}, err } -func TestBootstrapFunc(t *testing.T) { +func TestP2PBootstrapFunc(t *testing.T) { t.Parallel() partitiontest.PartitionTest(t) @@ -373,7 +451,7 @@ func TestBootstrapFunc(t *testing.T) { require.GreaterOrEqual(t, len(addr.Addrs), 1) } -func TestGetBootstrapPeersFailure(t *testing.T) { +func TestP2PGetBootstrapPeersFailure(t *testing.T) { t.Parallel() partitiontest.PartitionTest(t) @@ -387,7 +465,7 @@ func TestGetBootstrapPeersFailure(t *testing.T) { require.Equal(t, 0, len(addrs)) } -func TestGetBootstrapPeersInvalidAddr(t *testing.T) { +func TestP2PGetBootstrapPeersInvalidAddr(t *testing.T) { t.Parallel() partitiontest.PartitionTest(t) @@ -475,16 +553,10 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { require.NoError(t, err) defer netC.Stop() - require.Eventually( - t, - func() bool { - return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) > 0 && - len(netB.service.ListPeersForTopic(p2p.TXTopicName)) > 0 && - len(netC.service.ListPeersForTopic(p2p.TXTopicName)) > 0 - }, - 2*time.Second, - 50*time.Millisecond, - ) + require.Eventually(t, func() 
bool { + return len(netA.wsPeers) > 0 && len(netB.wsPeers) > 0 && len(netC.wsPeers) > 0 + }, 2*time.Second, 50*time.Millisecond) + t.Logf("peers connected") nets := []*P2PNetwork{netA, netB, netC} @@ -557,7 +629,7 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) { } // TestMultiaddrConversionToFrom ensures Multiaddr can be serialized back to an address without losing information -func TestMultiaddrConversionToFrom(t *testing.T) { +func TestP2PMultiaddrConversionToFrom(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() @@ -654,6 +726,7 @@ func TestP2PRelay(t *testing.T) { partitiontest.PartitionTest(t) cfg := config.GetDefaultLocal() + cfg.ForceFetchTransactions = true log := logging.TestingLog(t) netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) require.NoError(t, err) @@ -761,3 +834,60 @@ func TestP2PRelay(t *testing.T) { } } } + +type mockSubPService struct { + mockService + count int +} + +type mockSubscription struct { +} + +func (m *mockSubscription) Next(ctx context.Context) (*pubsub.Message, error) { return nil, nil } +func (m *mockSubscription) Cancel() {} + +func (m *mockSubPService) Subscribe(topic string, val pubsub.ValidatorEx) (p2p.SubNextCancellable, error) { + m.count++ + return &mockSubscription{}, nil +} + +// TestP2PWantTXGossip checks txTopicHandleLoop runs as expected on wantTXGossip changes +func TestP2PWantTXGossip(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + // cancelled context to trigger subscription.Next to return + ctx, cancel := context.WithCancel(context.Background()) + cancel() + mockService := &mockSubPService{} + net := &P2PNetwork{ + ctx: ctx, + nodeInfo: &nopeNodeInfo{}, + service: mockService, + } + net.wantTXGossip.Store(false) + + // ensure wantTXGossip from false to false is noop + net.OnNetworkAdvance() + require.Eventually(t, func() bool { net.wg.Wait(); return true }, 1*time.Second, 50*time.Millisecond) + require.Equal(t, 0, mockService.count) + 
require.False(t, net.wantTXGossip.Load()) + + // ensure wantTXGossip from true (wantTXGossip) to false (nopeNodeInfo) is noop + net.wantTXGossip.Store(true) + net.OnNetworkAdvance() + require.Eventually(t, func() bool { net.wg.Wait(); return true }, 1*time.Second, 50*time.Millisecond) + require.Equal(t, 0, mockService.count) + require.False(t, net.wantTXGossip.Load()) + + // check false to true change triggers subscription + net.nodeInfo = &participatingNodeInfo{} + net.OnNetworkAdvance() + require.Eventually(t, func() bool { return mockService.count == 1 }, 1*time.Second, 50*time.Millisecond) + require.True(t, net.wantTXGossip.Load()) + + // check true to true change is noop + net.OnNetworkAdvance() + require.Eventually(t, func() bool { return mockService.count == 1 }, 1*time.Second, 50*time.Millisecond) + require.True(t, net.wantTXGossip.Load()) +} diff --git a/node/node.go b/node/node.go index 3529ae3111..33c3ca865c 100644 --- a/node/node.go +++ b/node/node.go @@ -399,7 +399,6 @@ func (node *AlgorandFullNode) Capabilities() []p2p.Capability { if node.config.StoresCatchpoints() { caps = append(caps, p2p.Catchpoints) } - // TODO: change to a separate if node.config.EnableGossipService && node.config.IsGossipServer() { caps = append(caps, p2p.Gossip) }