Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Downloader: calc stat inside, add --torrent.download.slots and limit downloads inside #3986

Merged
merged 12 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
387 changes: 183 additions & 204 deletions cmd/downloader/downloader/downloader.go

Large diffs are not rendered by default.

118 changes: 44 additions & 74 deletions cmd/downloader/downloader/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ package downloader
import (
"context"
"errors"
"path/filepath"

"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
prototypes "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"google.golang.org/protobuf/types/known/emptypb"
)

Expand All @@ -23,108 +22,79 @@ var (
_ proto_downloader.DownloaderServer = &GrpcServer{}
)

func NewGrpcServer(db kv.RwDB, client *Protocols, snapshotDir *dir.Rw, silent bool) (*GrpcServer, error) {
func NewGrpcServer(db kv.RwDB, client *Protocols, snapshotDir *dir.Rw) (*GrpcServer, error) {
sn := &GrpcServer{
db: db,
t: client,
snapshotDir: snapshotDir,
silent: silent,
}
return sn, nil
}

func CreateTorrentFilesAndAdd(ctx context.Context, snapshotDir *dir.Rw, torrentClient *torrent.Client) error {
if err := BuildTorrentFilesIfNeed(ctx, snapshotDir); err != nil {
return err
}
if err := AddTorrentFiles(ctx, snapshotDir, torrentClient); err != nil {
return err
}
for _, t := range torrentClient.Torrents() {
t.AllowDataUpload()
if !t.Complete.Bool() {
t.AllowDataDownload()
t.DownloadAll()
}
}
return nil
}

type GrpcServer struct {
proto_downloader.UnimplementedDownloaderServer
t *Protocols
db kv.RwDB
snapshotDir *dir.Rw
silent bool
}

func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
infoHashes := make([]metainfo.Hash, len(request.Items))
for i, it := range request.Items {
torrentClient := s.t.TorrentClient
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
for _, it := range request.Items {
if it.TorrentHash == nil {
if err := BuildTorrentFileIfNeed(ctx, it.Path, s.snapshotDir); err != nil {
return nil, err
}
metaInfo, err := AddTorrentFile(ctx, filepath.Join(s.snapshotDir.Path, it.Path+".torrent"), s.t.TorrentClient)
_, err := BuildTorrentAndAdd(ctx, it.Path, s.snapshotDir, s.t.TorrentClient)
if err != nil {
return nil, err
}
infoHashes[i] = metaInfo.HashInfoBytes()
} else {
infoHashes[i] = gointerfaces.ConvertH160toAddress(it.TorrentHash)
continue
}
}
if err := ResolveAbsentTorrents(ctx, s.t.TorrentClient, infoHashes, s.snapshotDir, s.silent); err != nil {
return nil, err
}
for _, t := range s.t.TorrentClient.Torrents() {
t.AllowDataDownload()
t.AllowDataUpload()
if !t.Complete.Bool() {
t.DownloadAll()

hash := Proto2InfoHash(it.TorrentHash)
if _, ok := torrentClient.Torrent(hash); ok {
continue
}

magnet := mi.Magnet(&hash, nil)
go func(magnetUrl string) {
t, err := torrentClient.AddMagnet(magnetUrl)
if err != nil {
log.Warn("[downloader] add magnet link", "err", err)
return
}
t.DisallowDataDownload()
t.AllowDataUpload()
<-t.GotInfo()
mi := t.Metainfo()
if err := CreateTorrentFileIfNotExists(s.snapshotDir, t.Info(), &mi); err != nil {
log.Warn("[downloader] create torrent file", "err", err)
return
}
}(magnet.String())

}
return &emptypb.Empty{}, nil
}

func (s *GrpcServer) Stats(ctx context.Context, request *proto_downloader.StatsRequest) (*proto_downloader.StatsReply, error) {
torrents := s.t.TorrentClient.Torrents()
reply := &proto_downloader.StatsReply{Completed: true, Torrents: int32(len(torrents))}
stats := s.t.Stats()
return &proto_downloader.StatsReply{
MetadataReady: stats.MetadataReady,
FilesTotal: stats.FilesTotal,

peers := map[torrent.PeerID]struct{}{}
Completed: stats.Completed,
Progress: stats.Progress,

for _, t := range torrents {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-t.GotInfo():
reply.BytesCompleted += uint64(t.BytesCompleted())
reply.BytesTotal += uint64(t.Info().TotalLength())
reply.Completed = reply.Completed && t.Complete.Bool()
reply.Connections += uint64(len(t.PeerConns()))
PeersUnique: stats.PeersUnique,
ConnectionsTotal: stats.ConnectionsTotal,

for _, peer := range t.PeerConns() {
peers[peer.PeerID] = struct{}{}
}
default:
reply.Completed = false
}
}

reply.Peers = int32(len(peers))
reply.Progress = int32(100 * (float64(reply.BytesCompleted) / float64(reply.BytesTotal)))
if reply.Progress == 100 && !reply.Completed {
reply.Progress = 99
}
return reply, nil
BytesCompleted: stats.BytesCompleted,
BytesTotal: stats.BytesTotal,
UploadRate: stats.UploadRate,
DownloadRate: stats.DownloadRate,
}, nil
}

func Proto2InfoHashes(in []*prototypes.H160) []metainfo.Hash {
infoHashes := make([]metainfo.Hash, len(in))
i := 0
for _, h := range in {
infoHashes[i] = gointerfaces.ConvertH160toAddress(h)
i++
}
return infoHashes
func Proto2InfoHash(in *prototypes.H160) metainfo.Hash {
return gointerfaces.ConvertH160toAddress(in)
}
27 changes: 18 additions & 9 deletions cmd/downloader/downloader/torrentcfg/torrentcfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Cfg struct {
*torrent.ClientConfig
DB kv.RwDB
CompletionCloser io.Closer
DownloadSlots int
}

func Default() *torrent.ClientConfig {
Expand All @@ -33,25 +34,26 @@ func Default() *torrent.ClientConfig {
// enable dht
torrentConfig.NoDHT = true
//torrentConfig.DisableTrackers = true
//torrentConfig.DisableWebtorrent = true
torrentConfig.DisableWebtorrent = true
//torrentConfig.DisableWebseeds = true

// Increase default timeouts, because we often run on commodity networks
// Reduce defaults - to avoid peers with very bad geography
torrentConfig.MinDialTimeout = 1 * time.Second // default: 3sec
torrentConfig.NominalDialTimeout = 10 * time.Second // default: 20sec
torrentConfig.HandshakesTimeout = 1 * time.Second // default: 4sec

return torrentConfig
}

func New(snapshotsDir *dir.Rw, verbosity lg.Level, natif nat.Interface, downloadRate, uploadRate datasize.ByteSize, port, maxPeers, connsPerFile int, db kv.RwDB) (*Cfg, error) {
func New(snapshotsDir *dir.Rw, verbosity lg.Level, natif nat.Interface, downloadRate, uploadRate datasize.ByteSize, port, maxPeers, connsPerFile int, db kv.RwDB, downloadSlots int) (*Cfg, error) {
torrentConfig := Default()
// We would-like to reduce amount of goroutines in Erigon, so reducing next params
torrentConfig.EstablishedConnsPerTorrent = connsPerFile // default: 50
torrentConfig.TorrentPeersHighWater = maxPeers // default: 500
torrentConfig.TorrentPeersLowWater = 50 // default: 50
torrentConfig.HalfOpenConnsPerTorrent = 25 // default: 25
torrentConfig.TotalHalfOpenConns = 50 // default: 100
torrentConfig.EstablishedConnsPerTorrent = connsPerFile // default: 50
torrentConfig.HalfOpenConnsPerTorrent = min(25, connsPerFile) // default: 25
torrentConfig.TotalHalfOpenConns = 50 // default: 100

torrentConfig.TorrentPeersHighWater = maxPeers // default: 500
torrentConfig.TorrentPeersLowWater = min(50, maxPeers) // default: 50

torrentConfig.ListenPort = port
torrentConfig.Seed = true
Expand Down Expand Up @@ -100,5 +102,12 @@ func New(snapshotsDir *dir.Rw, verbosity lg.Level, natif nat.Interface, download
}
m := storage.NewMMapWithCompletion(snapshotsDir.Path, c)
torrentConfig.DefaultStorage = m
return &Cfg{ClientConfig: torrentConfig, DB: db, CompletionCloser: m}, nil
return &Cfg{ClientConfig: torrentConfig, DB: db, CompletionCloser: m, DownloadSlots: downloadSlots}, nil
}

func min(a, b int) int {
if a < b {
return a
}
return b
}
78 changes: 67 additions & 11 deletions cmd/downloader/downloader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"io"
"os"
"path/filepath"
"runtime"
"sync"
"time"

"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mmap_span"
Expand All @@ -21,11 +23,12 @@ import (
"github.com/ledgerwatch/erigon/cmd/downloader/trackers"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
)

// Trackers - break down by priority tier
var Trackers = [][]string{
trackers.First(10, trackers.Best),
trackers.First(7, trackers.Best),
//trackers.First(3, trackers.Udp),
//trackers.First(3, trackers.Https),
//trackers.First(10, trackers.Ws),
Expand Down Expand Up @@ -107,7 +110,10 @@ func BuildTorrentFileIfNeed(ctx context.Context, originalFileName string, root *
if !errors.Is(err, os.ErrNotExist) {
return err
}
info, err := BuildInfoBytesForFile(root.Path, originalFileName)
info := &metainfo.Info{PieceLength: torrentcfg.DefaultPieceSize}
if err := info.BuildFromFilePath(filepath.Join(root.Path, originalFileName)); err != nil {
return err
}
if err != nil {
return err
}
Expand All @@ -118,12 +124,24 @@ func BuildTorrentFileIfNeed(ctx context.Context, originalFileName string, root *
return nil
}

func BuildTorrentAndAdd(ctx context.Context, originalFileName string, snapshotDir *dir.Rw, client *torrent.Client) (*torrent.Torrent, error) {
if err := BuildTorrentFileIfNeed(ctx, originalFileName, snapshotDir); err != nil {
return nil, err
}
torrentFilePath := filepath.Join(snapshotDir.Path, originalFileName+".torrent")
t, err := AddTorrentFile(ctx, torrentFilePath, client)
if err != nil {
return nil, err
}
return t, nil
}

// BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
func BuildTorrentFilesIfNeed(ctx context.Context, root *dir.Rw) error {
func BuildTorrentFilesIfNeed(ctx context.Context, snapshotDir *dir.Rw) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

files, err := allSegmentFiles(root.Path)
files, err := allSegmentFiles(snapshotDir.Path)
if err != nil {
return err
}
Expand All @@ -133,14 +151,14 @@ func BuildTorrentFilesIfNeed(ctx context.Context, root *dir.Rw) error {
wg.Add(1)
go func(f string, i int) {
defer wg.Done()
errs <- BuildTorrentFileIfNeed(ctx, f, root)
errs <- BuildTorrentFileIfNeed(ctx, f, snapshotDir)

select {
default:
case <-ctx.Done():
errs <- ctx.Err()
case <-logEvery.C:
log.Info("[torrent] Creating .torrent files", "Progress", fmt.Sprintf("%d/%d", i, len(files)))
log.Info("[Snapshots] Creating .torrent files", "Progress", fmt.Sprintf("%d/%d", i, len(files)))
}
}(f, i)
}
Expand All @@ -156,12 +174,50 @@ func BuildTorrentFilesIfNeed(ctx context.Context, root *dir.Rw) error {
return nil
}

func BuildInfoBytesForFile(root string, fileName string) (*metainfo.Info, error) {
info := &metainfo.Info{PieceLength: torrentcfg.DefaultPieceSize}
if err := info.BuildFromFilePath(filepath.Join(root, fileName)); err != nil {
return nil, err
// BuildTorrentsAndAdd - create .torrent files from .seg files (big IO) - if .seg files were placed manually to snapshotDir
func BuildTorrentsAndAdd(ctx context.Context, snapshotDir *dir.Rw, client *torrent.Client) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
files, err := allSegmentFiles(snapshotDir.Path)
if err != nil {
return err
}
errs := make(chan error, len(files)*2)
wg := &sync.WaitGroup{}
workers := runtime.GOMAXPROCS(-1) - 1
if workers < 1 {
workers = 1
}
var sem = semaphore.NewWeighted(int64(workers))
for i, f := range files {
wg.Add(1)
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
go func(f string, i int) {
defer sem.Release(1)
defer wg.Done()

select {
case <-ctx.Done():
errs <- ctx.Err()
default:
}

_, err := BuildTorrentAndAdd(ctx, f, snapshotDir, client)
errs <- err
}(f, i)
}
return info, nil
go func() {
wg.Wait()
close(errs)
}()
for err := range errs {
if err != nil {
return err
}
}
return nil
}

func CreateTorrentFileIfNotExists(root *dir.Rw, info *metainfo.Info, mi *metainfo.MetaInfo) error {
Expand Down
Loading