Skip to content

Commit

Permalink
Diagnostics: Optimize db write (#11016)
Browse files Browse the repository at this point in the history
Fix for #10932
  • Loading branch information
dvovk authored Jul 4, 2024
1 parent fd4f1ac commit b4dd12e
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 74 deletions.
55 changes: 55 additions & 0 deletions erigon-lib/diagnostics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package diagnostics
import (
"context"
"net/http"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/c2h5oh/datasize"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -101,8 +105,59 @@ func (d *DiagnosticClient) Setup() {
d.setupBodiesDiagnostics(rootCtx)
d.setupResourcesUsageDiagnostics(rootCtx)
d.setupSpeedtestDiagnostics(rootCtx)
d.runSaveProcess(rootCtx)
d.runStopNodeListener(rootCtx)

//d.logDiagMsgs()

}

// runStopNodeListener starts a goroutine that waits for an interrupt or
// SIGTERM signal and calls SaveData when one arrives. If rootCtx is done
// first, the goroutine exits without saving.
func (d *DiagnosticClient) runStopNodeListener(rootCtx context.Context) {
	go func() {
		// Buffered so a signal delivered before the select is not dropped.
		ch := make(chan os.Signal, 1)
		signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
		// Unregister the channel on exit; otherwise the signal package keeps
		// relaying to it after this goroutine has returned.
		defer signal.Stop(ch)

		select {
		case <-ch:
			d.SaveData()
		case <-rootCtx.Done():
		}
	}()
}

// runSaveProcess saves diagnostic data on a five-minute interval to reduce
// the number of save events. The background goroutine stops the ticker and
// returns once rootCtx is done.
func (d *DiagnosticClient) runSaveProcess(rootCtx context.Context) {
	saveTicker := time.NewTicker(5 * time.Minute)

	go func() {
		defer saveTicker.Stop()

		for {
			select {
			case <-rootCtx.Done():
				return
			case <-saveTicker.C:
				d.SaveData()
			}
		}
	}()
}

// SaveData applies the snapshot download, stages list and snapshot indexing
// updaters inside one d.db.Update call. The first updater error aborts the
// remaining updaters; a failed update is logged as a warning and not
// returned to the caller.
func (d *DiagnosticClient) SaveData() {
	updaters := []func(tx kv.RwTx) error{
		SnapshotDownloadUpdater(d.syncStats.SnapshotDownload),
		StagesListUpdater(d.syncStages),
		SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing),
	}

	applyAll := func(tx kv.RwTx) error {
		for _, apply := range updaters {
			if applyErr := apply(tx); applyErr != nil {
				return applyErr
			}
		}

		return nil
	}

	if err := d.db.Update(d.ctx, applyAll); err != nil {
		log.Warn("Failed to save diagnostics data", "err", err)
	}
}

