Skip to content

Commit

Permalink
telemetry: add TCP RTT info collection (#4745)
Browse files Browse the repository at this point in the history
  • Loading branch information
cce authored Nov 17, 2022
1 parent e618fbf commit 526cb89
Show file tree
Hide file tree
Showing 10 changed files with 423 additions and 112 deletions.
4 changes: 4 additions & 0 deletions logging/telemetryspec/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package telemetryspec

import (
"time"

"github.com/algorand/go-algorand/util"
)

// Telemetry Events
Expand Down Expand Up @@ -302,6 +304,8 @@ type PeerConnectionDetails struct {
MessageDelay int64 `json:",omitempty"`
// DuplicateFilterCount is the number of times this peer has sent us a message hash to filter that it had already sent before.
DuplicateFilterCount uint64
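// TCP provides connection measurements from TCP (see util.TCPInfo).
// NOTE(review): encoding/json's omitempty does not omit struct values, so this field is always emitted.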
TCP util.TCPInfo `json:",omitempty"`
}

// CatchpointGenerationEvent event
Expand Down
4 changes: 4 additions & 0 deletions network/limitlistener/rejectingLimitListener.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,7 @@ func (l *rejectingLimitListenerConn) Close() error {
l.releaseOnce.Do(l.release)
return err
}

// UnderlyingConn returns l.Conn, the net.Conn that this
// rejectingLimitListenerConn wraps.
func (l *rejectingLimitListenerConn) UnderlyingConn() net.Conn {
return l.Conn
}
4 changes: 4 additions & 0 deletions network/requestTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ type requestTrackedConnection struct {
tracker *RequestTracker
}

// UnderlyingConn returns c.Conn, the net.Conn that this
// requestTrackedConnection wraps.
func (c *requestTrackedConnection) UnderlyingConn() net.Conn {
return c.Conn
}

// Close removes the connection from the tracker's connections map and calls the underlying Close function.
func (c *requestTrackedConnection) Close() error {
c.tracker.hostRequestsMu.Lock()
Expand Down
24 changes: 22 additions & 2 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -1805,14 +1805,23 @@ func (wn *WebsocketNetwork) OnNetworkAdvance() {
// to the telemetry server. Internally, it's using a timer to ensure that it would only
// send the information once every hour ( configurable via PeerConnectionsUpdateInterval )
func (wn *WebsocketNetwork) sendPeerConnectionsTelemetryStatus() {
	// nothing to report when telemetry is switched off.
	if !wn.log.GetTelemetryEnabled() {
		return
	}

	now := time.Now()
	interval := wn.config.PeerConnectionsUpdateInterval
	if interval <= 0 {
		// a non-positive interval means the update is never sent.
		return
	}
	nextSend := wn.lastPeerConnectionsSent.Add(time.Duration(interval) * time.Second)
	if nextSend.After(now) {
		// it's not yet time to send the update.
		return
	}
	wn.lastPeerConnectionsSent = now

	// take a snapshot of the current peers and report their connection details.
	var snapshot []*wsPeer
	snapshot, _ = wn.peerSnapshot(snapshot)
	details := wn.getPeerConnectionTelemetryDetails(now, snapshot)
	wn.log.EventWithDetails(telemetryspec.Network, telemetryspec.PeerConnectionsEvent, details)
}

func (wn *WebsocketNetwork) getPeerConnectionTelemetryDetails(now time.Time, peers []*wsPeer) telemetryspec.PeersConnectionDetails {
var connectionDetails telemetryspec.PeersConnectionDetails
for _, peer := range peers {
connDetail := telemetryspec.PeerConnectionDetails{
Expand All @@ -1821,6 +1830,18 @@ func (wn *WebsocketNetwork) sendPeerConnectionsTelemetryStatus() {
InstanceName: peer.InstanceName,
DuplicateFilterCount: peer.duplicateFilterCount,
}
// unwrap websocket.Conn, requestTrackedConnection, rejectingLimitListenerConn
var uconn net.Conn = peer.conn.UnderlyingConn()
for i := 0; i < 10; i++ {
wconn, ok := uconn.(wrappedConn)
if !ok {
break
}
uconn = wconn.UnderlyingConn()
}
if tcpInfo, err := util.GetConnTCPInfo(uconn); err == nil && tcpInfo != nil {
connDetail.TCP = *tcpInfo
}
if peer.outgoing {
connDetail.Address = justHost(peer.conn.RemoteAddr().String())
connDetail.Endpoint = peer.GetAddress()
Expand All @@ -1831,8 +1852,7 @@ func (wn *WebsocketNetwork) sendPeerConnectionsTelemetryStatus() {
connectionDetails.IncomingPeers = append(connectionDetails.IncomingPeers, connDetail)
}
}

wn.log.EventWithDetails(telemetryspec.Network, telemetryspec.PeerConnectionsEvent, connectionDetails)
return connectionDetails
}

// prioWeightRefreshTime controls how often we refresh the weights
Expand Down
218 changes: 108 additions & 110 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -268,24 +269,30 @@ func netStop(t testing.TB, wn *WebsocketNetwork, name string) {
t.Logf("%s done", name)
}

// Set up two nodes, test that a.Broadcast is received by B
func TestWebsocketNetworkBasic(t *testing.T) {
partitiontest.PartitionTest(t)
func setupWebsocketNetworkAB(t *testing.T, countTarget int) (*WebsocketNetwork, *WebsocketNetwork, *messageCounterHandler, func()) {
success := false

netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
defer netStop(t, netA, "A")
defer func() {
if !success {
netStop(t, netA, "A")
}
}()
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
require.True(t, postListen)
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
defer netStop(t, netB, "B")
counter := newMessageCounter(t, 2)
counterDone := counter.done
defer func() {
if !success {
netStop(t, netB, "B")
}
}()
counter := newMessageCounter(t, countTarget)
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

readyTimeout := time.NewTimer(2 * time.Second)
Expand All @@ -294,6 +301,21 @@ func TestWebsocketNetworkBasic(t *testing.T) {
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")

success = true
// closeFunc stops both networks. It is handed to the caller once setup has
// succeeded; the deferred cleanups earlier in this function only run when
// setup fails.
closeFunc := func() {
	netStop(t, netB, "B")
	// stop netA here (previously netB was stopped a second time and netA was never stopped).
	netStop(t, netA, "A")
}
return netA, netB, counter, closeFunc
}

// Set up two nodes, test that a.Broadcast is received by B
func TestWebsocketNetworkBasic(t *testing.T) {
partitiontest.PartitionTest(t)

netA, _, counter, closeFunc := setupWebsocketNetworkAB(t, 2)
defer closeFunc()
counterDone := counter.done
netA.Broadcast(context.Background(), protocol.TxnTag, []byte("foo"), false, nil)
netA.Broadcast(context.Background(), protocol.TxnTag, []byte("bar"), false, nil)

Expand Down Expand Up @@ -384,27 +406,9 @@ func TestWebsocketProposalPayloadCompression(t *testing.T) {
func TestWebsocketNetworkUnicast(t *testing.T) {
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
require.True(t, postListen)
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
defer netStop(t, netB, "B")
counter := newMessageCounter(t, 2)
netA, _, counter, closeFunc := setupWebsocketNetworkAB(t, 2)
defer closeFunc()
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
t.Log("a ready")
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")

require.Equal(t, 1, len(netA.peers))
require.Equal(t, 1, len(netA.GetPeers(PeersConnectedIn)))
Expand All @@ -425,26 +429,8 @@ func TestWebsocketNetworkUnicast(t *testing.T) {
func TestWebsocketPeerData(t *testing.T) {
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
require.True(t, postListen)
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
defer netStop(t, netB, "B")
counter := newMessageCounter(t, 2)
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
t.Log("a ready")
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")
netA, _, _, closeFunc := setupWebsocketNetworkAB(t, 2)
defer closeFunc()

require.Equal(t, 1, len(netA.peers))
require.Equal(t, 1, len(netA.GetPeers(PeersConnectedIn)))
Expand All @@ -463,27 +449,9 @@ func TestWebsocketPeerData(t *testing.T) {
func TestWebsocketNetworkArray(t *testing.T) {
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
require.True(t, postListen)
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
defer netStop(t, netB, "B")
counter := newMessageCounter(t, 3)
netA, _, counter, closeFunc := setupWebsocketNetworkAB(t, 3)
defer closeFunc()
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
t.Log("a ready")
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")

tags := []protocol.Tag{protocol.TxnTag, protocol.TxnTag, protocol.TxnTag}
data := [][]byte{[]byte("foo"), []byte("bar"), []byte("algo")}
Expand All @@ -500,27 +468,9 @@ func TestWebsocketNetworkArray(t *testing.T) {
func TestWebsocketNetworkCancel(t *testing.T) {
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
require.True(t, postListen)
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
defer netStop(t, netB, "B")
counter := newMessageCounter(t, 100)
netA, _, counter, closeFunc := setupWebsocketNetworkAB(t, 100)
defer closeFunc()
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
t.Log("a ready")
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")

tags := make([]protocol.Tag, 100)
data := make([][]byte, 100)
Expand Down Expand Up @@ -721,29 +671,15 @@ func TestAddrToGossipAddr(t *testing.T) {

// nopConn is a stub connection type: its methods do nothing and return zero values.
type nopConn struct{}

func (nc *nopConn) RemoteAddr() net.Addr {
return nil
}
func (nc *nopConn) NextReader() (int, io.Reader, error) {
return 0, nil, nil
}
func (nc *nopConn) WriteMessage(int, []byte) error {
return nil
}
func (nc *nopConn) WriteControl(int, []byte, time.Time) error {
return nil
}
func (nc *nopConn) SetReadLimit(limit int64) {
}
func (nc *nopConn) CloseWithoutFlush() error {
return nil
}
func (nc *nopConn) SetPingHandler(h func(appData string) error) {

}
func (nc *nopConn) SetPongHandler(h func(appData string) error) {

}
// The nopConn methods below are no-op stubs: each ignores its arguments and
// returns the zero value (nil, 0) of its result types, or nothing at all.
func (nc *nopConn) RemoteAddr() net.Addr { return nil }
func (nc *nopConn) NextReader() (int, io.Reader, error) { return 0, nil, nil }
func (nc *nopConn) WriteMessage(int, []byte) error { return nil }
func (nc *nopConn) WriteControl(int, []byte, time.Time) error { return nil }
func (nc *nopConn) SetReadLimit(limit int64) {}
func (nc *nopConn) CloseWithoutFlush() error { return nil }
func (nc *nopConn) SetPingHandler(h func(appData string) error) {}
func (nc *nopConn) SetPongHandler(h func(appData string) error) {}
func (nc *nopConn) UnderlyingConn() net.Conn { return nil }

var nopConnSingleton = nopConn{}

Expand Down Expand Up @@ -2739,3 +2675,65 @@ func TestPreparePeerData(t *testing.T) {
}
}
}

// TestWebsocketNetworkTelemetryTCP checks the TCP measurements reported by
// getPeerConnectionTelemetryDetails: while two connected nodes are running the
// reported RTT must be nonzero on both ends, and after the nodes are stopped
// (querying a stale peer snapshot) the reported RTT must be zero.
func TestWebsocketNetworkTelemetryTCP(t *testing.T) {
partitiontest.PartitionTest(t)

// start two networks and send 2 messages from A to B
// closed records that closeFunc was already called explicitly further down,
// so the deferred cleanup does not call it a second time.
closed := false
netA, netB, counter, closeFunc := setupWebsocketNetworkAB(t, 2)
defer func() {
if !closed {
closeFunc()
}
}()
counterDone := counter.done
netA.Broadcast(context.Background(), protocol.TxnTag, []byte("foo"), false, nil)
netA.Broadcast(context.Background(), protocol.TxnTag, []byte("bar"), false, nil)

// wait until B has counted both messages, or give up after 2 seconds
select {
case <-counterDone:
case <-time.After(2 * time.Second):
t.Errorf("timeout, count=%d, wanted 2", counter.count)
}

// get RTT from both ends and assert nonzero
var peersA, peersB []*wsPeer
peersA, _ = netA.peerSnapshot(peersA)
detailsA := netA.getPeerConnectionTelemetryDetails(time.Now(), peersA)
peersB, _ = netB.peerSnapshot(peersB)
detailsB := netB.getPeerConnectionTelemetryDetails(time.Now(), peersB)
// A sees B as its single incoming peer; B sees A as its single outgoing peer
require.Len(t, detailsA.IncomingPeers, 1)
assert.NotZero(t, detailsA.IncomingPeers[0].TCP.RTT)
require.Len(t, detailsB.OutgoingPeers, 1)
assert.NotZero(t, detailsB.OutgoingPeers[0].TCP.RTT)

// confirm the details serialize to JSON and log them for inspection
pcdA, err := json.Marshal(detailsA)
assert.NoError(t, err)
pcdB, err := json.Marshal(detailsB)
assert.NoError(t, err)
t.Log("detailsA", string(pcdA))
t.Log("detailsB", string(pcdB))

// close connections
closeFunc()
closed = true
// open more FDs by starting 2 more networks
_, _, _, closeFunc2 := setupWebsocketNetworkAB(t, 2)
defer closeFunc2()
// use stale peers snapshot from closed networks to get telemetry
// *net.OpError "use of closed network connection" err results in 0 rtt values
detailsA = netA.getPeerConnectionTelemetryDetails(time.Now(), peersA)
detailsB = netB.getPeerConnectionTelemetryDetails(time.Now(), peersB)
require.Len(t, detailsA.IncomingPeers, 1)
assert.Zero(t, detailsA.IncomingPeers[0].TCP.RTT)
require.Len(t, detailsB.OutgoingPeers, 1)
assert.Zero(t, detailsB.OutgoingPeers[0].TCP.RTT)

// the zero-valued details must still serialize to JSON
pcdA, err = json.Marshal(detailsA)
assert.NoError(t, err)
pcdB, err = json.Marshal(detailsB)
assert.NoError(t, err)
t.Log("closed detailsA", string(pcdA))
t.Log("closed detailsB", string(pcdB))
}
5 changes: 5 additions & 0 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ type wsPeerWebsocketConn interface {
CloseWithoutFlush() error
SetPingHandler(h func(appData string) error)
SetPongHandler(h func(appData string) error)
wrappedConn
}

// wrappedConn is implemented by connection wrappers that can hand back the
// net.Conn they wrap via UnderlyingConn.
type wrappedConn interface {
UnderlyingConn() net.Conn
}

type sendMessage struct {
Expand Down
Loading

0 comments on commit 526cb89

Please sign in to comment.