Skip to content

Commit

Permalink
Diagnostics: snapshot stage info gathering (#11105)
Browse files Browse the repository at this point in the history
- added snapshot fill from DB state gathering
- refactor persisted data parsing
  • Loading branch information
dvovk committed Jul 12, 2024
1 parent b524c92 commit 81c28cd
Show file tree
Hide file tree
Showing 10 changed files with 392 additions and 119 deletions.
78 changes: 74 additions & 4 deletions erigon-lib/diagnostics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ func NewDiagnosticClient(ctx context.Context, metricsMux *http.ServeMux, dataDir
return nil, err
}

hInfo := ReadSysInfo(db)
ss := ReadSyncStages(db)
snpdwl := ReadSnapshotDownloadInfo(db)
snpidx := ReadSnapshotIndexingInfo(db)
hInfo, ss, snpdwl, snpidx, snpfd := ReadSavedData(db)

return &DiagnosticClient{
ctx: ctx,
Expand All @@ -64,6 +61,7 @@ func NewDiagnosticClient(ctx context.Context, metricsMux *http.ServeMux, dataDir
syncStats: SyncStatistics{
SnapshotDownload: snpdwl,
SnapshotIndexing: snpidx,
SnapshotFillDB: snpfd,
},
hardwareInfo: hInfo,
snapshotFileList: SnapshoFilesList{},
Expand Down Expand Up @@ -188,3 +186,75 @@ func interfaceToJSONString(i interface{}) string {
}
return string(b)
}*/

func ReadSavedData(db kv.RoDB) (hinfo HardwareInfo, ssinfo []SyncStage, snpdwl SnapshotDownloadStatistics, snpidx SnapshotIndexingStatistics, snpfd SnapshotFillDBStatistics) {
var ramBytes []byte
var cpuBytes []byte
var diskBytes []byte
var ssinfoData []byte
var snpdwlData []byte
var snpidxData []byte
var snpfdData []byte
var err error

if err := db.View(context.Background(), func(tx kv.Tx) error {
ramBytes, err = ReadRAMInfoFromTx(tx)
if err != nil {
return err
}

cpuBytes, err = ReadCPUInfoFromTx(tx)
if err != nil {
return err
}

diskBytes, err = ReadDiskInfoFromTx(tx)
if err != nil {
return err
}

ssinfoData, err = SyncStagesFromTX(tx)
if err != nil {
return err
}

snpdwlData, err = SnapshotDownloadInfoFromTx(tx)
if err != nil {
return err
}

snpidxData, err = SnapshotIndexingInfoFromTx(tx)
if err != nil {
return err
}

snpfdData, err = SnapshotFillDBInfoFromTx(tx)
if err != nil {
return err
}

return nil
}); err != nil {
return HardwareInfo{}, []SyncStage{}, SnapshotDownloadStatistics{}, SnapshotIndexingStatistics{}, SnapshotFillDBStatistics{}
}

var ramInfo RAMInfo
var cpuInfo CPUInfo
var diskInfo DiskInfo
ParseData(ramBytes, &ramInfo)
ParseData(cpuBytes, &cpuInfo)
ParseData(diskBytes, &diskInfo)

hinfo = HardwareInfo{
RAM: ramInfo,
CPU: cpuInfo,
Disk: diskInfo,
}

ParseData(ssinfoData, &ssinfo)
ParseData(snpdwlData, &snpdwl)
ParseData(snpidxData, &snpidx)
ParseData(snpfdData, &snpfd)

return hinfo, ssinfo, snpdwl, snpidx, snpfd
}
20 changes: 20 additions & 0 deletions erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type PeerStatisticMsgUpdate struct {
type SyncStatistics struct {
SnapshotDownload SnapshotDownloadStatistics `json:"snapshotDownload"`
SnapshotIndexing SnapshotIndexingStatistics `json:"snapshotIndexing"`
SnapshotFillDB SnapshotFillDBStatistics `json:"snapshotFillDB"`
BlockExecution BlockExecutionStatistics `json:"blockExecution"`
SyncFinished bool `json:"syncFinished"`
}
Expand Down Expand Up @@ -131,6 +132,21 @@ type SnapshotSegmentIndexingFinishedUpdate struct {
SegmentName string `json:"segmentName"`
}

type SnapshotFillDBStatistics struct {
Stages []SnapshotFillDBStage `json:"stages"`
}

type SnapshotFillDBStage struct {
StageName string `json:"stageName"`
Current uint64 `json:"current"`
Total uint64 `json:"total"`
}

type SnapshotFillDBStageUpdate struct {
Stage SnapshotFillDBStage `json:"stage"`
TimeElapsed float64 `json:"timeElapsed"`
}

type BlockExecutionStatistics struct {
From uint64 `json:"from"`
To uint64 `json:"to"`
Expand Down Expand Up @@ -328,3 +344,7 @@ func (ti HeaderCanonicalMarkerUpdate) Type() Type {
func (ti HeadersProcessedUpdate) Type() Type {
return TypeOf(ti)
}

func (ti SnapshotFillDBStageUpdate) Type() Type {
return TypeOf(ti)
}
2 changes: 1 addition & 1 deletion erigon-lib/diagnostics/resources_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (d *DiagnosticClient) runMemoryStatsListener(rootCtx context.Context) {
return
case info := <-ch:
d.resourcesUsageMutex.Lock()
info.StageIndex = d.getCurrentSyncIdxs()
info.StageIndex = d.GetCurrentSyncIdxs()
d.resourcesUsage.MemoryUsage = append(d.resourcesUsage.MemoryUsage, info)
d.resourcesUsageMutex.Unlock()
}
Expand Down
111 changes: 86 additions & 25 deletions erigon-lib/diagnostics/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package diagnostics

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
)

var (
SnapshotDownloadStatisticsKey = []byte("diagSnapshotDownloadStatistics")
SnapshotIndexingStatisticsKey = []byte("diagSnapshotIndexingStatistics")
SnapshotFillDBStatisticsKey = []byte("diagSnapshotFillDBStatistics")
)

func (d *DiagnosticClient) setupSnapshotDiagnostics(rootCtx context.Context) {
Expand All @@ -22,6 +23,7 @@ func (d *DiagnosticClient) setupSnapshotDiagnostics(rootCtx context.Context) {
d.runSegmentIndexingFinishedListener(rootCtx)
d.runSnapshotFilesListListener(rootCtx)
d.runFileDownloadedListener(rootCtx)
d.runFillDBListener(rootCtx)
}

func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) {
Expand Down Expand Up @@ -84,7 +86,7 @@ func getPercentDownloaded(downloaded, total uint64) string {
func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subStageInfo string) {
d.mu.Lock()
defer d.mu.Unlock()
idxs := d.getCurrentSyncIdxs()
idxs := d.GetCurrentSyncIdxs()
if idxs.Stage == -1 || idxs.SubStage == -1 {
log.Debug("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo)
return
Expand Down Expand Up @@ -336,6 +338,68 @@ func (d *DiagnosticClient) UpdateFileDownloadedStatistics(downloadedInfo *FileDo
}
}

func (d *DiagnosticClient) runFillDBListener(rootCtx context.Context) {
go func() {
ctx, ch, closeChannel := Context[SnapshotFillDBStageUpdate](rootCtx, 1)
defer closeChannel()

StartProviders(ctx, TypeOf(SnapshotFillDBStageUpdate{}), log.Root())
for {
select {
case <-rootCtx.Done():
return
case info := <-ch:
d.SetFillDBInfo(info.Stage)

totalTimeString := time.Duration(info.TimeElapsed) * time.Second

d.mu.Lock()
d.updateSnapshotStageStats(SyncStageStats{
TimeElapsed: totalTimeString.String(),
TimeLeft: "unknown",
Progress: fmt.Sprintf("%d%%", (info.Stage.Current*100)/info.Stage.Total),
}, "Fill DB from snapshots")

err := d.db.Update(d.ctx, func(tx kv.RwTx) error {
err := SnapshotFillDBUpdater(d.syncStats.SnapshotFillDB)(tx)
if err != nil {
return err
}

err = StagesListUpdater(d.syncStages)(tx)
if err != nil {
return err
}

return nil
})

if err != nil {
log.Warn("[Diagnostics] Failed to update snapshot download info", "err", err)
}
d.mu.Unlock()
}
}
}()
}

func (d *DiagnosticClient) SetFillDBInfo(info SnapshotFillDBStage) {
d.mu.Lock()
defer d.mu.Unlock()

if d.syncStats.SnapshotFillDB.Stages == nil {
d.syncStats.SnapshotFillDB.Stages = []SnapshotFillDBStage{info}
} else {

for idx, stg := range d.syncStats.SnapshotFillDB.Stages {
if stg.StageName == info.StageName {
d.syncStats.SnapshotFillDB.Stages[idx] = info
break
}
}
}
}

func (d *DiagnosticClient) SyncStatistics() SyncStatistics {
return d.syncStats
}
Expand All @@ -344,38 +408,31 @@ func (d *DiagnosticClient) SnapshotFilesList() SnapshoFilesList {
return d.snapshotFileList
}

func ReadSnapshotDownloadInfo(db kv.RoDB) (info SnapshotDownloadStatistics) {
data := ReadDataFromTable(db, kv.DiagSyncStages, SnapshotDownloadStatisticsKey)

if len(data) == 0 {
return SnapshotDownloadStatistics{}
}

err := json.Unmarshal(data, &info)

func SnapshotDownloadInfoFromTx(tx kv.Tx) ([]byte, error) {
bytes, err := ReadDataFromTable(tx, kv.DiagSyncStages, SnapshotDownloadStatisticsKey)
if err != nil {
log.Error("[Diagnostics] Failed to read snapshot download info", "err", err)
return SnapshotDownloadStatistics{}
} else {
return info
return nil, err
}
}

func ReadSnapshotIndexingInfo(db kv.RoDB) (info SnapshotIndexingStatistics) {
data := ReadDataFromTable(db, kv.DiagSyncStages, SnapshotIndexingStatisticsKey)
return common.CopyBytes(bytes), nil
}

if len(data) == 0 {
return SnapshotIndexingStatistics{}
func SnapshotIndexingInfoFromTx(tx kv.Tx) ([]byte, error) {
bytes, err := ReadDataFromTable(tx, kv.DiagSyncStages, SnapshotIndexingStatisticsKey)
if err != nil {
return nil, err
}

err := json.Unmarshal(data, &info)
return common.CopyBytes(bytes), nil
}

func SnapshotFillDBInfoFromTx(tx kv.Tx) ([]byte, error) {
bytes, err := ReadDataFromTable(tx, kv.DiagSyncStages, SnapshotFillDBStatisticsKey)
if err != nil {
log.Error("[Diagnostics] Failed to read snapshot indexing info", "err", err)
return SnapshotIndexingStatistics{}
} else {
return info
return nil, err
}

return common.CopyBytes(bytes), nil
}

func SnapshotDownloadUpdater(info SnapshotDownloadStatistics) func(tx kv.RwTx) error {
Expand All @@ -385,3 +442,7 @@ func SnapshotDownloadUpdater(info SnapshotDownloadStatistics) func(tx kv.RwTx) e
func SnapshotIndexingUpdater(info SnapshotIndexingStatistics) func(tx kv.RwTx) error {
return PutDataToTable(kv.DiagSyncStages, SnapshotIndexingStatisticsKey, info)
}

func SnapshotFillDBUpdater(info SnapshotFillDBStatistics) func(tx kv.RwTx) error {
return PutDataToTable(kv.DiagSyncStages, SnapshotFillDBStatisticsKey, info)
}
44 changes: 31 additions & 13 deletions erigon-lib/diagnostics/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package diagnostics

import (
"context"
"encoding/json"
"fmt"

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
)
Expand Down Expand Up @@ -157,7 +158,7 @@ func (d *DiagnosticClient) runSubStageListener(rootCtx context.Context) {
}()
}

func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs {
func (d *DiagnosticClient) GetCurrentSyncIdxs() CurrentSyncStagesIdxs {
currentIdxs := CurrentSyncStagesIdxs{
Stage: -1,
SubStage: -1,
Expand Down Expand Up @@ -201,9 +202,18 @@ func (d *DiagnosticClient) SetSubStagesList(stageId string, subStages []SyncSubS
}
}

func (d *DiagnosticClient) SetCurrentSyncStage(css CurrentSyncStage) {
func (d *DiagnosticClient) SetCurrentSyncStage(css CurrentSyncStage) error {
d.mu.Lock()
defer d.mu.Unlock()
stageState, err := d.GetStageState(css.Stage)
if err != nil {
return err
}

if stageState == Completed {
return nil
}

isSet := false
for idx, stage := range d.syncStages {
if !isSet {
Expand All @@ -217,6 +227,8 @@ func (d *DiagnosticClient) SetCurrentSyncStage(css CurrentSyncStage) {
d.setStagesState(idx, Queued)
}
}

return nil
}

func (d *DiagnosticClient) setStagesState(stadeIdx int, state StageState) {
Expand Down Expand Up @@ -255,22 +267,28 @@ func (d *DiagnosticClient) SetCurrentSyncSubStage(css CurrentSyncSubStage) {
}
}

func ReadSyncStages(db kv.RoDB) []SyncStage {
data := ReadDataFromTable(db, kv.DiagSyncStages, StagesListKey)
func (d *DiagnosticClient) GetStageState(stageId string) (StageState, error) {
for _, stage := range d.syncStages {
if stage.ID == stageId {
return stage.State, nil
}
}

if len(data) == 0 {
return []SyncStage{}
stagesIdsList := make([]string, 0, len(d.syncStages))
for _, stage := range d.syncStages {
stagesIdsList = append(stagesIdsList, stage.ID)
}

var info []SyncStage
err := json.Unmarshal(data, &info)
return 0, fmt.Errorf("stage %s not found in stages list %s", stageId, stagesIdsList)
}

func SyncStagesFromTX(tx kv.Tx) ([]byte, error) {
bytes, err := ReadDataFromTable(tx, kv.DiagSyncStages, StagesListKey)
if err != nil {
log.Error("[Diagnostics] Failed to read stages list", "err", err)
return []SyncStage{}
} else {
return info
return nil, err
}

return common.CopyBytes(bytes), nil
}

func StagesListUpdater(info []SyncStage) func(tx kv.RwTx) error {
Expand Down
Loading

0 comments on commit 81c28cd

Please sign in to comment.