Skip to content

Commit 8e6354c

Browse files
authored
network: wsnet with p2p backup meshing strategy (#6391)
1 parent d4bab73 commit 8e6354c

17 files changed

+776
-190
lines changed

network/connPerfMon.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/algorand/go-deadlock"
2424

2525
"github.com/algorand/go-algorand/crypto"
26+
"github.com/algorand/go-algorand/logging"
2627
)
2728

2829
//msgp:ignore pmStage
@@ -384,3 +385,127 @@ func (pm *connectionPerformanceMonitor) accumulateMessage(msg *IncomingMessage,
384385
delete(msgBucket.messages, msgDigest)
385386
}
386387
}
388+
389+
// networkAdvanceMonitor tracks the last time the agreement protocol made
// notable progress, acting as a watchdog for connectivity problems.
type networkAdvanceMonitor struct {
	// lastNetworkAdvance contains the last timestamp where the agreement protocol was able to make a notable progress.
	// it is used as a watchdog to help us detect connectivity issues ( such as cliques )
	lastNetworkAdvance time.Time

	// mu guards lastNetworkAdvance, which is read and written from different goroutines.
	mu deadlock.Mutex
}
396+
397+
func makeNetworkAdvanceMonitor() *networkAdvanceMonitor {
398+
return &networkAdvanceMonitor{
399+
lastNetworkAdvance: time.Now().UTC(),
400+
}
401+
}
402+
403+
func (m *networkAdvanceMonitor) lastAdvancedWithin(interval time.Duration) bool {
404+
m.mu.Lock()
405+
defer m.mu.Unlock()
406+
// now < last + interval <=> now - last < interval
407+
return time.Now().UTC().Before(m.lastNetworkAdvance.Add(interval))
408+
}
409+
410+
func (m *networkAdvanceMonitor) updateLastAdvance() {
411+
m.mu.Lock()
412+
defer m.mu.Unlock()
413+
m.lastNetworkAdvance = time.Now().UTC()
414+
}
415+
416+
// outgoingConnsCloser decides when an existing outgoing connection should be
// dropped, either because it is the least performing one (per the connection
// performance monitor) or because the network appears to be stalled.
type outgoingConnsCloser struct {
	log logging.Logger
	// net provides access to the outgoing peers and the disconnect operation.
	net outgoingDisconnectable
	// cliqueResolveInterval is how long the network may go without advancing
	// before a randomly chosen outgoing peer is dropped (clique resolution).
	cliqueResolveInterval time.Duration
	connPerfMonitor       *connectionPerformanceMonitor
	netAdvMonitor         *networkAdvanceMonitor
}
423+
424+
// outgoingDisconnectable is the subset of network operations needed by
// outgoingConnsCloser: inspecting outgoing connections and disconnecting peers.
type outgoingDisconnectable interface {
	// outgoingPeers returns the current set of outgoing peers.
	outgoingPeers() (peers []Peer)
	// numOutgoingPending returns the number of outgoing connections currently being established.
	numOutgoingPending() int
	// disconnect drops the given peer for the supplied reason.
	disconnect(badnode Peer, reason disconnectReason)
	// OnNetworkAdvance notifies the network that progress was (or is being forced to be) made.
	OnNetworkAdvance()
}
430+
431+
func makeOutgoingConnsCloser(log logging.Logger, net outgoingDisconnectable, connPerfMonitor *connectionPerformanceMonitor, cliqueResolveInterval time.Duration) *outgoingConnsCloser {
432+
return &outgoingConnsCloser{
433+
log: log,
434+
net: net,
435+
cliqueResolveInterval: cliqueResolveInterval,
436+
connPerfMonitor: connPerfMonitor,
437+
netAdvMonitor: makeNetworkAdvanceMonitor(),
438+
}
439+
}
440+
441+
// checkExistingConnectionsNeedDisconnecting check to see if existing connection need to be dropped due to
// performance issues and/or network being stalled.
// It returns true when a peer was disconnected as a result of this check.
func (cc *outgoingConnsCloser) checkExistingConnectionsNeedDisconnecting(targetConnCount int) bool {
	// we already connected ( or connecting.. ) to GossipFanout peers.
	// get the actual peers.
	outgoingPeers := cc.net.outgoingPeers()
	if len(outgoingPeers) < targetConnCount {
		// not enough peers to evaluate relative performance yet;
		// reset the performance monitor.
		cc.connPerfMonitor.Reset([]Peer{})
		// still check for a stalled network before giving up.
		return cc.checkNetworkAdvanceDisconnect()
	}

	if !cc.connPerfMonitor.ComparePeers(outgoingPeers) {
		// different set of peers. restart monitoring.
		cc.connPerfMonitor.Reset(outgoingPeers)
	}

	// same set of peers.
	peerStat := cc.connPerfMonitor.GetPeersStatistics()
	if peerStat == nil {
		// performance metrics are not yet ready.
		return cc.checkNetworkAdvanceDisconnect()
	}

	// update peers with the performance metrics we've gathered.
	// the first throttled peer encountered is selected for disconnection
	// (presumably peerStatistics is ordered worst-performing first —
	// confirm against GetPeersStatistics).
	var leastPerformingPeer *wsPeer = nil
	for _, stat := range peerStat.peerStatistics {
		wsPeer := stat.peer.(*wsPeer)
		wsPeer.peerMessageDelay = stat.peerDelay
		cc.log.Infof("network performance monitor - peer '%s' delay %d first message portion %d%%", wsPeer.GetAddress(), stat.peerDelay, int(stat.peerFirstMessage*100))
		if wsPeer.throttledOutgoingConnection && leastPerformingPeer == nil {
			leastPerformingPeer = wsPeer
		}
	}
	if leastPerformingPeer == nil {
		// no throttled peers eligible for disconnection; fall back to the stall check.
		return cc.checkNetworkAdvanceDisconnect()
	}
	cc.net.disconnect(leastPerformingPeer, disconnectLeastPerformingPeer)
	// restart performance monitoring from scratch after changing the peer set.
	cc.connPerfMonitor.Reset([]Peer{})

	return true
}
483+
484+
// checkNetworkAdvanceDisconnect is using the lastNetworkAdvance indicator to see if the network is currently "stuck".
485+
// if it's seems to be "stuck", a randomly picked peer would be disconnected.
486+
func (cc *outgoingConnsCloser) checkNetworkAdvanceDisconnect() bool {
487+
if cc.netAdvMonitor.lastAdvancedWithin(cc.cliqueResolveInterval) {
488+
return false
489+
}
490+
outgoingPeers := cc.net.outgoingPeers()
491+
if len(outgoingPeers) == 0 {
492+
return false
493+
}
494+
if cc.net.numOutgoingPending() > 0 {
495+
// we're currently trying to extend the list of outgoing connections. no need to
496+
// disconnect any existing connection to free up room for another connection.
497+
return false
498+
}
499+
var peer *wsPeer
500+
disconnectPeerIdx := crypto.RandUint63() % uint64(len(outgoingPeers))
501+
peer = outgoingPeers[disconnectPeerIdx].(*wsPeer)
502+
503+
cc.net.disconnect(peer, disconnectCliqueResolve)
504+
cc.connPerfMonitor.Reset([]Peer{})
505+
cc.net.OnNetworkAdvance()
506+
return true
507+
}
508+
509+
// updateLastAdvance records that the agreement protocol has just made notable
// progress, delegating to the underlying network-advance monitor.
func (cc *outgoingConnsCloser) updateLastAdvance() {
	cc.netAdvMonitor.updateLastAdvance()
}

