Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go-algorand 3.7.2-stable Release PR #4118

Merged
merged 4 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildnumber.dat
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1
2
42 changes: 22 additions & 20 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ type WebsocketNetwork struct {
// messagesOfInterestMu protects messagesOfInterest and ensures
// that messagesOfInterestEnc does not change once it is set during
// network start.
messagesOfInterestMu deadlock.Mutex
messagesOfInterestCond *sync.Cond
messagesOfInterestMu deadlock.Mutex
messagesOfInterestRefresh chan struct{}

// peersConnectivityCheckTicker is the timer for testing that all the connected peers
// are still transmitting or receiving information. The channel produced by this ticker
Expand Down Expand Up @@ -764,7 +764,7 @@ func (wn *WebsocketNetwork) setup() {
SupportedProtocolVersions = []string{wn.config.NetworkProtocolVersion}
}

wn.messagesOfInterestCond = sync.NewCond(&wn.messagesOfInterestMu)
wn.messagesOfInterestRefresh = make(chan struct{}, 2)
wn.messagesOfInterestGeneration = 1 // something nonzero so that any new wsPeer needs updating
if wn.relayMessages {
wn.RegisterMessageInterest(protocol.CompactCertSigTag)
Expand Down Expand Up @@ -1178,7 +1178,7 @@ func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOf
if messagesOfInterestEnc != nil {
peer.sendMessagesOfInterest(messagesOfInterestGeneration, messagesOfInterestEnc)
} else {
wn.log.Infof("msgOfInterest Enc=nil")
wn.log.Infof("msgOfInterest Enc=nil, MOIGen=%d", messagesOfInterestGeneration)
}
}
}
Expand Down Expand Up @@ -1726,14 +1726,10 @@ func (wn *WebsocketNetwork) OnNetworkAdvance() {
defer wn.lastNetworkAdvanceMu.Unlock()
wn.lastNetworkAdvance = time.Now().UTC()
if wn.nodeInfo != nil && !wn.relayMessages && !wn.config.ForceFetchTransactions {
// if we're not a relay, and not participating, we don't need txn pool
wantTXGossip := wn.nodeInfo.IsParticipating()
if wantTXGossip && (wn.wantTXGossip != wantTXGossipYes) {
wn.RegisterMessageInterest(protocol.TxnTag)
wn.wantTXGossip = wantTXGossipYes
} else if !wantTXGossip && (wn.wantTXGossip != wantTXGossipNo) {
wn.DeregisterMessageInterest(protocol.TxnTag)
wn.wantTXGossip = wantTXGossipNo
select {
case wn.messagesOfInterestRefresh <- struct{}{}:
default:
// if the notify chan is full, it will get around to updating the latest when it actually runs
}
}
}
Expand Down Expand Up @@ -2350,18 +2346,24 @@ func (wn *WebsocketNetwork) updateMessagesOfInterestEnc() {
wn.messagesOfInterestEnc = MarshallMessageOfInterestMap(wn.messagesOfInterest)
wn.messagesOfInterestEncoded = true
atomic.AddUint32(&wn.messagesOfInterestGeneration, 1)
wn.messagesOfInterestCond.Broadcast()
var peers []*wsPeer
peers, _ = wn.peerSnapshot(peers)
for _, peer := range peers {
wn.maybeSendMessagesOfInterest(peer, wn.messagesOfInterestEnc)
}
}

func (wn *WebsocketNetwork) postMessagesOfInterestThread() {
var peers []*wsPeer
wn.messagesOfInterestMu.Lock()
defer wn.messagesOfInterestMu.Unlock()
for {
wn.messagesOfInterestCond.Wait()
peers, _ = wn.peerSnapshot(peers)
for _, peer := range peers {
wn.maybeSendMessagesOfInterest(peer, wn.messagesOfInterestEnc)
<-wn.messagesOfInterestRefresh
// if we're not a relay, and not participating, we don't need txn pool
wantTXGossip := wn.nodeInfo.IsParticipating()
if wantTXGossip && (wn.wantTXGossip != wantTXGossipYes) {
wn.RegisterMessageInterest(protocol.TxnTag)
atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipYes)
} else if !wantTXGossip && (wn.wantTXGossip != wantTXGossipNo) {
wn.DeregisterMessageInterest(protocol.TxnTag)
atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipNo)
}
}
}
Expand Down
Loading