diff --git a/erigon-lib/diagnostics/client.go b/erigon-lib/diagnostics/client.go index 5e0ca6a5c4f..b9de0a86f3d 100644 --- a/erigon-lib/diagnostics/client.go +++ b/erigon-lib/diagnostics/client.go @@ -49,10 +49,7 @@ func NewDiagnosticClient(ctx context.Context, metricsMux *http.ServeMux, dataDir return nil, err } - hInfo := ReadSysInfo(db) - ss := ReadSyncStages(db) - snpdwl := ReadSnapshotDownloadInfo(db) - snpidx := ReadSnapshotIndexingInfo(db) + hInfo, ss, snpdwl, snpidx, snpfd := ReadSavedData(db) return &DiagnosticClient{ ctx: ctx, @@ -64,6 +61,7 @@ func NewDiagnosticClient(ctx context.Context, metricsMux *http.ServeMux, dataDir syncStats: SyncStatistics{ SnapshotDownload: snpdwl, SnapshotIndexing: snpidx, + SnapshotFillDB: snpfd, }, hardwareInfo: hInfo, snapshotFileList: SnapshoFilesList{}, @@ -188,3 +186,75 @@ func interfaceToJSONString(i interface{}) string { } return string(b) }*/ + +func ReadSavedData(db kv.RoDB) (hinfo HardwareInfo, ssinfo []SyncStage, snpdwl SnapshotDownloadStatistics, snpidx SnapshotIndexingStatistics, snpfd SnapshotFillDBStatistics) { + var ramBytes []byte + var cpuBytes []byte + var diskBytes []byte + var ssinfoData []byte + var snpdwlData []byte + var snpidxData []byte + var snpfdData []byte + var err error + + if err := db.View(context.Background(), func(tx kv.Tx) error { + ramBytes, err = ReadRAMInfoFromTx(tx) + if err != nil { + return err + } + + cpuBytes, err = ReadCPUInfoFromTx(tx) + if err != nil { + return err + } + + diskBytes, err = ReadDiskInfoFromTx(tx) + if err != nil { + return err + } + + ssinfoData, err = SyncStagesFromTX(tx) + if err != nil { + return err + } + + snpdwlData, err = SnapshotDownloadInfoFromTx(tx) + if err != nil { + return err + } + + snpidxData, err = SnapshotIndexingInfoFromTx(tx) + if err != nil { + return err + } + + snpfdData, err = SnapshotFillDBInfoFromTx(tx) + if err != nil { + return err + } + + return nil + }); err != nil { + return HardwareInfo{}, []SyncStage{}, SnapshotDownloadStatistics{}, SnapshotIndexingStatistics{}, SnapshotFillDBStatistics{} + } + + var ramInfo RAMInfo + var cpuInfo CPUInfo + var diskInfo DiskInfo + ParseData(ramBytes, &ramInfo) + ParseData(cpuBytes, &cpuInfo) + ParseData(diskBytes, &diskInfo) + + hinfo = HardwareInfo{ + RAM: ramInfo, + CPU: cpuInfo, + Disk: diskInfo, + } + + ParseData(ssinfoData, &ssinfo) + ParseData(snpdwlData, &snpdwl) + ParseData(snpidxData, &snpidx) + ParseData(snpfdData, &snpfd) + + return hinfo, ssinfo, snpdwl, snpidx, snpfd +} diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go index e9200bf25bc..0d8f29534c7 100644 --- a/erigon-lib/diagnostics/entities.go +++ b/erigon-lib/diagnostics/entities.go @@ -69,6 +69,7 @@ type PeerStatisticMsgUpdate struct { type SyncStatistics struct { SnapshotDownload SnapshotDownloadStatistics `json:"snapshotDownload"` SnapshotIndexing SnapshotIndexingStatistics `json:"snapshotIndexing"` + SnapshotFillDB SnapshotFillDBStatistics `json:"snapshotFillDB"` BlockExecution BlockExecutionStatistics `json:"blockExecution"` SyncFinished bool `json:"syncFinished"` } @@ -131,6 +132,21 @@ type SnapshotSegmentIndexingFinishedUpdate struct { SegmentName string `json:"segmentName"` } +type SnapshotFillDBStatistics struct { + Stages []SnapshotFillDBStage `json:"stages"` +} + +type SnapshotFillDBStage struct { + StageName string `json:"stageName"` + Current uint64 `json:"current"` + Total uint64 `json:"total"` +} + +type SnapshotFillDBStageUpdate struct { + Stage SnapshotFillDBStage `json:"stage"` + TimeElapsed float64 `json:"timeElapsed"` +} + type BlockExecutionStatistics struct { From uint64 `json:"from"` To uint64 `json:"to"` @@ -328,3 +344,7 @@ func (ti HeaderCanonicalMarkerUpdate) Type() Type { func (ti HeadersProcessedUpdate) Type() Type { return TypeOf(ti) } + +func (ti SnapshotFillDBStageUpdate) Type() Type { + return TypeOf(ti) +} diff --git a/erigon-lib/diagnostics/resources_usage.go b/erigon-lib/diagnostics/resources_usage.go index 36064ce44b8..65ec98e442e 100644 --- a/erigon-lib/diagnostics/resources_usage.go +++ b/erigon-lib/diagnostics/resources_usage.go @@ -31,7 +31,7 @@ func (d *DiagnosticClient) runMemoryStatsListener(rootCtx context.Context) { return case info := <-ch: d.resourcesUsageMutex.Lock() - info.StageIndex = d.getCurrentSyncIdxs() + info.StageIndex = d.GetCurrentSyncIdxs() d.resourcesUsage.MemoryUsage = append(d.resourcesUsage.MemoryUsage, info) d.resourcesUsageMutex.Unlock() } diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index 85579efebaa..4d374b26172 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -2,10 +2,10 @@ package diagnostics import ( "context" - "encoding/json" "fmt" "time" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/log/v3" ) @@ -13,6 +13,7 @@ import ( var ( SnapshotDownloadStatisticsKey = []byte("diagSnapshotDownloadStatistics") SnapshotIndexingStatisticsKey = []byte("diagSnapshotIndexingStatistics") + SnapshotFillDBStatisticsKey = []byte("diagSnapshotFillDBStatistics") ) func (d *DiagnosticClient) setupSnapshotDiagnostics(rootCtx context.Context) { @@ -22,6 +23,7 @@ func (d *DiagnosticClient) setupSnapshotDiagnostics(rootCtx context.Context) { d.runSegmentIndexingFinishedListener(rootCtx) d.runSnapshotFilesListListener(rootCtx) d.runFileDownloadedListener(rootCtx) + d.runFillDBListener(rootCtx) } func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) { @@ -84,7 +86,7 @@ func getPercentDownloaded(downloaded, total uint64) string { func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subStageInfo string) { d.mu.Lock() defer d.mu.Unlock() - idxs := d.getCurrentSyncIdxs() + idxs := d.GetCurrentSyncIdxs() if idxs.Stage == -1 || idxs.SubStage == -1 { log.Debug("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo) return @@ -336,6 +338,68 @@ func (d *DiagnosticClient) UpdateFileDownloadedStatistics(downloadedInfo *FileDo } } +func (d *DiagnosticClient) runFillDBListener(rootCtx context.Context) { + go func() { + ctx, ch, closeChannel := Context[SnapshotFillDBStageUpdate](rootCtx, 1) + defer closeChannel() + + StartProviders(ctx, TypeOf(SnapshotFillDBStageUpdate{}), log.Root()) + for { + select { + case <-rootCtx.Done(): + return + case info := <-ch: + d.SetFillDBInfo(info.Stage) + + totalTimeString := time.Duration(info.TimeElapsed) * time.Second + + d.mu.Lock() + d.updateSnapshotStageStats(SyncStageStats{ + TimeElapsed: totalTimeString.String(), + TimeLeft: "unknown", + Progress: fmt.Sprintf("%d%%", (info.Stage.Current*100)/info.Stage.Total), + }, "Fill DB from snapshots") + + err := d.db.Update(d.ctx, func(tx kv.RwTx) error { + err := SnapshotFillDBUpdater(d.syncStats.SnapshotFillDB)(tx) + if err != nil { + return err + } + + err = StagesListUpdater(d.syncStages)(tx) + if err != nil { + return err + } + + return nil + }) + + if err != nil { + log.Warn("[Diagnostics] Failed to update snapshot download info", "err", err) + } + d.mu.Unlock() + } + } + }() +} + +func (d *DiagnosticClient) SetFillDBInfo(info SnapshotFillDBStage) { + d.mu.Lock() + defer d.mu.Unlock() + + if d.syncStats.SnapshotFillDB.Stages == nil { + d.syncStats.SnapshotFillDB.Stages = []SnapshotFillDBStage{info} + } else { + + for idx, stg := range d.syncStats.SnapshotFillDB.Stages { + if stg.StageName == info.StageName { + d.syncStats.SnapshotFillDB.Stages[idx] = info + break + } + } + } +} + func (d *DiagnosticClient) SyncStatistics() SyncStatistics { return d.syncStats } @@ -344,38 +408,31 @@ func (d *DiagnosticClient) SnapshotFilesList() SnapshoFilesList { return d.snapshotFileList } -func ReadSnapshotDownloadInfo(db kv.RoDB) (info SnapshotDownloadStatistics) { - data := ReadDataFromTable(db, kv.DiagSyncStages, SnapshotDownloadStatisticsKey) - - if len(data) == 0 { - return SnapshotDownloadStatistics{} - } - - err := json.Unmarshal(data, &info) - +func SnapshotDownloadInfoFromTx(tx kv.Tx) ([]byte, error) { + bytes, err := ReadDataFromTable(tx, kv.DiagSyncStages, SnapshotDownloadStatisticsKey) if err != nil { - log.Error("[Diagnostics] Failed to read snapshot download info", "err", err) - return SnapshotDownloadStatistics{} - } else { - return info + return nil, err } -} -func ReadSnapshotIndexingInfo(db kv.RoDB) (info SnapshotIndexingStatistics) { - data := ReadDataFromTable(db, kv.DiagSyncStages, SnapshotIndexingStatisticsKey) + return common.CopyBytes(bytes), nil +} - if len(data) == 0 { - return SnapshotIndexingStatistics{} +func SnapshotIndexingInfoFromTx(tx kv.Tx) ([]byte, error) { + bytes, err := ReadDataFromTable(tx, kv.DiagSyncStages, SnapshotIndexingStatisticsKey) + if err != nil { + return nil, err } - err := json.Unmarshal(data, &info) + return common.CopyBytes(bytes), nil +} +func SnapshotFillDBInfoFromTx(tx kv.Tx) ([]byte, error) { + bytes, err := ReadDataFromTable(tx, kv.DiagSyncStages, SnapshotFillDBStatisticsKey) if err != nil { - log.Error("[Diagnostics] Failed to read snapshot indexing info", "err", err) - return SnapshotIndexingStatistics{} - } else { - return info + return nil, err } + + return common.CopyBytes(bytes), nil } func SnapshotDownloadUpdater(info SnapshotDownloadStatistics) func(tx kv.RwTx) error { @@ -385,3 +442,7 @@ func SnapshotDownloadUpdater(info SnapshotDownloadStatistics) func(tx kv.RwTx) e func SnapshotIndexingUpdater(info SnapshotIndexingStatistics) func(tx kv.RwTx) error { return PutDataToTable(kv.DiagSyncStages, SnapshotIndexingStatisticsKey, info) } + +func SnapshotFillDBUpdater(info SnapshotFillDBStatistics) func(tx kv.RwTx) error { + return PutDataToTable(kv.DiagSyncStages, SnapshotFillDBStatisticsKey, info) +} diff --git a/erigon-lib/diagnostics/stages.go b/erigon-lib/diagnostics/stages.go index 47bae6d807c..ac911fa0934 100644 --- a/erigon-lib/diagnostics/stages.go +++ b/erigon-lib/diagnostics/stages.go @@ -2,8 +2,9 @@ package diagnostics import ( "context" - "encoding/json" + "fmt" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/log/v3" ) @@ -157,7 +158,7 @@ func (d *DiagnosticClient) runSubStageListener(rootCtx context.Context) { }() } -func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs { +func (d *DiagnosticClient) GetCurrentSyncIdxs() CurrentSyncStagesIdxs { currentIdxs := CurrentSyncStagesIdxs{ Stage: -1, SubStage: -1, @@ -201,9 +202,18 @@ func (d *DiagnosticClient) SetSubStagesList(stageId string, subStages []SyncSubS } } -func (d *DiagnosticClient) SetCurrentSyncStage(css CurrentSyncStage) { +func (d *DiagnosticClient) SetCurrentSyncStage(css CurrentSyncStage) error { d.mu.Lock() defer d.mu.Unlock() + stageState, err := d.GetStageState(css.Stage) + if err != nil { + return err + } + + if stageState == Completed { + return nil + } + isSet := false for idx, stage := range d.syncStages { if !isSet { @@ -217,6 +227,8 @@ func (d *DiagnosticClient) SetCurrentSyncStage(css CurrentSyncStage) { d.setStagesState(idx, Queued) } } + + return nil } func (d *DiagnosticClient) setStagesState(stadeIdx int, state StageState) { @@ -255,22 +267,28 @@ func (d *DiagnosticClient) SetCurrentSyncSubStage(css CurrentSyncSubStage) { } } -func ReadSyncStages(db kv.RoDB) []SyncStage { - data := ReadDataFromTable(db, kv.DiagSyncStages, StagesListKey) +func (d *DiagnosticClient) GetStageState(stageId string) (StageState, error) { + for _, stage := range d.syncStages { + if stage.ID == stageId { + return stage.State, nil + } + } - if len(data) == 0 { - return []SyncStage{} + stagesIdsList := make([]string, 0, len(d.syncStages)) + for _, stage := range d.syncStages { + stagesIdsList = append(stagesIdsList, stage.ID) } - var info []SyncStage - err := json.Unmarshal(data, &info) + return 0, fmt.Errorf("stage %s not found in stages list %s", stageId, stagesIdsList) +} +func SyncStagesFromTX(tx kv.Tx) ([]byte, error) { + bytes, err := ReadDataFromTable(tx, kv.DiagSyncStages, StagesListKey) if err != nil { - log.Error("[Diagnostics] Failed to read stages list", "err", err) - return []SyncStage{} - } else { - return info + return nil, err } + + return common.CopyBytes(bytes), nil } func StagesListUpdater(info []SyncStage) func(tx kv.RwTx) error { diff --git a/erigon-lib/diagnostics/stages_test.go b/erigon-lib/diagnostics/stages_test.go index 2f49816a5fd..9be4be75c17 100644 --- a/erigon-lib/diagnostics/stages_test.go +++ b/erigon-lib/diagnostics/stages_test.go @@ -1,3 +1,19 @@ +// Copyright 2024 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . + package diagnostics_test import ( @@ -31,17 +47,25 @@ func TestSetCurrentSyncStage(t *testing.T) { subStages := diagnostics.InitSubStagesFromList(snapshotsSubStages) d.SetSubStagesList("Snapshots", subStages) - d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "Snapshots"}) + err = d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "Snapshots"}) + require.NoError(t, err) require.Equal(t, d.GetSyncStages()[0].State, diagnostics.Running) - d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "BlockHashes"}) + err = d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "BlockHashes"}) + require.NoError(t, err) require.Equal(t, d.GetSyncStages()[0].State, diagnostics.Completed) require.Equal(t, d.GetSyncStages()[1].State, diagnostics.Running) - d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "Snapshots"}) - require.Equal(t, d.GetSyncStages()[0].State, diagnostics.Running) - require.Equal(t, d.GetSyncStages()[1].State, diagnostics.Queued) + err = d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "Snapshots"}) + require.NoError(t, err) + require.Equal(t, d.GetSyncStages()[0].State, diagnostics.Completed) + require.Equal(t, d.GetSyncStages()[1].State, diagnostics.Running) require.Equal(t, d.GetSyncStages()[2].State, diagnostics.Queued) + + //test not existed stage + err = d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "NotExistedStage"}) + require.Error(t, err) + } func TestSetCurrentSyncSubStage(t *testing.T) { @@ -53,7 +77,8 @@ func TestSetCurrentSyncSubStage(t *testing.T) { subStages := diagnostics.InitSubStagesFromList(snapshotsSubStages) d.SetSubStagesList("Snapshots", subStages) - d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "Snapshots"}) + err = d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "Snapshots"}) + require.NoError(t, err) d.SetCurrentSyncSubStage(diagnostics.CurrentSyncSubStage{SubStage: "Download header-chain"}) require.Equal(t, d.GetSyncStages()[0].SubStages[0].State, diagnostics.Running) @@ -67,6 +92,66 @@ func TestSetCurrentSyncSubStage(t *testing.T) { require.Equal(t, d.GetSyncStages()[0].SubStages[2].State, diagnostics.Queued) } +func TestGetStageState(t *testing.T) { + d, err := NewTestDiagnosticClient() + require.NoError(t, err) + + stages := diagnostics.InitStagesFromList(nodeStages) + d.SetStagesList(stages) + + // Test get stage state + for _, stageId := range nodeStages { + state, err := d.GetStageState(stageId) + require.NoError(t, err) + require.Equal(t, state, diagnostics.Queued) + } + + //Test get not existed stage state + _, err = d.GetStageState("NotExistedStage") + require.Error(t, err) + + //Test Snapshots Running state + err = d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "Snapshots"}) + require.NoError(t, err) + state, err := d.GetStageState("Snapshots") + require.NoError(t, err) + require.Equal(t, state, diagnostics.Running) + + //Test Snapshots Completed and BlockHashes running state + err = d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "BlockHashes"}) + require.NoError(t, err) + state, err = d.GetStageState("Snapshots") + require.NoError(t, err) + require.Equal(t, state, diagnostics.Completed) + state, err = d.GetStageState("BlockHashes") + require.NoError(t, err) + require.Equal(t, state, diagnostics.Running) +} + +func TestGetStageIndexes(t *testing.T) { + d, err := NewTestDiagnosticClient() + require.NoError(t, err) + + stages := diagnostics.InitStagesFromList(nodeStages) + d.SetStagesList(stages) + subStages := diagnostics.InitSubStagesFromList(snapshotsSubStages) + d.SetSubStagesList("Snapshots", subStages) + + err = d.SetCurrentSyncStage(diagnostics.CurrentSyncStage{Stage: "Snapshots"}) + require.NoError(t, err) + d.SetCurrentSyncSubStage(diagnostics.CurrentSyncSubStage{SubStage: "Download header-chain"}) + + idxs := d.GetCurrentSyncIdxs() + require.Equal(t, idxs, diagnostics.CurrentSyncStagesIdxs{Stage: 0, SubStage: 0}) +} + +func TestStagesState(t *testing.T) { + //Test StageState to string + require.Equal(t, diagnostics.StageState(0).String(), "Queued") + require.Equal(t, diagnostics.StageState(1).String(), "Running") + require.Equal(t, diagnostics.StageState(2).String(), "Completed") +} + var ( nodeStages = []string{"Snapshots", "BlockHashes", "Senders"} snapshotsSubStages = []string{"Download header-chain", "Download snapshots", "Indexing", "Fill DB"} diff --git a/erigon-lib/diagnostics/sys_info.go b/erigon-lib/diagnostics/sys_info.go index 68a7ce7d4c1..b470eab2aba 100644 --- a/erigon-lib/diagnostics/sys_info.go +++ b/erigon-lib/diagnostics/sys_info.go @@ -1,12 +1,11 @@ package diagnostics import ( - "encoding/json" - "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/disk" "github.com/shirou/gopsutil/v3/mem" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/diskutils" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/log/v3" @@ -136,70 +135,31 @@ func GetCPUInfo() CPUInfo { } } -func ReadSysInfo(db kv.RoDB) (info HardwareInfo) { - ram := ReadRAMInfo(db) - cpu := ReadCPUInfo(db) - disk := ReadDickInfo(db) - - return HardwareInfo{ - RAM: ram, - CPU: cpu, - Disk: disk, - } -} - -func ReadRAMInfo(db kv.RoDB) RAMInfo { - data := ReadDataFromTable(db, kv.DiagSystemInfo, SystemRamInfoKey) - - if len(data) == 0 { - return RAMInfo{} - } - - var info RAMInfo - err := json.Unmarshal(data, &info) - +func ReadRAMInfoFromTx(tx kv.Tx) ([]byte, error) { + bytes, err := ReadDataFromTable(tx, kv.DiagSystemInfo, SystemRamInfoKey) if err != nil { - log.Error("[Diagnostics] Failed to read RAM info", "err", err) - return RAMInfo{} - } else { - return info + return nil, err } -} -func ReadCPUInfo(db kv.RoDB) CPUInfo { - data := ReadDataFromTable(db, kv.DiagSystemInfo, SystemCpuInfoKey) - - if len(data) == 0 { - return CPUInfo{} - } - - var info CPUInfo - err := json.Unmarshal(data, &info) - - if err != nil { - log.Error("[Diagnostics] Failed to read CPU info", "err", err) - return CPUInfo{} - } else { - return info - } + return common.CopyBytes(bytes), nil } -func ReadDickInfo(db kv.RoDB) DiskInfo { - data := ReadDataFromTable(db, kv.DiagSystemInfo, SystemDiskInfoKey) - - if len(data) == 0 { - return DiskInfo{} +func ReadCPUInfoFromTx(tx kv.Tx) ([]byte, error) { + bytes, err := ReadDataFromTable(tx, kv.DiagSystemInfo, SystemCpuInfoKey) + if err != nil { + return nil, err } - var info DiskInfo - err := json.Unmarshal(data, &info) + return common.CopyBytes(bytes), nil +} +func ReadDiskInfoFromTx(tx kv.Tx) ([]byte, error) { + bytes, err := ReadDataFromTable(tx, kv.DiagSystemInfo, SystemDiskInfoKey) if err != nil { - log.Error("[Diagnostics] Failed to read Disk info", "err", err) - return DiskInfo{} - } else { - return info + return nil, err } + + return common.CopyBytes(bytes), nil } func RAMInfoUpdater(info RAMInfo) func(tx kv.RwTx) error { diff --git a/erigon-lib/diagnostics/utils.go b/erigon-lib/diagnostics/utils.go index 055bcb9ee71..fd6eef81fb7 100644 --- a/erigon-lib/diagnostics/utils.go +++ b/erigon-lib/diagnostics/utils.go @@ -1,28 +1,22 @@ package diagnostics import ( - "context" "encoding/json" "fmt" + "reflect" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/log/v3" ) -func ReadDataFromTable(db kv.RoDB, table string, key []byte) (data []byte) { - if err := db.View(context.Background(), func(tx kv.Tx) error { - bytes, err := tx.GetOne(table, key) +func ReadDataFromTable(tx kv.Tx, table string, key []byte) ([]byte, error) { + bytes, err := tx.GetOne(table, key) - if err != nil { - return err - } - - data = bytes - - return nil - }); err != nil { - return []byte{} + if err != nil { + return nil, err } - return data + + return bytes, nil } func PutDataToTable(table string, key []byte, info any) func(tx kv.RwTx) error { @@ -89,3 +83,14 @@ func SecondsToHHMMString(seconds uint64) string { return fmt.Sprintf("%dhrs:%dm", hours, minutes) } + +func ParseData(data []byte, v interface{}) { + if len(data) == 0 { + return + } + + err := json.Unmarshal(data, &v) + if err != nil { + log.Warn("[Diagnostics] Failed to parse data", "data", string(data), "type", reflect.TypeOf(v)) + } +} diff --git a/erigon-lib/diagnostics/utils_test.go b/erigon-lib/diagnostics/utils_test.go new file mode 100644 index 00000000000..1ca8f11bc9d --- /dev/null +++ b/erigon-lib/diagnostics/utils_test.go @@ -0,0 +1,27 @@ +package diagnostics_test + +import ( + "encoding/json" + "testing" + + "github.com/ledgerwatch/erigon-lib/diagnostics" + "github.com/stretchr/testify/require" +) + +func TestParseData(t *testing.T) { + var data []byte + var v diagnostics.RAMInfo + diagnostics.ParseData(data, v) + require.Equal(t, diagnostics.RAMInfo{}, v) + + newv := diagnostics.RAMInfo{ + Total: 1, + Free: 2, + } + + data, err := json.Marshal(newv) + require.NoError(t, err) + + diagnostics.ParseData(data, &v) + require.Equal(t, newv, v) +} diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index bbdbaa78ec7..bf90eaa1ea9 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -309,6 +309,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R } func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, dirs datadir.Dirs, blockReader services.FullBlockReader, agg *state.Aggregator, logger log.Logger) error { + startTime := time.Now() blocksAvailable := blockReader.FrozenBlocks() logEvery := time.NewTicker(logInterval) defer logEvery.Stop() @@ -355,6 +356,14 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, dirs case <-ctx.Done(): return ctx.Err() case <-logEvery.C: + diagnostics.Send(diagnostics.SnapshotFillDBStageUpdate{ + Stage: diagnostics.SnapshotFillDBStage{ + StageName: string(stage), + Current: header.Number.Uint64(), + Total: blocksAvailable, + }, + TimeElapsed: time.Since(startTime).Seconds(), + }) logger.Info(fmt.Sprintf("[%s] Total difficulty index: %dk/%dk", logPrefix, header.Number.Uint64()/1000, blockReader.FrozenBlocks()/1000)) default: } @@ -394,6 +403,14 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, dirs case <-ctx.Done(): return ctx.Err() case <-logEvery.C: + diagnostics.Send(diagnostics.SnapshotFillDBStageUpdate{ + Stage: diagnostics.SnapshotFillDBStage{ + StageName: string(stage), + Current: blockNum, + Total: blocksAvailable, + }, + TimeElapsed: time.Since(startTime).Seconds(), + }) logger.Info(fmt.Sprintf("[%s] MaxTxNums index: %dk/%dk", logPrefix, blockNum/1000, blockReader.FrozenBlocks()/1000)) default: } @@ -419,6 +436,16 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, dirs if err := rawdb.WriteSnapshots(tx, blockReader.FrozenFiles(), agg.Files()); err != nil { return err } + + default: + diagnostics.Send(diagnostics.SnapshotFillDBStageUpdate{ + Stage: diagnostics.SnapshotFillDBStage{ + StageName: string(stage), + Current: blocksAvailable, // as we are done with other stages + Total: blocksAvailable, + }, + TimeElapsed: time.Since(startTime).Seconds(), + }) } } return nil