network/connPerfMon_test.go

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/stretchr/testify/require"
2525

2626
"github.com/algorand/go-algorand/crypto"
27+
"github.com/algorand/go-algorand/logging"
2728
"github.com/algorand/go-algorand/protocol"
2829
"github.com/algorand/go-algorand/test/partitiontest"
2930
)
@@ -92,7 +93,7 @@ func BenchmarkConnMonitor(b *testing.B) {
9293
}
9394
}
9495

95-
func TestConnMonitorStageTiming(t *testing.T) {
96+
func TestConnMonitor_StageTiming(t *testing.T) {
9697
partitiontest.PartitionTest(t)
9798

9899
peers := []Peer{&wsPeer{}, &wsPeer{}, &wsPeer{}, &wsPeer{}}
@@ -130,7 +131,7 @@ func TestConnMonitorStageTiming(t *testing.T) {
130131
}
131132

132133
}
133-
func TestBucketsPruning(t *testing.T) {
134+
func TestConnMonitor_BucketsPruning(t *testing.T) {
134135
partitiontest.PartitionTest(t)
135136

136137
bucketsCount := 100
@@ -160,3 +161,82 @@ func TestBucketsPruning(t *testing.T) {
160161
require.Equal(t, bucketsCount-i, len(perfMonitor.pendingMessagesBuckets))
161162
}
162163
}
164+
165+
// mockOutgoingNet is a test double for the outgoingDisconnectable interface.
// It returns canned peers/pending counts and records the disconnect and
// OnNetworkAdvance calls it receives.
type mockOutgoingNet struct {
	peers   []Peer
	pending int
	// disconnectedPeer holds the last peer passed to disconnect (nil if never called).
	disconnectedPeer Peer
	// disconnectReason holds the reason of the last disconnect call.
	disconnectReason disconnectReason
	// advanceCalled is set once OnNetworkAdvance is invoked.
	advanceCalled bool
}
172+
173+
// outgoingPeers returns the canned peer list.
func (m *mockOutgoingNet) outgoingPeers() (peers []Peer) { return m.peers }

// numOutgoingPending returns the canned number of pending outgoing connections.
func (m *mockOutgoingNet) numOutgoingPending() int { return m.pending }

// disconnect records the peer and reason of the disconnect request.
func (m *mockOutgoingNet) disconnect(badnode Peer, reason disconnectReason) {
	m.disconnectedPeer = badnode
	m.disconnectReason = reason
}

// OnNetworkAdvance records that the network-advance callback fired.
func (m *mockOutgoingNet) OnNetworkAdvance() { m.advanceCalled = true }
180+
181+
// TestConnMonitor_CheckExistingConnections_ThrottledPeers verifies that a
// throttled outgoing peer is disconnected as the least performing peer once
// the performance monitor has gathered statistics.
func TestConnMonitor_CheckExistingConnections_ThrottledPeers(t *testing.T) {
	partitiontest.PartitionTest(t)
	mon := makeConnectionPerformanceMonitor(nil)

	// below targetConnCount (1 < 2): no disconnect expected.
	p1 := &wsPeer{throttledOutgoingConnection: true}
	mockNet := &mockOutgoingNet{peers: []Peer{p1}}
	cc := makeOutgoingConnsCloser(logging.TestingLog(t), mockNet, mon, 100*time.Second)

	res := cc.checkExistingConnectionsNeedDisconnecting(2)
	require.False(t, res)
	require.Nil(t, mockNet.disconnectedPeer)

	p2 := &wsPeer{throttledOutgoingConnection: false} // not throttled
	mockNet = &mockOutgoingNet{peers: []Peer{p1, p2}}
	cc = makeOutgoingConnsCloser(logging.TestingLog(t), mockNet, mon, 100*time.Second)

	// hand-craft finished monitor state so GetPeersStatistics returns data:
	// p1 has the larger delay and fewer first messages.
	mon.Reset(mockNet.peers)
	mon.stage = pmStageStopped
	mon.connectionDelay = map[Peer]int64{p1: 20, p2: 10}
	mon.firstMessageCount = map[Peer]int64{p1: 1, p2: 2}
	mon.msgCount = 3

	// p1 is the only throttled peer, so it must be the one disconnected.
	res = cc.checkExistingConnectionsNeedDisconnecting(2)
	require.True(t, res, "expected disconnect")
	require.Equal(t, p1, mockNet.disconnectedPeer)
	require.Equal(t, disconnectLeastPerformingPeer, mockNet.disconnectReason)
}
208+
209+
// TestConnMonitor_CheckExistingConnections_NoThrottledPeers verifies that when
// no peer is throttled, the closer falls back to the network-stall check: with
// a zero cliqueResolveInterval the network is immediately considered stuck and
// a random peer is disconnected (with a reason other than least-performing).
func TestConnMonitor_CheckExistingConnections_NoThrottledPeers(t *testing.T) {
	partitiontest.PartitionTest(t)
	mon := makeConnectionPerformanceMonitor(nil)
	p1 := &wsPeer{throttledOutgoingConnection: false}
	p2 := &wsPeer{throttledOutgoingConnection: false}
	mockNet := &mockOutgoingNet{peers: []Peer{p1, p2}}
	// interval 0 means lastAdvancedWithin is always false -> stall path taken.
	cc := makeOutgoingConnsCloser(logging.TestingLog(t), mockNet, mon, 0)
	mon.Reset(mockNet.peers)
	mon.stage = pmStageStopped
	mon.connectionDelay = map[Peer]int64{p1: 5, p2: 6}
	mon.firstMessageCount = map[Peer]int64{p1: 1, p2: 1}
	mon.msgCount = 2

	res := cc.checkExistingConnectionsNeedDisconnecting(2)
	require.True(t, res)
	// the victim is chosen at random, so only assert that some peer was dropped.
	require.NotNil(t, mockNet.disconnectedPeer)
	require.NotEqual(t, disconnectLeastPerformingPeer, mockNet.disconnectReason)
}
227+
228+
func TestNetworkAdvanceMonitor(t *testing.T) {
229+
partitiontest.PartitionTest(t)
230+
m := makeNetworkAdvanceMonitor()
231+
232+
require.True(t, m.lastAdvancedWithin(500*time.Millisecond))
233+
234+
m.mu.Lock()
235+
m.lastNetworkAdvance = time.Now().Add(-2 * time.Second)
236+
m.mu.Unlock()
237+
require.False(t, m.lastAdvancedWithin(500*time.Millisecond), "expected false after stale interval")
238+
239+
// update and verify within again
240+
m.updateLastAdvance()
241+
require.True(t, m.lastAdvancedWithin(500*time.Millisecond))
242+
}

network/hybridNetwork.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,14 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p
5252
var childWsNetMeshCreator = meshCreator
5353
var childP2PNetMeshCreator = meshCreator
5454
var hybridMeshCreator MeshCreator = noopMeshCreator{}
55+
noMeshCreatorAndHybridServer := meshCreator == nil && cfg.IsHybridServer()
5556
_, isHybridMeshCreator := meshCreator.(hybridRelayMeshCreator)
56-
if meshCreator == nil && cfg.IsHybridServer() || isHybridMeshCreator {
57+
if noMeshCreatorAndHybridServer || isHybridMeshCreator {
5758
// no mesh creator provided and this node is a listening/relaying node
5859
// then override and use hybrid relay meshing
5960
// or, if a hybrid relay meshing requested explicitly, do the same
6061
childWsNetMeshCreator = noopMeshCreator{}
61-
childP2PNetMeshCreator = noopMeshPubSubFilteredCreator{}
62+
childP2PNetMeshCreator = noopMeshCreator{}
6263
hybridMeshCreator = hybridRelayMeshCreator{}
6364
}
6465

@@ -77,6 +78,7 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p
7778
}
7879

7980
hybridMesh, err := hybridMeshCreator.create(
81+
withTargetConnCount(cfg.GossipFanout),
8082
withWebsocketNetwork(wsnet),
8183
withP2PNetwork(p2pnet))
8284
if err != nil {
@@ -187,7 +189,12 @@ func (n *HybridP2PNetwork) RegisterHTTPHandlerFunc(path string, handlerFunc func
187189
}
188190

189191
// RequestConnectOutgoing implements GossipNode
func (n *HybridP2PNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {
	// forward the request to both underlying networks; the callback never
	// returns an error, so the aggregate error is deliberately ignored.
	_ = n.runParallel(func(net GossipNode) error {
		net.RequestConnectOutgoing(replace, quit)
		return nil
	})
}
191198

192199
// GetPeers implements GossipNode
193200
func (n *HybridP2PNetwork) GetPeers(options ...PeerOption) []Peer {

network/hybridNetwork_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ func TestHybridNetwork_DuplicateConn(t *testing.T) {
3939

4040
cfg := config.GetDefaultLocal()
4141
cfg.EnableP2PHybridMode = true
42+
cfg.DNSBootstrapID = ""
4243
log := logging.TestingLog(t)
4344
const p2pKeyDir = ""
4445

@@ -208,6 +209,7 @@ func TestHybridNetwork_HybridRelayStrategy(t *testing.T) {
208209

209210
cfg := config.GetDefaultLocal()
210211
cfg.EnableP2PHybridMode = true
212+
cfg.DNSBootstrapID = ""
211213
log := logging.TestingLog(t)
212214

213215
genesisInfo := GenesisInfo{genesisID, "net"}

0 commit comments

Comments
 (0)