Skip to content

Commit

Permalink
dvovk/limit mem usage (#10069)
Browse files Browse the repository at this point in the history
Implemented a limit on the number of peers kept in an Erigon node's memory,
so that diagnostics data collection can be turned on by default.
dvovk authored Apr 29, 2024
1 parent aed056f commit 2b83da1
Showing 4 changed files with 374 additions and 60 deletions.
3 changes: 2 additions & 1 deletion erigon-lib/diagnostics/client.go
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ type DiagnosticClient struct {
mu sync.Mutex
headerMutex sync.Mutex
hardwareInfo HardwareInfo
peersSyncMap sync.Map
peersStats *PeerStats
headers Headers
bodies BodiesInfo
bodiesMutex sync.Mutex
@@ -37,6 +37,7 @@ func NewDiagnosticClient(metricsMux *http.ServeMux, dataDirPath string) *Diagnos
resourcesUsage: ResourcesUsage{
MemoryUsage: []MemoryStats{},
},
peersStats: NewPeerStats(1000), // 1000 is the limit of peers; TODO: make it configurable through a flag
}
}

4 changes: 3 additions & 1 deletion erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
@@ -16,7 +16,9 @@

package diagnostics

import "time"
import (
"time"
)

type PeerStatisticsGetter interface {
GetPeersStatistics() map[string]*PeerStatistics
215 changes: 157 additions & 58 deletions erigon-lib/diagnostics/network.go
Original file line number Diff line number Diff line change
@@ -2,60 +2,99 @@ package diagnostics

import (
"context"
"sort"
"sync"
"time"

"github.com/ledgerwatch/log/v3"
)

func (d *DiagnosticClient) setupNetworkDiagnostics(rootCtx context.Context) {
d.runCollectPeersStatistics(rootCtx)
// PeerStats holds per-peer traffic statistics together with the bookkeeping
// (record counter, last-update timestamps and a peer limit) used to bound how
// many peers are kept in memory.
type PeerStats struct {
// peersInfo maps a peer ID to the PeerStatistics value stored for it.
peersInfo *sync.Map
// recordsCount is the counter reported by GetPeersCount.
recordsCount int
// lastUpdateMap records, per peer ID, when the peer was last added or updated.
lastUpdateMap map[string]time.Time
// limit is the peer limit passed to NewPeerStats.
limit int
}

func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) {
go func() {
ctx, ch, closeChannel := Context[PeerStatisticMsgUpdate](rootCtx, 1)
defer closeChannel()
// NewPeerStats creates an empty PeerStats that uses peerLimit as its peer
// limit.
func NewPeerStats(peerLimit int) *PeerStats {
	stats := new(PeerStats)
	stats.peersInfo = new(sync.Map)
	stats.lastUpdateMap = map[string]time.Time{}
	stats.limit = peerLimit
	// recordsCount keeps its zero value: no peers are tracked yet.
	return stats
}

StartProviders(ctx, TypeOf(PeerStatisticMsgUpdate{}), log.Root())
for {
select {
case <-rootCtx.Done():
return
case info := <-ch:
if value, ok := d.peersSyncMap.Load(info.PeerID); ok {
if stats, ok := value.(PeerStatistics); ok {
if info.Inbound {
stats.BytesIn += uint64(info.Bytes)
stats.CapBytesIn[info.MsgCap] += uint64(info.Bytes)
stats.TypeBytesIn[info.MsgType] += uint64(info.Bytes)
} else {
stats.BytesOut += uint64(info.Bytes)
stats.CapBytesOut[info.MsgCap] += uint64(info.Bytes)
stats.TypeBytesOut[info.MsgType] += uint64(info.Bytes)
}

d.peersSyncMap.Store(info.PeerID, stats)
} else {
log.Debug("Failed to cast value to PeerStatistics struct", value)
}
} else {
d.peersSyncMap.Store(info.PeerID, PeerStatistics{
PeerType: info.PeerType,
CapBytesIn: make(map[string]uint64),
CapBytesOut: make(map[string]uint64),
TypeBytesIn: make(map[string]uint64),
TypeBytesOut: make(map[string]uint64),
})
}
}
func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate) {
if value, ok := p.peersInfo.Load(peerID); ok {
p.UpdatePeer(peerID, peerInfo, value)
} else {
p.AddPeer(peerID, peerInfo)
if p.GetPeersCount() > p.limit {
p.RemovePeersWhichExceedLimit(p.limit)
}
}()
}
}

func (d *DiagnosticClient) Peers() map[string]*PeerStatistics {
stats := make(map[string]*PeerStatistics)
// AddPeer stores fresh statistics for peerID, built from peerInfo alone, and
// records the current time as the peer's last update.
//
// The record counter is incremented only when peerID is not tracked yet, so
// calling AddPeer again for a known peer replaces its statistics without
// inflating the value reported by GetPeersCount.
func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) {
	pv := PeerStatisticsFromMsgUpdate(peerInfo, nil)
	p.peersInfo.Store(peerID, pv)

	// lastUpdateMap gets an entry whenever a peer is added or updated, so it
	// doubles as the membership check for the counter.
	if _, tracked := p.lastUpdateMap[peerID]; !tracked {
		p.recordsCount++
	}
	p.lastUpdateMap[peerID] = time.Now()
}

d.peersSyncMap.Range(func(key, value interface{}) bool {
// UpdatePeer combines peerInfo with prevValue (the earlier statistics
// supplied by the caller), saves the result under peerID and refreshes the
// peer's last-update time.
func (p *PeerStats) UpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) {
	p.peersInfo.Store(peerID, PeerStatisticsFromMsgUpdate(peerInfo, prevValue))
	p.lastUpdateMap[peerID] = time.Now()
}

// PeerStatisticsFromMsgUpdate returns the statistics that result from applying
// msg on top of prevValue.
//
// prevValue is expected to hold a PeerStatistics value. Anything else,
// including nil, is treated as "no previous statistics", in which case the
// result contains only the bytes carried by msg.
//
// All counters from the previous value are carried over: the totals for both
// directions and every per-capability / per-message-type entry. Only the
// counters matching msg's direction, MsgCap and MsgType are increased by
// msg.Bytes. The maps in the result are always non-nil and are fresh copies,
// so the maps inside prevValue are never modified.
//
// PeerType is always taken from msg.
func PeerStatisticsFromMsgUpdate(msg PeerStatisticMsgUpdate, prevValue any) PeerStatistics {
	// A failed assertion leaves prev as the zero value: zero totals, nil maps.
	prev, _ := prevValue.(PeerStatistics)

	ps := PeerStatistics{
		PeerType:     msg.PeerType,
		BytesIn:      prev.BytesIn,
		BytesOut:     prev.BytesOut,
		CapBytesIn:   copyByteCounters(prev.CapBytesIn),
		CapBytesOut:  copyByteCounters(prev.CapBytesOut),
		TypeBytesIn:  copyByteCounters(prev.TypeBytesIn),
		TypeBytesOut: copyByteCounters(prev.TypeBytesOut),
	}

	n := uint64(msg.Bytes)
	if msg.Inbound {
		ps.BytesIn += n
		ps.CapBytesIn[msg.MsgCap] += n
		ps.TypeBytesIn[msg.MsgType] += n
	} else {
		ps.BytesOut += n
		ps.CapBytesOut[msg.MsgCap] += n
		ps.TypeBytesOut[msg.MsgType] += n
	}

	return ps
}

// copyByteCounters returns a new, non-nil map holding the entries of src.
func copyByteCounters(src map[string]uint64) map[string]uint64 {
	dst := make(map[string]uint64, len(src))
	for k, v := range src {
		dst[k] = v
	}
	return dst
}

// GetPeersCount returns the current value of the internal record counter.
func (p *PeerStats) GetPeersCount() int {
return p.recordsCount
}

func (p *PeerStats) GetPeers() map[string]*PeerStatistics {
stats := make(map[string]*PeerStatistics)

p.peersInfo.Range(func(key, value interface{}) bool {
if loadedKey, ok := key.(string); ok {
if loadedValue, ok := value.(PeerStatistics); ok {
stats[loadedKey] = &loadedValue
@@ -69,26 +108,86 @@ func (d *DiagnosticClient) Peers() map[string]*PeerStatistics {
return true
})

d.PeerDataResetStatistics()

return stats
}

func (d *DiagnosticClient) PeerDataResetStatistics() {
d.peersSyncMap.Range(func(key, value interface{}) bool {
if stats, ok := value.(PeerStatistics); ok {
stats.BytesIn = 0
stats.BytesOut = 0
stats.CapBytesIn = make(map[string]uint64)
stats.CapBytesOut = make(map[string]uint64)
stats.TypeBytesIn = make(map[string]uint64)
stats.TypeBytesOut = make(map[string]uint64)

d.peersSyncMap.Store(key, stats)
} else {
log.Debug("Failed to cast value to PeerStatistics struct", value)
func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics {
if value, ok := p.peersInfo.Load(peerID); ok {
if peerStats, ok := value.(PeerStatistics); ok {
return peerStats
}
}

return true
return PeerStatistics{}
}

// GetLastUpdate reports when peerID was last added or updated. For a peer
// with no recorded update it returns the zero time.Time.
func (p *PeerStats) GetLastUpdate(peerID string) time.Time {
	// Indexing a map with a missing key yields the zero value of the element
	// type, which is exactly the "not found" result.
	return p.lastUpdateMap[peerID]
}

// RemovePeer drops the statistics and the last-update timestamp stored for
// peerID.
//
// The record counter is decremented only when peerID was actually tracked, so
// removing an unknown peer leaves the value reported by GetPeersCount
// unchanged instead of letting it drift below the real number of peers.
func (p *PeerStats) RemovePeer(peerID string) {
	p.peersInfo.Delete(peerID)

	if _, tracked := p.lastUpdateMap[peerID]; tracked {
		delete(p.lastUpdateMap, peerID)
		p.recordsCount--
	}
}

// PeerUpdTime pairs a peer ID with the time of that peer's last update.
type PeerUpdTime struct {
// PeerID identifies the peer.
PeerID string
// Time is the last-update timestamp recorded for the peer.
Time time.Time
}

// GetOldestUpdatedPeersWithSize returns up to size peers ordered by their
// last-update time, oldest first.
//
// If fewer than size peers have a recorded update, all of them are returned.
// A size of zero or less yields an empty, non-nil slice.
func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime {
	// Guard against non-positive sizes: slicing with a negative bound panics.
	if size <= 0 {
		return []PeerUpdTime{}
	}

	// Size the slice from the map being iterated rather than from the record
	// counter, so the capacity is always exact and never negative.
	timeArray := make([]PeerUpdTime, 0, len(p.lastUpdateMap))
	for id, updatedAt := range p.lastUpdateMap {
		timeArray = append(timeArray, PeerUpdTime{PeerID: id, Time: updatedAt})
	}

	sort.Slice(timeArray, func(i, j int) bool {
		return timeArray[i].Time.Before(timeArray[j].Time)
	})

	if size < len(timeArray) {
		timeArray = timeArray[:size]
	}

	return timeArray
}

// RemovePeersWhichExceedLimit evicts the least recently updated peers until
// the peer count no longer exceeds limit. It does nothing when the count is
// already at or below limit.
func (p *PeerStats) RemovePeersWhichExceedLimit(limit int) {
	excess := p.GetPeersCount() - limit
	if excess <= 0 {
		return
	}

	for _, stale := range p.GetOldestUpdatedPeersWithSize(excess) {
		p.RemovePeer(stale.PeerID)
	}
}

// setupNetworkDiagnostics starts the network diagnostics collection by
// launching the peer statistics collector with rootCtx.
func (d *DiagnosticClient) setupNetworkDiagnostics(rootCtx context.Context) {
d.runCollectPeersStatistics(rootCtx)
}

// runCollectPeersStatistics starts a goroutine that receives
// PeerStatisticMsgUpdate values and feeds each one into d.peersStats. The
// goroutine returns once rootCtx is done.
func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) {
go func() {
// NOTE(review): Context presumably returns a derived context, a channel of
// updates and a cleanup function; the meaning of the argument 1 is not
// visible here. Confirm against its definition.
ctx, ch, closeChannel := Context[PeerStatisticMsgUpdate](rootCtx, 1)
defer closeChannel()

StartProviders(ctx, TypeOf(PeerStatisticMsgUpdate{}), log.Root())
for {
select {
case <-rootCtx.Done():
return
case info := <-ch:
// Create the peer's record or fold the update into the existing one.
d.peersStats.AddOrUpdatePeer(info.PeerID, info)
}
}
}()
}

// Peers returns the per-peer statistics currently held by d.peersStats,
// keyed by peer ID.
func (d *DiagnosticClient) Peers() map[string]*PeerStatistics {
return d.peersStats.GetPeers()
}
212 changes: 212 additions & 0 deletions erigon-lib/diagnostics/network_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package diagnostics_test

import (
"strconv"
"testing"
"time"

"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/stretchr/testify/require"
)

// mockInboundPeerStats is the statistics value expected after a single
// mockInboundUpdMsg has been applied to a peer with no previous statistics.
var mockInboundPeerStats = diagnostics.PeerStatistics{
PeerType: "Sentinel",
BytesIn: 10,
CapBytesIn: map[string]uint64{"msgCap1": 10},
TypeBytesIn: map[string]uint64{"msgType1": 10},
BytesOut: 0,
CapBytesOut: map[string]uint64{},
TypeBytesOut: map[string]uint64{},
}

// mockOutboundPeerStats is the statistics value expected after a single
// mockOutboundUpdMsg has been applied to a peer with no previous statistics.
var mockOutboundPeerStats = diagnostics.PeerStatistics{
PeerType: "Sentinel",
BytesIn: 0,
CapBytesIn: map[string]uint64{},
TypeBytesIn: map[string]uint64{},
BytesOut: 10,
CapBytesOut: map[string]uint64{"msgCap1": 10},
TypeBytesOut: map[string]uint64{"msgType1": 10},
}

// mockInboundUpdMsg is a 10-byte inbound update for peer "test1".
var mockInboundUpdMsg = diagnostics.PeerStatisticMsgUpdate{
PeerType: "Sentinel",
PeerID: "test1",
Inbound: true,
MsgType: "msgType1",
MsgCap: "msgCap1",
Bytes: 10,
}

// mockOutboundUpdMsg is a 10-byte outbound update for peer "test1".
var mockOutboundUpdMsg = diagnostics.PeerStatisticMsgUpdate{
PeerType: "Sentinel",
PeerID: "test1",
Inbound: false,
MsgType: "msgType1",
MsgCap: "msgCap1",
Bytes: 10,
}

// TestPeerStatisticsFromMsgUpdate checks that an update message produces the
// expected statistics both without a previous value and on top of one, for
// inbound and outbound traffic.
func TestPeerStatisticsFromMsgUpdate(t *testing.T) {
// Test handling of an inbound message.
inboundPeerStats := diagnostics.PeerStatisticsFromMsgUpdate(mockInboundUpdMsg, nil)
require.Equal(t, mockInboundPeerStats, inboundPeerStats)

inboundPeerStats = diagnostics.PeerStatisticsFromMsgUpdate(mockInboundUpdMsg, inboundPeerStats)

require.Equal(t, diagnostics.PeerStatistics{
PeerType: "Sentinel",
BytesIn: 20,
CapBytesIn: map[string]uint64{"msgCap1": 20},
TypeBytesIn: map[string]uint64{"msgType1": 20},
BytesOut: 0,
CapBytesOut: map[string]uint64{},
TypeBytesOut: map[string]uint64{},
}, inboundPeerStats)

// Test handling of an outbound message.
outboundPeerStats := diagnostics.PeerStatisticsFromMsgUpdate(mockOutboundUpdMsg, nil)
require.Equal(t, mockOutboundPeerStats, outboundPeerStats)

outboundPeerStats = diagnostics.PeerStatisticsFromMsgUpdate(mockOutboundUpdMsg, outboundPeerStats)

require.Equal(t, diagnostics.PeerStatistics{
PeerType: "Sentinel",
BytesIn: 0,
CapBytesIn: map[string]uint64{},
TypeBytesIn: map[string]uint64{},
BytesOut: 20,
CapBytesOut: map[string]uint64{"msgCap1": 20},
TypeBytesOut: map[string]uint64{"msgType1": 20},
}, outboundPeerStats)

}

// TestAddPeer checks that AddPeer registers exactly one peer and stores the
// statistics derived from the update message.
func TestAddPeer(t *testing.T) {
	peerStats := diagnostics.NewPeerStats(100)
	peerStats.AddPeer("test1", mockInboundUpdMsg)

	require.Equal(t, 1, peerStats.GetPeersCount())
	require.Equal(t, mockInboundPeerStats, peerStats.GetPeerStatistics("test1"))
}

// TestUpdatePeer checks that updating an existing peer accumulates the byte
// counters and does not change the peer count.
func TestUpdatePeer(t *testing.T) {
	expected := diagnostics.PeerStatistics{
		PeerType:     "Sentinel",
		BytesIn:      20,
		CapBytesIn:   map[string]uint64{"msgCap1": 20},
		TypeBytesIn:  map[string]uint64{"msgType1": 20},
		BytesOut:     0,
		CapBytesOut:  map[string]uint64{},
		TypeBytesOut: map[string]uint64{},
	}

	peerStats := diagnostics.NewPeerStats(1000)
	peerStats.AddPeer("test1", mockInboundUpdMsg)
	peerStats.UpdatePeer("test1", mockInboundUpdMsg, mockInboundPeerStats)

	require.Equal(t, 1, peerStats.GetPeersCount())
	require.Equal(t, expected, peerStats.GetPeerStatistics("test1"))
}

// TestAddOrUpdatePeer checks that the first message for a peer creates its
// record, a second message for the same peer accumulates into it, and a
// message for another peer creates a second record.
func TestAddOrUpdatePeer(t *testing.T) {
	afterSecondMsg := diagnostics.PeerStatistics{
		PeerType:     "Sentinel",
		BytesIn:      20,
		CapBytesIn:   map[string]uint64{"msgCap1": 20},
		TypeBytesIn:  map[string]uint64{"msgType1": 20},
		BytesOut:     0,
		CapBytesOut:  map[string]uint64{},
		TypeBytesOut: map[string]uint64{},
	}

	peerStats := diagnostics.NewPeerStats(100)

	// First message: the peer is created.
	peerStats.AddOrUpdatePeer("test1", mockInboundUpdMsg)
	require.Equal(t, 1, peerStats.GetPeersCount())
	require.Equal(t, mockInboundPeerStats, peerStats.GetPeerStatistics("test1"))

	// Second message for the same peer: counters grow, count stays the same.
	peerStats.AddOrUpdatePeer("test1", mockInboundUpdMsg)
	require.Equal(t, 1, peerStats.GetPeersCount())
	require.Equal(t, afterSecondMsg, peerStats.GetPeerStatistics("test1"))

	// Message for a different peer: a second record appears.
	peerStats.AddOrUpdatePeer("test2", mockInboundUpdMsg)
	require.Equal(t, 2, peerStats.GetPeersCount())
}

// TestGetPeers checks that GetPeers returns every stored peer together with
// its statistics.
func TestGetPeers(t *testing.T) {
	peerStats := diagnostics.NewPeerStats(10)

	for _, pid := range []string{"test1", "test2", "test3"} {
		peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg)
	}

	peers := peerStats.GetPeers()
	require.Equal(t, 3, len(peers))
	require.Equal(t, &mockInboundPeerStats, peers["test1"])
}

// TestLastUpdated checks that last-update times are recorded, that
// GetOldestUpdatedPeersWithSize returns the requested number of peers ordered
// oldest first, and that updating a peer moves it away from the oldest slot.
func TestLastUpdated(t *testing.T) {
	peerStats := diagnostics.NewPeerStats(1000)

	peerStats.AddOrUpdatePeer("test1", mockInboundUpdMsg)
	require.NotEmpty(t, peerStats.GetLastUpdate("test1"))

	for i := 1; i < 20; i++ {
		pid := "test" + strconv.Itoa(i)
		peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg)
		// Sleep 10 milliseconds so that consecutive peers get distinct
		// last-update times.
		time.Sleep(10 * time.Millisecond)
	}

	require.True(t, peerStats.GetLastUpdate("test2").After(peerStats.GetLastUpdate("test1")))

	oldestPeers := peerStats.GetOldestUpdatedPeersWithSize(10)

	// 19 peers are tracked, but only the 10 oldest must be returned.
	require.Len(t, oldestPeers, 10)
	// The oldest peer should be test1.
	require.Equal(t, "test1", oldestPeers[0].PeerID)

	// Update test1 so that it becomes the most recently updated peer.
	peerStats.AddOrUpdatePeer("test1", mockInboundUpdMsg)
	oldestPeers = peerStats.GetOldestUpdatedPeersWithSize(10)

	// The oldest peer should no longer be test1.
	require.NotEqual(t, "test1", oldestPeers[0].PeerID)
}

// TestRemovePeersWhichExceedLimit checks that RemovePeersWhichExceedLimit
// trims the peer set down to the limit and is a no-op when the limit is not
// exceeded.
func TestRemovePeersWhichExceedLimit(t *testing.T) {
	limit := 100
	peerStats := diagnostics.NewPeerStats(limit)

	// Use AddPeer directly: AddOrUpdatePeer enforces the limit on its own,
	// which would leave nothing for the explicit removal below to do.
	for i := 1; i < 105; i++ {
		pid := "test" + strconv.Itoa(i)
		peerStats.AddPeer(pid, mockInboundUpdMsg)
	}
	require.Equal(t, 104, peerStats.GetPeersCount())

	peerStats.RemovePeersWhichExceedLimit(limit)

	require.Equal(t, limit, peerStats.GetPeersCount())

	// A limit above the current count must not remove anything.
	limit = 1000
	peerStats.RemovePeersWhichExceedLimit(limit)

	require.Equal(t, 100, peerStats.GetPeersCount())
}

// TestAddingPeersAboveTheLimit checks that AddOrUpdatePeer never lets the
// peer count grow beyond the configured limit.
func TestAddingPeersAboveTheLimit(t *testing.T) {
	const lastInitialPeer = 104

	limit := 100
	peerStats := diagnostics.NewPeerStats(limit)

	// Add more distinct peers than the limit allows.
	for i := 1; i <= lastInitialPeer; i++ {
		peerStats.AddOrUpdatePeer("test"+strconv.Itoa(i), mockInboundUpdMsg)
	}
	require.Equal(t, limit, peerStats.GetPeersCount())

	// One more new peer must still keep the count at the limit.
	peerStats.AddOrUpdatePeer("test105", mockInboundUpdMsg)
	require.Equal(t, limit, peerStats.GetPeersCount())
}

0 comments on commit 2b83da1

Please sign in to comment.