Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added blob sidecars snapshots for sepolia #9766

Merged
merged 40 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions cl/phase1/stages/stage_history_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co

var currEth1Progress atomic.Int64

bytesReadInTotal := atomic.Uint64{}
destinationSlotForEL := uint64(math.MaxUint64)
if cfg.engine != nil && cfg.engine.SupportInsertion() && cfg.beaconCfg.DenebForkEpoch != math.MaxUint64 {
destinationSlotForEL = cfg.beaconCfg.BellatrixForkEpoch * cfg.beaconCfg.SlotsPerEpoch
Expand All @@ -109,8 +108,6 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co

destinationSlotForCL := cfg.sn.SegmentsMax()

bytesReadInTotal.Add(uint64(blk.EncodingSizeSSZ()))

slot := blk.Block.Slot
if destinationSlotForCL <= blk.Block.Slot {
if err := beacon_indicies.WriteBeaconBlockAndIndicies(ctx, tx, blk, true); err != nil {
Expand Down Expand Up @@ -154,6 +151,8 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
finishCh := make(chan struct{})
// Start logging thread

isBackfilling := atomic.Bool{}

go func() {
logInterval := time.NewTicker(logIntervalTime)
defer logInterval.Stop()
Expand All @@ -177,24 +176,21 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
ratio := float64(logTime / time.Second)
speed := blockProgress / ratio
prevProgress = currProgress
peerCount, err := cfg.downloader.Peers()
if err != nil {
log.Debug("could not get peer count", "err", err)
continue
}

if speed == 0 {
continue
}
logArgs = append(logArgs,
"slot", currProgress,
"blockNumber", currEth1Progress.Load(),
"blk/sec", fmt.Sprintf("%.1f", speed),
"mbps/sec", fmt.Sprintf("%.1f", float64(bytesReadInTotal.Load())/(1000*1000*ratio)),
"peers", peerCount,
"snapshots", cfg.sn.SegmentsMax(),
)
bytesReadInTotal.Store(0)
logger.Info("Backfilling History", logArgs...)
logMsg := "Node is still syncing... downloading past blocks"
if isBackfilling.Load() {
logMsg = "Node has finished syncing... full history is being downloaded for archiving purposes"
}
logger.Info(logMsg, logArgs...)
case <-finishCh:
return
case <-ctx.Done():
Expand All @@ -211,7 +207,11 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
}
}
cfg.antiquary.NotifyBackfilled()
log.Info("Backfilling finished")
if cfg.backfilling {
cfg.logger.Info("full backfilling finished")
} else {
cfg.logger.Info("Missing blocks download finished (note: this does not mean that the history is complete, only that the missing blocks need for sync have been downloaded)")
}

close(finishCh)
if cfg.blobsBackfilling {
Expand Down Expand Up @@ -240,6 +240,7 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co
return err
}
defer tx2.Rollback()
isBackfilling.Store(true)

cfg.logger.Info("Ready to insert history, waiting for sync cycle to finish")

Expand Down
34 changes: 31 additions & 3 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, dirs datadir.Dirs, logger
var stats AggStats

lock, err := getSnapshotLock(ctx, cfg, db, &stats, mutex, logger)

if err != nil {
return nil, fmt.Errorf("can't initialize snapshot lock: %w", err)
}
Expand Down Expand Up @@ -658,7 +657,7 @@ func (d *Downloader) mainLoop(silent bool) error {
go func() {
defer d.wg.Done()
d.webseeds.Discover(d.ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles, d.cfg.Dirs.Snap)
// webseeds.Discover may create new .torrent files on disk
// apply webseeds to existing torrents
if err := d.addTorrentFilesFromDisk(true); err != nil && !errors.Is(err, context.Canceled) {
d.logger.Warn("[snapshots] addTorrentFilesFromDisk", "err", err)
}
Expand Down Expand Up @@ -2140,8 +2139,12 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
if d.alreadyHaveThisName(name) || !IsSnapNameAllowed(name) {
return nil
}
isProhibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
if err != nil {
return err
}

if d.torrentFiles.newDownloadsAreProhibited() && !d.torrentFiles.Exists(name) {
if isProhibited && !d.torrentFiles.Exists(name) {
return nil
}

Expand All @@ -2168,8 +2171,33 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
case <-ctx.Done():
return
case <-t.GotInfo():
case <-time.After(30 * time.Second): //fallback to r2
// TOOD: handle errors
// TOOD: add `d.webseeds.Complete` chan - to prevent race - Discover is also async
// TOOD: maybe run it in goroutine and return channel - to select with p2p

ok, err := d.webseeds.DownloadAndSaveTorrentFile(ctx, name)
if ok && err == nil {
ts, err := d.torrentFiles.LoadByPath(filepath.Join(d.SnapDir(), name+".torrent"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can move d.torrentFiles.LoadByPath inside d.webseeds.DownloadAndSaveTorrentFile and return ts as 3rd param.

if err != nil {
return
}
_, _, err = addTorrentFile(ctx, ts, d.torrentClient, d.db, d.webseeds)
if err != nil {
return
}
return
}

// wait for p2p
select {
case <-ctx.Done():
return
case <-t.GotInfo():
}
}

//TODO: remove whitelist check - Erigon may send us new seedable files
if !d.snapshotLock.Downloads.Contains(name) {
mi := t.Metainfo()
if err := CreateTorrentFileIfNotExists(d.SnapDir(), t.Info(), &mi, d.torrentFiles); err != nil {
Expand Down
7 changes: 2 additions & 5 deletions erigon-lib/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,8 @@ type GrpcServer struct {
d *Downloader
}

func (s *GrpcServer) ProhibitNewDownloads(context.Context, *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) {
if err := s.d.torrentFiles.prohibitNewDownloads(); err != nil {
return nil, err
}
return nil, nil
func (s *GrpcServer) ProhibitNewDownloads(ctx context.Context, req *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, s.d.torrentFiles.prohibitNewDownloads(req.Type)
}

// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/downloader/snaptype/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func (s snapType) IdxFileNames(version Version, from uint64, to uint64) []string
}

func (s snapType) IdxFileName(version Version, from uint64, to uint64, index ...Index) string {

if len(index) == 0 {
if len(s.indexes) == 0 {
return ""
Expand Down Expand Up @@ -344,7 +343,7 @@ var (

BorSnapshotTypes = []Type{BorEvents, BorSpans}

CaplinSnapshotTypes = []Type{BeaconBlocks}
CaplinSnapshotTypes = []Type{BeaconBlocks, BlobSidecars}

AllTypes = []Type{
Headers,
Expand All @@ -353,5 +352,6 @@ var (
BorEvents,
BorSpans,
BeaconBlocks,
BlobSidecars,
}
)
76 changes: 60 additions & 16 deletions erigon-lib/downloader/torrent_files.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package downloader

import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
Expand All @@ -10,6 +12,7 @@ import (
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/ledgerwatch/erigon-lib/common/dir"
"golang.org/x/exp/slices"
)

// TorrentFiles - does provide thread-safe CRUD operations on .torrent files
Expand Down Expand Up @@ -125,29 +128,70 @@ const ProhibitNewDownloadsFileName = "prohibit_new_downloads.lock"
// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
// After "download once" - Erigon will produce and seed new files
// Downloader will able: seed new files (already existing on FS), download uncomplete parts of existing files (if Verify found some bad parts)
func (tf *TorrentFiles) prohibitNewDownloads() error {
func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
tf.lock.Lock()
defer tf.lock.Unlock()
return CreateProhibitNewDownloadsFile(tf.dir)
}
func (tf *TorrentFiles) newDownloadsAreProhibited() bool {
tf.lock.Lock()
defer tf.lock.Unlock()
return dir.FileExist(filepath.Join(tf.dir, ProhibitNewDownloadsFileName))
// open or create file ProhibitNewDownloadsFileName
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_RDONLY, 0644)
if err != nil {
return fmt.Errorf("open file: %w", err)
}
defer f.Close()
var prohibitedList []string
torrentListJsonBytes, err := io.ReadAll(f)
if err != nil {
return fmt.Errorf("read file: %w", err)
}
if len(torrentListJsonBytes) > 0 {
if err := json.Unmarshal(torrentListJsonBytes, &prohibitedList); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
}
if slices.Contains(prohibitedList, t) {
return nil
}
prohibitedList = append(prohibitedList, t)
f.Close()

//return dir.FileExist(filepath.Join(tf.dir, ProhibitNewDownloadsFileName)) ||
// dir.FileExist(filepath.Join(tf.dir, SnapshotsLockFileName))
// write new prohibited list by opening the file in truncate mode
f, err = os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("open file for writing: %w", err)
}
defer f.Close()
prohibitedListJsonBytes, err := json.Marshal(prohibitedList)
if err != nil {
return fmt.Errorf("marshal: %w", err)
}
if _, err := f.Write(prohibitedListJsonBytes); err != nil {
return fmt.Errorf("write: %w", err)
}

return f.Sync()
}

func CreateProhibitNewDownloadsFile(dir string) error {
fPath := filepath.Join(dir, ProhibitNewDownloadsFileName)
f, err := os.Create(fPath)
func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_APPEND|os.O_RDONLY, 0644)
if err != nil {
return err
return false, err
}
defer f.Close()
if err := f.Sync(); err != nil {
return err
var prohibitedList []string
torrentListJsonBytes, err := io.ReadAll(f)
if err != nil {
return false, fmt.Errorf("newDownloadsAreProhibited: read file: %w", err)
}
return nil
if len(torrentListJsonBytes) > 0 {
if err := json.Unmarshal(torrentListJsonBytes, &prohibitedList); err != nil {
return false, fmt.Errorf("newDownloadsAreProhibited: unmarshal: %w", err)
}
}
for _, p := range prohibitedList {
if strings.Contains(name, p) {
return true, nil
}
}
return false, nil
}
1 change: 1 addition & 0 deletions erigon-lib/downloader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ func _addTorrentFile(ctx context.Context, ts *torrent.TorrentSpec, torrentClient
return nil, false, fmt.Errorf("addTorrentFile %s: update failed: %w", ts.DisplayName, err)
}
} else {
t.AddWebSeeds(ts.Webseeds)
if err := db.Update(ctx, torrentInfoReset(ts.DisplayName, ts.InfoHash.Bytes(), 0)); err != nil {
return nil, false, fmt.Errorf("addTorrentFile %s: reset failed: %w", ts.DisplayName, err)
}
Expand Down
55 changes: 45 additions & 10 deletions erigon-lib/downloader/webseed.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ type WebSeeds struct {
}

func (d *WebSeeds) Discover(ctx context.Context, urls []*url.URL, files []string, rootDir string) {
if d.torrentFiles.newDownloadsAreProhibited() {
return
}
listsOfFiles := d.constructListsOfFiles(ctx, urls, files)
torrentMap := d.makeTorrentUrls(listsOfFiles)
webSeedMap := d.downloadTorrentFilesFromProviders(ctx, rootDir, torrentMap)
Expand All @@ -64,6 +61,14 @@ func (d *WebSeeds) constructListsOfFiles(ctx context.Context, httpProviders []*u
d.logger.Debug("[snapshots.webseed] get from HTTP provider", "err", err, "url", webSeedProviderURL.EscapedPath())
continue
}
// check if we need to prohibit new downloads for some files
for name := range manifestResponse {
prohibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
if prohibited || err != nil {
delete(manifestResponse, name)
}
}

listsOfFiles = append(listsOfFiles, manifestResponse)
}

Expand All @@ -74,6 +79,13 @@ func (d *WebSeeds) constructListsOfFiles(ctx context.Context, httpProviders []*u
d.logger.Debug("[snapshots.webseed] get from File provider", "err", err)
continue
}
// check if we need to prohibit new downloads for some files
for name := range response {
prohibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
if prohibited || err != nil {
delete(response, name)
}
}
listsOfFiles = append(listsOfFiles, response)
}
return listsOfFiles
Expand Down Expand Up @@ -219,17 +231,13 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi
tUrls := tUrls
e.Go(func() error {
for _, url := range tUrls {
res, err := d.callTorrentHttpProvider(ctx, url, name)
//validation happens inside
_, err := d.callTorrentHttpProvider(ctx, url, name)
if err != nil {
d.logger.Log(d.verbosity, "[snapshots] got from webseed", "name", name, "err", err, "url", url)
continue
}
if !d.torrentFiles.Exists(name) {
if err := d.torrentFiles.Create(name, res); err != nil {
d.logger.Log(d.verbosity, "[snapshots] .torrent from webseed rejected", "name", name, "err", err, "url", url)
continue
}
}
//don't save .torrent here - do it inside downloader.Add
webSeeMapLock.Lock()
webSeedMap[torrentMap[*url]] = struct{}{}
webSeeMapLock.Unlock()
Expand All @@ -244,6 +252,33 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi
return webSeedMap
}

func (d *WebSeeds) DownloadAndSaveTorrentFile(ctx context.Context, name string) (bool, error) {
urls, ok := d.ByFileName(name)
if !ok {
return false, nil
}
for _, urlStr := range urls {
parsedUrl, err := url.Parse(urlStr)
if err != nil {
continue
}
res, err := d.callTorrentHttpProvider(ctx, parsedUrl, name)
if err != nil {
return false, err
}
if d.torrentFiles.Exists(name) {
continue
}
if err := d.torrentFiles.Create(name, res); err != nil {
d.logger.Log(d.verbosity, "[snapshots] .torrent from webseed rejected", "name", name, "err", err)
continue
}
return true, nil
}

return false, nil
}

func (d *WebSeeds) callTorrentHttpProvider(ctx context.Context, url *url.URL, fileName string) ([]byte, error) {
request, err := http.NewRequest(http.MethodGet, url.String(), nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/erigontech/mdbx-go v0.27.24
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20240222083139-3cef6c872d07
github.com/ledgerwatch/erigon-snapshot v1.3.1-0.20240321134048-58ba2110522a
github.com/ledgerwatch/interfaces v0.0.0-20240320062914-b57f05746087
github.com/ledgerwatch/log/v3 v3.9.0
github.com/ledgerwatch/secp256k1 v1.0.0
Expand Down
Loading
Loading