Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions network/peer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type Metrics struct {
ClockSkewCount prometheus.Counter
ClockSkewSum prometheus.Gauge

RTTCount prometheus.Counter
RTTSum prometheus.Gauge

NumFailedToParse prometheus.Counter
NumSendFailed *prometheus.CounterVec // op

Expand All @@ -41,6 +44,14 @@ type Metrics struct {

func NewMetrics(registerer prometheus.Registerer) (*Metrics, error) {
m := &Metrics{
RTTCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "round_trip_count",
Help: "number of RTT samples taken (n)",
}),
RTTSum: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "round_trip_sum",
Help: "sum of RTT samples taken (ms)",
}),
ClockSkewCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "clock_skew_count",
Help: "number of handshake timestamps inspected (n)",
Expand Down Expand Up @@ -83,6 +94,8 @@ func NewMetrics(registerer prometheus.Registerer) (*Metrics, error) {
),
}
return m, errors.Join(
registerer.Register(m.RTTCount),
registerer.Register(m.RTTSum),
registerer.Register(m.ClockSkewCount),
registerer.Register(m.ClockSkewSum),
registerer.Register(m.NumFailedToParse),
Expand Down
25 changes: 24 additions & 1 deletion network/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ type peer struct {
// Must only be accessed atomically
lastSent, lastReceived int64

// lastPingSent is the milliseconds since 1970-01-01 UTC when the last ping was sent
lastPingSent int64

// getPeerListChan signals that we should attempt to send a GetPeerList to
// this peer
getPeerListChan chan struct{}
Expand Down Expand Up @@ -613,6 +616,10 @@ func (p *peer) writeMessage(writer io.Writer, msg message.OutboundMessage) {
return
}

if msg.Op() == message.PingOp {
atomic.StoreInt64(&p.lastPingSent, p.Clock.Time().UnixMilli())
}

// Write the message
var buf net.Buffers = [][]byte{msgLenBytes[:], msgBytes}
if _, err := io.CopyN(writer, &buf, int64(wrappers.IntLen+msgLen)); err != nil {
Expand Down Expand Up @@ -819,7 +826,23 @@ func (p *peer) getUptime() uint32 {
return primaryUptimePercent
}

func (*peer) handlePong(*p2p.Pong) {}
func (p *peer) handlePong(*p2p.Pong) {
pingSent := atomic.SwapInt64(&p.lastPingSent, 0)
if pingSent == 0 {
p.Log.Debug(malformedMessageLog,
zap.Stringer("nodeID", p.id),
zap.Stringer("messageOp", message.PongOp),
zap.String("reason", "received unexpected pong"),
)
p.StartClose()
return
}

elapsed := p.Clock.Time().UnixMilli() - pingSent

p.Metrics.RTTCount.Inc()
p.Metrics.RTTSum.Add(float64(elapsed))
}

func (p *peer) handleHandshake(msg *p2p.Handshake) {
if p.gotHandshake.Get() {
Expand Down
Loading