From 06337268efaacdf949eac8bc4a0debc110500b14 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Fri, 10 Jun 2022 15:09:22 -0400 Subject: [PATCH 1/2] network: message-of-interest tiny bugfix (#4107) --- network/wsNetwork.go | 42 ++++----- network/wsNetwork_test.go | 180 +++++++++++++++++++++++--------------- 2 files changed, 131 insertions(+), 91 deletions(-) diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 68394a924c..1a4070fd18 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -417,8 +417,8 @@ type WebsocketNetwork struct { // messagesOfInterestMu protects messagesOfInterest and ensures // that messagesOfInterestEnc does not change once it is set during // network start. - messagesOfInterestMu deadlock.Mutex - messagesOfInterestCond *sync.Cond + messagesOfInterestMu deadlock.Mutex + messagesOfInterestRefresh chan struct{} // peersConnectivityCheckTicker is the timer for testing that all the connected peers // are still transmitting or receiving information. The channel produced by this ticker @@ -764,7 +764,7 @@ func (wn *WebsocketNetwork) setup() { SupportedProtocolVersions = []string{wn.config.NetworkProtocolVersion} } - wn.messagesOfInterestCond = sync.NewCond(&wn.messagesOfInterestMu) + wn.messagesOfInterestRefresh = make(chan struct{}, 2) wn.messagesOfInterestGeneration = 1 // something nonzero so that any new wsPeer needs updating if wn.relayMessages { wn.RegisterMessageInterest(protocol.CompactCertSigTag) @@ -1178,7 +1178,7 @@ func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOf if messagesOfInterestEnc != nil { peer.sendMessagesOfInterest(messagesOfInterestGeneration, messagesOfInterestEnc) } else { - wn.log.Infof("msgOfInterest Enc=nil") + wn.log.Infof("msgOfInterest Enc=nil, MOIGen=%d", messagesOfInterestGeneration) } } } @@ -1726,14 +1726,10 @@ func (wn *WebsocketNetwork) OnNetworkAdvance() { defer wn.lastNetworkAdvanceMu.Unlock() wn.lastNetworkAdvance = time.Now().UTC() if wn.nodeInfo != nil && !wn.relayMessages && !wn.config.ForceFetchTransactions { - // if we're not a relay, and not participating, we don't need txn pool - wantTXGossip := wn.nodeInfo.IsParticipating() - if wantTXGossip && (wn.wantTXGossip != wantTXGossipYes) { - wn.RegisterMessageInterest(protocol.TxnTag) - wn.wantTXGossip = wantTXGossipYes - } else if !wantTXGossip && (wn.wantTXGossip != wantTXGossipNo) { - wn.DeregisterMessageInterest(protocol.TxnTag) - wn.wantTXGossip = wantTXGossipNo + select { + case wn.messagesOfInterestRefresh <- struct{}{}: + default: + // if the notify chan is full, it will get around to updating the latest when it actually runs } } } @@ -2350,18 +2346,24 @@ func (wn *WebsocketNetwork) updateMessagesOfInterestEnc() { wn.messagesOfInterestEnc = MarshallMessageOfInterestMap(wn.messagesOfInterest) wn.messagesOfInterestEncoded = true atomic.AddUint32(&wn.messagesOfInterestGeneration, 1) - wn.messagesOfInterestCond.Broadcast() + var peers []*wsPeer + peers, _ = wn.peerSnapshot(peers) + for _, peer := range peers { + wn.maybeSendMessagesOfInterest(peer, wn.messagesOfInterestEnc) + } } func (wn *WebsocketNetwork) postMessagesOfInterestThread() { - var peers []*wsPeer - wn.messagesOfInterestMu.Lock() - defer wn.messagesOfInterestMu.Unlock() for { - wn.messagesOfInterestCond.Wait() - peers, _ = wn.peerSnapshot(peers) - for _, peer := range peers { - wn.maybeSendMessagesOfInterest(peer, wn.messagesOfInterestEnc) + <-wn.messagesOfInterestRefresh + // if we're not a relay, and not participating, we don't need txn pool + wantTXGossip := wn.nodeInfo.IsParticipating() + if wantTXGossip && (wn.wantTXGossip != wantTXGossipYes) { + wn.RegisterMessageInterest(protocol.TxnTag) + atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipYes) + } else if !wantTXGossip && (wn.wantTXGossip != wantTXGossipNo) { + wn.DeregisterMessageInterest(protocol.TxnTag) + atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipNo) } } } diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 02294f0c01..424586f019 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -219,6 +219,13 @@ func waitReady(t testing.TB, wn *WebsocketNetwork, timeout <-chan time.Time) boo } } +func netStop(t testing.TB, wn *WebsocketNetwork, name string) { + t.Logf("stopping %s", name) + wn.Stop() + time.Sleep(time.Millisecond) // Stop is imperfect and some worker threads can log an error after Stop and that causes a testing error + t.Logf("%s done", name) +} + // Set up two nodes, test that a.Broadcast is received by B func TestWebsocketNetworkBasic(t *testing.T) { partitiontest.PartitionTest(t) @@ -226,7 +233,7 @@ func TestWebsocketNetworkBasic(t *testing.T) { netA := makeTestWebsocketNode(t) netA.config.GossipFanout = 1 netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") netB := makeTestWebsocketNode(t) netB.config.GossipFanout = 1 addrA, postListen := netA.Address() @@ -234,7 +241,7 @@ func TestWebsocketNetworkBasic(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") counter := newMessageCounter(t, 2) counterDone := counter.done netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) @@ -262,7 +269,7 @@ func TestWebsocketNetworkUnicast(t *testing.T) { netA := makeTestWebsocketNode(t) netA.config.GossipFanout = 1 netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") netB := makeTestWebsocketNode(t) netB.config.GossipFanout = 1 addrA, postListen := netA.Address() @@ -270,7 +277,7 @@ func TestWebsocketNetworkUnicast(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") counter := newMessageCounter(t, 2) counterDone := counter.done netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) @@ -303,7 +310,7 @@ func TestWebsocketPeerData(t *testing.T) { netA := makeTestWebsocketNode(t) netA.config.GossipFanout = 1 netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") netB := makeTestWebsocketNode(t) netB.config.GossipFanout = 1 addrA, postListen := netA.Address() @@ -311,7 +318,7 @@ func TestWebsocketPeerData(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") counter := newMessageCounter(t, 2) netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) @@ -341,7 +348,7 @@ func TestWebsocketNetworkArray(t *testing.T) { netA := makeTestWebsocketNode(t) netA.config.GossipFanout = 1 netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") netB := makeTestWebsocketNode(t) netB.config.GossipFanout = 1 addrA, postListen := netA.Address() @@ -349,7 +356,7 @@ func TestWebsocketNetworkArray(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") counter := newMessageCounter(t, 3) counterDone := counter.done netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) @@ -378,7 +385,7 @@ func TestWebsocketNetworkCancel(t *testing.T) { netA := makeTestWebsocketNode(t) netA.config.GossipFanout = 1 netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") netB := makeTestWebsocketNode(t) netB.config.GossipFanout = 1 addrA, postListen := netA.Address() @@ -386,7 +393,7 @@ func TestWebsocketNetworkCancel(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") counter := newMessageCounter(t, 100) counterDone := counter.done netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) @@ -461,7 +468,7 @@ func TestWebsocketNetworkNoAddress(t *testing.T) { netA := makeTestWebsocketNode(t) netA.config.GossipFanout = 1 netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") noAddressConfig := defaultConfig noAddressConfig.NetAddress = "" @@ -472,7 +479,7 @@ func TestWebsocketNetworkNoAddress(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") counter := newMessageCounter(t, 2) counterDone := counter.done netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) @@ -886,7 +893,7 @@ func TestDupFilter(t *testing.T) { netA := makeTestFilterWebsocketNode(t, "a") netA.config.GossipFanout = 1 netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") netB := makeTestFilterWebsocketNode(t, "b") netB.config.GossipFanout = 2 addrA, postListen := netA.Address() @@ -894,7 +901,7 @@ func TestDupFilter(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") counter := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})} netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}}) debugTag2 := protocol.ProposalPayloadTag @@ -1032,7 +1039,7 @@ func BenchmarkWebsocketNetworkBasic(t *testing.B) { netA := makeTestWebsocketNode(t) netA.config.GossipFanout = 1 netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") netB := makeTestWebsocketNode(t) netB.config.GossipFanout = 1 addrA, postListen := netA.Address() @@ -1040,7 +1047,7 @@ func BenchmarkWebsocketNetworkBasic(t *testing.B) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") returns := make(chan uint64, 100) bhandler := benchmarkHandler{returns} netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: &bhandler}}) @@ -1109,7 +1116,7 @@ func TestWebsocketNetworkPrio(t *testing.T) { netA.config.GossipFanout = 1 netA.prioResponseChan = make(chan *wsPeer, 10) netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") prioB := netPrioStub{} crypto.RandBytes(prioB.addr[:]) @@ -1122,7 +1129,7 @@ func TestWebsocketNetworkPrio(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") // Wait for response message to propagate from B to A select { @@ -1154,7 +1161,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) { netA.config.GossipFanout = 2 netA.prioResponseChan = make(chan *wsPeer, 10) netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") addrA, postListen := netA.Address() require.True(t, postListen) @@ -1170,7 +1177,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) { netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counterB}}) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") counterC := newMessageCounter(t, 1) counterCdone := counterC.done @@ -1382,7 +1389,7 @@ func TestDelayedMessageDrop(t *testing.T) { netA := makeTestWebsocketNode(t) netA.config.GossipFanout = 1 netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") noAddressConfig := defaultConfig noAddressConfig.NetAddress = "" @@ -1393,7 +1400,7 @@ func TestDelayedMessageDrop(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") counter := newMessageCounter(t, 5) counterDone := counter.done netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) @@ -1435,7 +1442,7 @@ func TestSlowPeerDisconnection(t *testing.T) { netA := wn netA.config.GossipFanout = 1 netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") noAddressConfig := defaultConfig noAddressConfig.NetAddress = "" @@ -1446,7 +1453,7 @@ func TestSlowPeerDisconnection(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") readyTimeout := time.NewTimer(2 * time.Second) waitReady(t, netA, readyTimeout.C) @@ -1508,7 +1515,7 @@ func TestForceMessageRelaying(t *testing.T) { netA := wn netA.config.GossipFanout = 1 - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") counter := newMessageCounter(t, 5) counterDone := counter.done @@ -1523,7 +1530,7 @@ func TestForceMessageRelaying(t *testing.T) { netB.config.GossipFanout = 1 netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") noAddressConfig.ForceRelayMessages = true netC := makeTestWebsocketNodeWithConfig(t, noAddressConfig) @@ -1672,7 +1679,7 @@ func TestWebsocketNetworkTopicRoundtrip(t *testing.T) { netA := makeTestWebsocketNode(t) netA.config.GossipFanout = 1 netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") netB := makeTestWebsocketNode(t) netB.config.GossipFanout = 1 addrA, postListen := netA.Address() @@ -1680,7 +1687,7 @@ func TestWebsocketNetworkTopicRoundtrip(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") netB.RegisterHandlers([]TaggedMessageHandler{ { @@ -1729,6 +1736,41 @@ var ( testTags = []protocol.Tag{ft1, ft2, ft3, ft4} ) +func waitPeerInternalChanQuiet(t *testing.T, netA *WebsocketNetwork) { + // okay, but now we need to wait for asynchronous thread within netA to _apply_ the MOI to its peer for netB... + timeout := time.Now().Add(100 * time.Millisecond) + waiting := true + for waiting { + time.Sleep(1 * time.Millisecond) + peers := netA.GetPeers(PeersConnectedIn) + for _, pg := range peers { + wp := pg.(*wsPeer) + if len(wp.sendBufferHighPrio)+len(wp.sendBufferBulk) == 0 { + waiting = false + break + } + } + if time.Now().After(timeout) { + for _, pg := range peers { + wp := pg.(*wsPeer) + if len(wp.sendBufferHighPrio)+len(wp.sendBufferBulk) == 0 { + t.Fatalf("netA peer buff empty timeout len(high)=%d, len(bulk)=%d", len(wp.sendBufferHighPrio), len(wp.sendBufferBulk)) + } + } + } + } +} + +func waitForMOIRefreshQuiet(netB *WebsocketNetwork) { + for { + // wait for async messagesOfInterestRefresh + time.Sleep(time.Millisecond) + if len(netB.messagesOfInterestRefresh) == 0 { + break + } + } +} + // Set up two nodes, have one of them request a certain message tag mask, and verify the other follow that. func TestWebsocketNetworkMessageOfInterest(t *testing.T) { partitiontest.PartitionTest(t) @@ -1738,7 +1780,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) { netA.config.EnablePingHandler = false netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") netB := makeTestWebsocketNode(t) netB.config.GossipFanout = 1 netB.config.EnablePingHandler = false @@ -1747,7 +1789,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") incomingMsgSync := deadlock.Mutex{} msgCounters := make(map[protocol.Tag]int) @@ -1802,28 +1844,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) { // send another message which we can track, so that we'll know that the first message was delivered. netB.Broadcast(context.Background(), protocol.VoteBundleTag, []byte{0, 1, 2, 3, 4}, true, nil) messageFilterArriveWg.Wait() - // okay, but now we need to wait for asynchronous thread within netA to _apply_ the MOI to its peer for netB... - timeout := time.Now().Add(100 * time.Millisecond) - waiting := true - for waiting { - time.Sleep(1 * time.Millisecond) - peers := netA.GetPeers(PeersConnectedIn) - for _, pg := range peers { - wp := pg.(*wsPeer) - if len(wp.sendBufferHighPrio)+len(wp.sendBufferBulk) == 0 { - waiting = false - break - } - } - if time.Now().After(timeout) { - for _, pg := range peers { - wp := pg.(*wsPeer) - if len(wp.sendBufferHighPrio)+len(wp.sendBufferBulk) == 0 { - t.Fatalf("netA peer buff empty timeout len(high)=%d, len(bulk)=%d", len(wp.sendBufferHighPrio), len(wp.sendBufferBulk)) - } - } - } - } + waitPeerInternalChanQuiet(t, netA) messageArriveWg.Add(5 * 2) // we're expecting exactly 10 messages. // send 5 messages of few types. @@ -1872,7 +1893,7 @@ func TestWebsocketNetworkTXMessageOfInterestRelay(t *testing.T) { netA.config.EnablePingHandler = false netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") bConfig := defaultConfig bConfig.NetAddress = "" bConfig.ForceRelayMessages = true @@ -1884,12 +1905,13 @@ func TestWebsocketNetworkTXMessageOfInterestRelay(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") incomingMsgSync := deadlock.Mutex{} msgCounters := make(map[protocol.Tag]int) messageArriveWg := sync.WaitGroup{} msgHandler := func(msg IncomingMessage) (out OutgoingMessage) { + t.Logf("A->B %s", msg.Tag) incomingMsgSync.Lock() defer incomingMsgSync.Unlock() msgCounters[msg.Tag] = msgCounters[msg.Tag] + 1 @@ -1923,6 +1945,7 @@ func TestWebsocketNetworkTXMessageOfInterestRelay(t *testing.T) { waitReady(t, netB, readyTimeout.C) netB.OnNetworkAdvance() + waitForMOIRefreshQuiet(netB) // send another message which we can track, so that we'll know that the first message was delivered. netB.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil) messageFilterArriveWg.Wait() @@ -1954,7 +1977,7 @@ func TestWebsocketNetworkTXMessageOfInterestForceTx(t *testing.T) { netA.config.EnablePingHandler = false netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") bConfig := defaultConfig bConfig.NetAddress = "" bConfig.ForceFetchTransactions = true @@ -1966,12 +1989,13 @@ func TestWebsocketNetworkTXMessageOfInterestForceTx(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") incomingMsgSync := deadlock.Mutex{} msgCounters := make(map[protocol.Tag]int) messageArriveWg := sync.WaitGroup{} msgHandler := func(msg IncomingMessage) (out OutgoingMessage) { + t.Logf("A->B %s", msg.Tag) incomingMsgSync.Lock() defer incomingMsgSync.Unlock() msgCounters[msg.Tag] = msgCounters[msg.Tag] + 1 @@ -2005,6 +2029,7 @@ func TestWebsocketNetworkTXMessageOfInterestForceTx(t *testing.T) { waitReady(t, netB, readyTimeout.C) netB.OnNetworkAdvance() + waitForMOIRefreshQuiet(netB) // send another message which we can track, so that we'll know that the first message was delivered. netB.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil) messageFilterArriveWg.Wait() @@ -2034,7 +2059,7 @@ func TestWebsocketNetworkTXMessageOfInterestNPN(t *testing.T) { netA.config.GossipFanout = 1 netA.config.EnablePingHandler = false netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") bConfig := defaultConfig bConfig.NetAddress = "" @@ -2046,14 +2071,15 @@ func TestWebsocketNetworkTXMessageOfInterestNPN(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") require.False(t, netB.relayMessages) - require.Equal(t, uint32(wantTXGossipUnk), netB.wantTXGossip) + require.Equal(t, uint32(wantTXGossipUnk), atomic.LoadUint32(&netB.wantTXGossip)) incomingMsgSync := deadlock.Mutex{} msgCounters := make(map[protocol.Tag]int) messageArriveWg := sync.WaitGroup{} msgHandler := func(msg IncomingMessage) (out OutgoingMessage) { + t.Logf("A->B %s", msg.Tag) incomingMsgSync.Lock() defer incomingMsgSync.Unlock() msgCounters[msg.Tag] = msgCounters[msg.Tag] + 1 @@ -2087,12 +2113,18 @@ func TestWebsocketNetworkTXMessageOfInterestNPN(t *testing.T) { waitReady(t, netB, readyTimeout.C) netB.OnNetworkAdvance() - // TODO: better event driven thing for netB sending new MOI - time.Sleep(10 * time.Millisecond) - require.Equal(t, uint32(wantTXGossipNo), netB.wantTXGossip) + waitForMOIRefreshQuiet(netB) + for i := 0; i < 10; i++ { + if atomic.LoadUint32(&netB.wantTXGossip) == uint32(wantTXGossipNo) { + break + } + time.Sleep(time.Millisecond) + } + require.Equal(t, uint32(wantTXGossipNo), atomic.LoadUint32(&netB.wantTXGossip)) // send another message which we can track, so that we'll know that the first message was delivered. netB.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil) messageFilterArriveWg.Wait() + waitPeerInternalChanQuiet(t, netA) messageArriveWg.Add(5 * 3) // we're expecting exactly 15 messages. // send 5 messages of few types. @@ -2105,7 +2137,7 @@ func TestWebsocketNetworkTXMessageOfInterestNPN(t *testing.T) { // wait until all the expected messages arrive. messageArriveWg.Wait() incomingMsgSync.Lock() - require.Equal(t, 3, len(msgCounters)) + require.Equal(t, 3, len(msgCounters), msgCounters) for tag, count := range msgCounters { if tag == protocol.TxnTag { require.Equal(t, 0, count) @@ -2131,7 +2163,7 @@ func TestWebsocketNetworkTXMessageOfInterestPN(t *testing.T) { netA.config.GossipFanout = 1 netA.config.EnablePingHandler = false netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") bConfig := defaultConfig bConfig.NetAddress = "" @@ -2144,14 +2176,15 @@ func TestWebsocketNetworkTXMessageOfInterestPN(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") require.False(t, netB.relayMessages) - require.Equal(t, uint32(wantTXGossipUnk), netB.wantTXGossip) + require.Equal(t, uint32(wantTXGossipUnk), atomic.LoadUint32(&netB.wantTXGossip)) incomingMsgSync := deadlock.Mutex{} msgCounters := make(map[protocol.Tag]int) messageArriveWg := sync.WaitGroup{} msgHandler := func(msg IncomingMessage) (out OutgoingMessage) { + t.Logf("A->B %s", msg.Tag) incomingMsgSync.Lock() defer incomingMsgSync.Unlock() msgCounters[msg.Tag] = msgCounters[msg.Tag] + 1 @@ -2185,9 +2218,14 @@ func TestWebsocketNetworkTXMessageOfInterestPN(t *testing.T) { waitReady(t, netB, readyTimeout.C) netB.OnNetworkAdvance() - // TODO: better event driven thing for netB sending new MOI - time.Sleep(10 * time.Millisecond) - require.Equal(t, uint32(wantTXGossipYes), netB.wantTXGossip) + waitForMOIRefreshQuiet(netB) + for i := 0; i < 10; i++ { + if atomic.LoadUint32(&netB.wantTXGossip) == uint32(wantTXGossipYes) { + break + } + time.Sleep(time.Millisecond) + } + require.Equal(t, uint32(wantTXGossipYes), atomic.LoadUint32(&netB.wantTXGossip)) // send another message which we can track, so that we'll know that the first message was delivered. netB.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil) messageFilterArriveWg.Wait() @@ -2229,7 +2267,7 @@ func TestWebsocketDisconnection(t *testing.T) { netA.log = dl netA.Start() - defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + defer netStop(t, netA, "A") netB := makeTestWebsocketNode(t) netB.config.GossipFanout = 1 netB.config.EnablePingHandler = false @@ -2238,7 +2276,7 @@ func TestWebsocketDisconnection(t *testing.T) { t.Log(addrA) netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) netB.Start() - defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + defer netStop(t, netB, "B") msgHandlerA := func(msg IncomingMessage) (out OutgoingMessage) { // if we received a message, send a message back. From 53c3684fd437316b484ef2ba8e5ff88c5f945ee5 Mon Sep 17 00:00:00 2001 From: John Lee Date: Fri, 10 Jun 2022 15:12:29 -0400 Subject: [PATCH 2/2] Devops: bump build number --- buildnumber.dat | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildnumber.dat b/buildnumber.dat index d00491fd7e..0cfbf08886 100644 --- a/buildnumber.dat +++ b/buildnumber.dat @@ -1 +1 @@ -1 +2