/*func (d *DiagnosticClient) logDiagMsgs() {
Expand Down
5 changes: 3 additions & 2 deletions erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ type SegmentPeer struct {
}

type SnapshotIndexingStatistics struct {
Segments []SnapshotSegmentIndexingStatistics `json:"segments"`
TimeElapsed float64 `json:"timeElapsed"`
Segments []SnapshotSegmentIndexingStatistics `json:"segments"`
TimeElapsed float64 `json:"timeElapsed"`
IndexingFinished bool `json:"indexingFinished"`
}

type SnapshotSegmentIndexingStatistics struct {
Expand Down
73 changes: 31 additions & 42 deletions erigon-lib/diagnostics/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) {
d.syncStats.SnapshotDownload.Sys = info.Sys
d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished
d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady
d.mu.Unlock()

downloadedPercent := getPercentDownloaded(info.Downloaded, info.Total)
remainingBytes := info.Total - info.Downloaded
Expand All @@ -61,15 +62,8 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) {
Progress: downloadedPercent,
}, "Downloading snapshots")

if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot download info", "err", err)
}

d.saveSyncStagesToDB()

d.mu.Unlock()

if d.snapshotStageFinished() {
if info.DownloadFinished {
d.SaveData()
return
}
}
Expand All @@ -88,6 +82,8 @@ func getPercentDownloaded(downloaded, total uint64) string {
}

func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subStageInfo string) {
d.mu.Lock()
defer d.mu.Unlock()
idxs := d.getCurrentSyncIdxs()
if idxs.Stage == -1 || idxs.SubStage == -1 {
log.Debug("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo)
Expand All @@ -97,15 +93,6 @@ func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subSta
d.syncStages[idxs.Stage].SubStages[idxs.SubStage].Stats = stats
}

// snapshotStageFinished reports whether the stage index returned by
// getCurrentSyncIdxs is greater than zero.
func (d *DiagnosticClient) snapshotStageFinished() bool {
	return d.getCurrentSyncIdxs().Stage > 0
}

func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context) {
go func() {
ctx, ch, closeChannel := Context[SegmentDownloadStatistics](rootCtx, 1)
Expand All @@ -132,11 +119,6 @@ func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context
} else {
d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info
}

if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot download info", "err", err)
}

d.mu.Unlock()
}
}
Expand All @@ -155,8 +137,11 @@ func (d *DiagnosticClient) runSegmentIndexingListener(rootCtx context.Context) {
return
case info := <-ch:
d.addOrUpdateSegmentIndexingState(info)
if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err)
d.updateIndexingStatus()

if d.syncStats.SnapshotIndexing.IndexingFinished {
d.SaveData()
return
}
}
}
Expand Down Expand Up @@ -192,16 +177,33 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener(rootCtx context.Co
})
}

if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err)
}

d.mu.Unlock()

d.updateIndexingStatus()
}
}
}()
}

// updateIndexingStatus averages the Percent of every tracked indexing
// segment, publishes the result as the "Indexing snapshots" stage stats and
// sets IndexingFinished once the average reaches 100.
//
// When no segments are tracked the progress is reported as 0% rather than
// dividing by zero.
func (d *DiagnosticClient) updateIndexingStatus() {
	segments := d.syncStats.SnapshotIndexing.Segments

	totalProgress := 0
	if len(segments) > 0 {
		totalProgressPercent := 0
		for _, seg := range segments {
			totalProgressPercent += seg.Percent
		}

		// Integer average across all tracked segments.
		totalProgress = totalProgressPercent / len(segments)
	}

	d.updateSnapshotStageStats(SyncStageStats{
		TimeElapsed: SecondsToHHMMString(uint64(d.syncStats.SnapshotIndexing.TimeElapsed)),
		TimeLeft:    "unknown",
		Progress:    fmt.Sprintf("%d%%", totalProgress),
	}, "Indexing snapshots")

	if totalProgress >= 100 {
		d.syncStats.SnapshotIndexing.IndexingFinished = true
	}
}

func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingStatistics) {
d.mu.Lock()
defer d.mu.Unlock()
Expand All @@ -227,19 +229,6 @@ func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingS
}

d.syncStats.SnapshotIndexing.TimeElapsed = upd.TimeElapsed

totalProgress := 0
for _, seg := range d.syncStats.SnapshotIndexing.Segments {
totalProgress += seg.Percent
}

d.updateSnapshotStageStats(SyncStageStats{
TimeElapsed: SecondsToHHMMString(uint64(upd.TimeElapsed)),
TimeLeft: "unknown",
Progress: fmt.Sprintf("%d%%", totalProgress/len(d.syncStats.SnapshotIndexing.Segments)),
}, "Indexing snapshots")

d.saveSyncStagesToDB()
}

func (d *DiagnosticClient) runSnapshotFilesListListener(rootCtx context.Context) {
Expand Down
32 changes: 10 additions & 22 deletions erigon-lib/diagnostics/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,7 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:
d.mu.Lock()
d.SetStagesList(info.StagesList)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
Expand All @@ -121,11 +117,7 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context)
case <-rootCtx.Done():
return
case info := <-ch:
d.mu.Lock()
d.SetCurrentSyncStage(info)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
Expand All @@ -142,11 +134,7 @@ func (d *DiagnosticClient) runCurrentSyncSubStageListener(rootCtx context.Contex
case <-rootCtx.Done():
return
case info := <-ch:
d.mu.Lock()
d.SetCurrentSyncSubStage(info)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
Expand All @@ -163,22 +151,12 @@ func (d *DiagnosticClient) runSubStageListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:
d.mu.Lock()
d.SetSubStagesList(info.Stage, info.List)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
}

// saveSyncStagesToDB runs StagesListUpdater for the current d.syncStages in
// a d.db.Update call and logs an error if the update fails.
func (d *DiagnosticClient) saveSyncStagesToDB() {
	err := d.db.Update(d.ctx, StagesListUpdater(d.syncStages))
	if err == nil {
		return
	}

	log.Error("[Diagnostics] Failed to update stages list", "err", err)
}

func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs {
currentIdxs := CurrentSyncStagesIdxs{
Stage: -1,
Expand All @@ -202,12 +180,17 @@ func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs {
}

// SetStagesList replaces the stored sync stages with stages, but only when
// the two lists differ in length. It holds d.mu for the duration of the call.
func (d *DiagnosticClient) SetStagesList(stages []SyncStage) {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Same length: keep the existing list untouched.
	if len(stages) == len(d.syncStages) {
		return
	}

	d.syncStages = stages
}

func (d *DiagnosticClient) SetSubStagesList(stageId string, subStages []SyncSubStage) {
d.mu.Lock()
defer d.mu.Unlock()
for idx, stage := range d.syncStages {
if stage.ID == stageId {
if len(d.syncStages[idx].SubStages) != len(subStages) {
Expand All @@ -219,6 +202,8 @@ func (d *DiagnosticClient) SetSubStagesList(stageId string, subStages []SyncSubS
}

func (d *DiagnosticClient) SetCurrentSyncStage(css CurrentSyncStage) {
d.mu.Lock()
defer d.mu.Unlock()
isSet := false
for idx, stage := range d.syncStages {
if !isSet {
Expand Down Expand Up @@ -246,6 +231,9 @@ func (d *DiagnosticClient) setSubStagesState(stadeIdx int, state StageState) {
}

func (d *DiagnosticClient) SetCurrentSyncSubStage(css CurrentSyncSubStage) {
d.mu.Lock()
defer d.mu.Unlock()

for idx, stage := range d.syncStages {
if stage.State == Running {
for subIdx, subStage := range stage.SubStages {
Expand Down
23 changes: 15 additions & 8 deletions erigon-lib/diagnostics/sys_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,23 @@ var (

func (d *DiagnosticClient) setupSysInfoDiagnostics() {
sysInfo := GetSysInfo(d.dataDirPath)
if err := d.db.Update(d.ctx, RAMInfoUpdater(sysInfo.RAM)); err != nil {
log.Error("[Diagnostics] Failed to update RAM info", "err", err)
}

if err := d.db.Update(d.ctx, CPUInfoUpdater(sysInfo.CPU)); err != nil {
log.Error("[Diagnostics] Failed to update CPU info", "err", err)
}
var funcs []func(tx kv.RwTx) error
funcs = append(funcs, RAMInfoUpdater(sysInfo.RAM), CPUInfoUpdater(sysInfo.CPU), DiskInfoUpdater(sysInfo.Disk))

if err := d.db.Update(d.ctx, DiskInfoUpdater(sysInfo.Disk)); err != nil {
log.Error("[Diagnostics] Failed to update Disk info", "err", err)
err := d.db.Update(d.ctx, func(tx kv.RwTx) error {
for _, updater := range funcs {
updErr := updater(tx)
if updErr != nil {
return updErr
}
}

return nil
})

if err != nil {
log.Warn("[Diagnostics] Failed to update system info", "err", err)
}

d.mu.Lock()
Expand Down

0 comments on commit b4dd12e

Please sign in to comment.