Added downloader http handling and extended logging (#9962)
* Added cf headers to webseed requests

* Extended torrent request transport to retry 500+ status responses and
count progress and fails

* Added internal torrent status logging for stopped and no-progress
files
mh0lt authored Apr 17, 2024
1 parent 3af7e45 commit 2140e2f
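
The second bullet of the commit message describes retrying transient webseed failures inside the HTTP transport. Below is a minimal, self-contained sketch of that pattern, not the committed code: the retryingTransport type, the placeholder header, the counter fields, and the simple doubling backoff are illustrative assumptions, while the committed version uses a jittered delay, Cloudflare-specific headers, and shared AggStats counters.

package main

import (
	"fmt"
	"net/http"
	"sync/atomic"
	"time"
)

// Placeholder header map standing in for the Cloudflare headers added by the commit.
var extraHeaders = http.Header{
	"X-Example-Header": []string{"example-value"},
}

// retryingTransport injects extra headers on every request, retries 5xx
// responses with a capped backoff, and counts trips and server failures.
type retryingTransport struct {
	base        http.RoundTripper
	trips       atomic.Int64
	serverFails atomic.Int64
}

func (t *retryingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	for key, value := range extraHeaders {
		req.Header[key] = value
	}

	delay := 500 * time.Millisecond
	const maxDelay = 5 * time.Second
	const maxAttempts = 10

	for attempt := 1; ; attempt++ {
		t.trips.Add(1)
		resp, err := t.base.RoundTrip(req)
		if err != nil {
			return nil, err
		}
		// Only retry transient server-side failures; this assumes req.Body is nil,
		// otherwise the body would have to be rewound before resending.
		if resp.StatusCode < http.StatusInternalServerError || attempt >= maxAttempts {
			return resp, nil
		}
		t.serverFails.Add(1)
		resp.Body.Close()

		select {
		case <-time.After(delay):
			if delay < maxDelay {
				delay *= 2
			}
		case <-req.Context().Done():
			return nil, req.Context().Err()
		}
	}
}

func main() {
	client := &http.Client{Transport: &retryingTransport{base: http.DefaultTransport}}
	resp, err := client.Get("https://example.com/")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}

In the actual change the transport is wired into the torrent client via cfg.ClientConfig.WebTransport, so every webseed request goes through the same retry and accounting path.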
Showing 2 changed files with 232 additions and 29 deletions.
228 changes: 209 additions & 19 deletions erigon-lib/downloader/downloader.go
@@ -17,6 +17,7 @@
package downloader

import (
"bufio"
"bytes"
"context"
"encoding/hex"
@@ -31,6 +32,7 @@ import (
"path/filepath"
"reflect"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
@@ -41,7 +43,6 @@ import (
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/c2h5oh/datasize"
dir2 "github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/log/v3"
"github.com/tidwall/btree"
"golang.org/x/exp/slices"
@@ -100,6 +101,11 @@ type webDownloadInfo struct {
torrent *torrent.Torrent
}

type downloadProgress struct {
time time.Time
progress float32
}

type AggStats struct {
MetadataReady, FilesTotal int32
LastMetadataUpdate *time.Time
@@ -117,34 +123,105 @@ type AggStats struct {
UploadRate, DownloadRate uint64
LocalFileHashes int
LocalFileHashTime time.Duration

WebseedTripCount *atomic.Int64
WebseedDiscardCount *atomic.Int64
WebseedServerFails *atomic.Int64
WebseedBytesDownload *atomic.Int64

lastTorrentStatus time.Time
downloadProgress map[string]downloadProgress
}

type requestHandler struct {
http.Transport
downloader *Downloader
}

var headers = http.Header{
var cloudflareHeaders = http.Header{
"lsjdjwcush6jbnjj3jnjscoscisoc5s": []string{"I%OSJDNFKE783DDHHJD873EFSIVNI7384R78SSJBJBCCJBC32JABBJCBJK45"},
}

func (r *requestHandler) RoundTrip(req *http.Request) (resp *http.Response, err error) {
for key, value := range headers {
func insertCloudflareHeaders(req *http.Request) {
for key, value := range cloudflareHeaders {
req.Header[key] = value
}
}

func (r *requestHandler) RoundTrip(req *http.Request) (resp *http.Response, err error) {
insertCloudflareHeaders(req)

resp, err = r.Transport.RoundTrip(req)

delay := 500 * time.Millisecond
attempts := 1
retry := true

const maxDelay = 5 * time.Second
const maxAttempts = 10

for err == nil && retry {
r.downloader.stats.WebseedTripCount.Add(1)

switch resp.StatusCode {
case http.StatusOK:
if len(req.Header.Get("Range")) > 0 {
// the torrent lib is expecting http.StatusPartialContent so it will discard this
// if this count is higher than 0, it's likely there is a server-side config issue,
// as it implies that the server is not handling range requests correctly and is just
// returning the whole file - which the torrent lib can't handle
//
// TODO: We could count the bytes - probably need to take this from the req though,
// as it's not clear how much of the content will be read. This needs
// further investigation - if required.
r.downloader.stats.WebseedDiscardCount.Add(1)
}

r.downloader.stats.WebseedBytesDownload.Add(resp.ContentLength)
retry = false

case http.StatusInternalServerError, http.StatusBadGateway:
r.downloader.stats.WebseedServerFails.Add(1)

attempts++
delayTimer := time.NewTimer(delay)

select {
case <-delayTimer.C:
// Note this assumes the req.Body is nil
resp, err = r.Transport.RoundTrip(req)
r.downloader.stats.WebseedTripCount.Add(1)

if err == nil && delay < maxDelay {
delay = delay + (time.Duration(rand.Intn(200-75)+75)*delay)/100
}

case <-req.Context().Done():
err = req.Context().Err()
}
retry = attempts > maxAttempts

default:
r.downloader.stats.WebseedBytesDownload.Add(resp.ContentLength)
retry = false
}
}

return r.Transport.RoundTrip(req)
return resp, err
}

func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosity log.Lvl, discover bool) (*Downloader, error) {
cfg.ClientConfig.WebTransport = &requestHandler{
http.Transport{
requestHandler := &requestHandler{
Transport: http.Transport{
Proxy: cfg.ClientConfig.HTTPProxy,
DialContext: cfg.ClientConfig.HTTPDialContext,
// I think this value was observed from some webseeds. It seems reasonable to extend it
// to other uses of HTTP from the client.
MaxConnsPerHost: 10,
}}

cfg.ClientConfig.WebTransport = requestHandler

db, c, m, torrentClient, err := openClient(ctx, cfg.Dirs.Downloader, cfg.Dirs.Snap, cfg.ClientConfig)
if err != nil {
return nil, fmt.Errorf("openClient: %w", err)
Expand All @@ -162,7 +239,13 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosi
}

mutex := &sync.RWMutex{}
var stats AggStats
stats := AggStats{
WebseedTripCount: &atomic.Int64{},
WebseedBytesDownload: &atomic.Int64{},
WebseedDiscardCount: &atomic.Int64{},
WebseedServerFails: &atomic.Int64{},
downloadProgress: map[string]downloadProgress{},
}

lock, err := getSnapshotLock(ctx, cfg, db, &stats, mutex, logger)
if err != nil {
@@ -189,6 +272,8 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosi
}
d.webseeds.SetTorrent(d.torrentFiles, lock.Downloads, cfg.DownloadTorrentFilesFromWebseed)

requestHandler.downloader = d

if cfg.ClientConfig.DownloadRateLimiter != nil {
downloadLimit := cfg.ClientConfig.DownloadRateLimiter.Limit()
d.downloadLimit = &downloadLimit
@@ -1032,7 +1117,7 @@ func (d *Downloader) mainLoop(silent bool) error {

switch {
case len(t.PeerConns()) > 0:
d.logger.Debug("[snapshots] Downloading from BitTorrent", "file", t.Name(), "peers", len(t.PeerConns()))
d.logger.Debug("[snapshots] Downloading from BitTorrent", "file", t.Name(), "peers", len(t.PeerConns()), "webpeers", len(t.WebseedPeerConns()))
delete(waiting, t.Name())
d.torrentDownload(t, downloadComplete, sem)
case len(t.WebseedPeerConns()) > 0:
@@ -1410,7 +1495,7 @@ func (d *Downloader) webDownload(peerUrls []*url.URL, t *torrent.Torrent, i *web

if !ok {
var err error
session, err = d.webDownloadClient.NewSession(d.ctx, d.SnapDir(), peerUrl, headers)
session, err = d.webDownloadClient.NewSession(d.ctx, d.SnapDir(), peerUrl, cloudflareHeaders)

if err != nil {
return nil, err
@@ -1699,10 +1784,10 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
diagnostics.Send(diagnostics.SnapshoFilesList{Files: filesList})
}

downloading := map[string]struct{}{}
downloading := map[string]float32{}

for file := range d.downloading {
downloading[file] = struct{}{}
downloading[file] = 0
}

var dbInfo int
@@ -1745,6 +1830,17 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
bytesCompleted = t.BytesCompleted()
}
progress := float32(float64(100) * (float64(bytesCompleted) / float64(tLen)))

if _, ok := downloading[torrentName]; ok {

if progress != stats.downloadProgress[torrentName].progress {
stats.downloadProgress[torrentName] = downloadProgress{time: time.Now(), progress: progress}
}
} else {
// we only care about progress of downloading files
delete(stats.downloadProgress, torrentName)
}

stats.BytesCompleted += uint64(bytesCompleted)
stats.BytesTotal += uint64(tLen)

@@ -1796,6 +1892,10 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {

// more detailed statistic: download rate of each peer (for each file)
if !torrentComplete && progress != 0 {
if _, ok := downloading[torrentName]; ok {
downloading[torrentName] = progress
}

d.logger.Log(d.verbosity, "[snapshots] progress", "file", torrentName, "progress", fmt.Sprintf("%.2f%%", progress), "peers", len(peersOfThisFile), "webseeds", len(weebseedPeersOfThisFile))
d.logger.Log(d.verbosity, "[snapshots] webseed peers", webseedRates...)
d.logger.Log(d.verbosity, "[snapshots] bittorrent peers", rates...)
@@ -1885,7 +1985,18 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
}

if !stats.Completed {
d.logger.Debug("[snapshots] info", "len", len(torrents), "webTransfers", webTransfers, "torrent", torrentInfo, "db", dbInfo, "t-complete", tComplete, "db-complete", dbComplete, "localHashes", stats.LocalFileHashes, "localHashTime", stats.LocalFileHashTime)
d.logger.Debug("[snapshots] info",
"len", len(torrents),
"webTransfers", webTransfers,
"torrent", torrentInfo,
"db", dbInfo,
"t-complete", tComplete,
"db-complete", dbComplete,
"webseed-trips", stats.WebseedTripCount.Load(),
"webseed-discards", stats.WebseedDiscardCount.Load(),
"webseed-fails", stats.WebseedServerFails.Load(),
"webseed-bytes", common.ByteCount(uint64(stats.WebseedBytesDownload.Load())),
"localHashes", stats.LocalFileHashes, "localHashTime", stats.LocalFileHashTime)
}

if lastMetadataReady != stats.MetadataReady {
@@ -1901,24 +2012,52 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
d.logger.Info("[snapshots] no metadata yet", "files", amount, "list", strings.Join(noMetadata, ","))
}

var noDownloadProgress []string

if len(zeroProgress) > 0 {
amount := len(zeroProgress)

for _, file := range zeroProgress {
if _, ok := downloading[file]; ok {
noDownloadProgress = append(noDownloadProgress, file)
}
}

if len(zeroProgress) > 5 {
zeroProgress = append(zeroProgress[:5], "...")
}

d.logger.Info("[snapshots] no progress yet", "files", amount, "list", strings.Join(zeroProgress, ","))
}

if len(d.downloading) > 0 {
amount := len(d.downloading)
if len(downloading) > 0 {
amount := len(downloading)

files := make([]string, 0, len(downloading))
for file, progress := range downloading {
files = append(files, fmt.Sprintf("%s (%.0f%%)", file, progress))

for file := range d.downloading {
files = append(files, file)
if dp, ok := stats.downloadProgress[file]; ok {
if time.Since(dp.time) > 30*time.Minute {
noDownloadProgress = append(noDownloadProgress, file)
}
}
}
sort.Strings(files)

d.logger.Log(d.verbosity, "[snapshots] downloading", "files", amount, "list", strings.Join(files, ", "))
}

d.logger.Log(d.verbosity, "[snapshots] downloading", "files", amount, "list", strings.Join(files, ","))
if time.Since(stats.lastTorrentStatus) > 5*time.Minute {
stats.lastTorrentStatus = time.Now()

if len(noDownloadProgress) > 0 {
progressStatus := getProgressStatus(d.torrentClient, noDownloadProgress)
for file, status := range progressStatus {
d.logger.Debug(fmt.Sprintf("[snapshots] torrent status: %s\n %s", file,
string(bytes.TrimRight(bytes.ReplaceAll(status, []byte("\n"), []byte("\n ")), "\n "))))
}
}
}

if stats.BytesDownload > prevStats.BytesDownload {
@@ -1948,6 +2087,57 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
d.stats = stats
}

type filterWriter struct {
files map[string][]byte
remainder []byte
file string
}

func (f *filterWriter) Write(p []byte) (n int, err error) {
written := len(p)

p = append(f.remainder, p...)

for len(p) > 0 {
scanned, line, _ := bufio.ScanLines(p, false)

if scanned > 0 {
if len(f.file) > 0 {
if len(line) == 0 {
f.file = ""
} else {
line = append(line, '\n')
f.files[f.file] = append(f.files[f.file], line...)
}
} else {
if _, ok := f.files[string(line)]; ok {
f.file = string(line)
}
}

p = p[scanned:]
} else {
f.remainder = p
p = nil
}
}
return written, nil
}

func getProgressStatus(torrentClient *torrent.Client, noDownloadProgress []string) map[string][]byte {
writer := filterWriter{
files: map[string][]byte{},
}

for _, file := range noDownloadProgress {
writer.files[file] = nil
}

torrentClient.WriteStatus(&writer)

return writer.files
}

func getWebseedsRatesForlogs(weebseedPeersOfThisFile []*torrent.Peer, fName string, finished bool) ([]interface{}, []diagnostics.SegmentPeer) {
seeds := make([]diagnostics.SegmentPeer, 0, len(weebseedPeersOfThisFile))
webseedRates := make([]interface{}, 0, len(weebseedPeersOfThisFile)*2)
@@ -2009,7 +2199,7 @@ func (d *Downloader) VerifyData(ctx context.Context, whiteList []string, failFas
continue
}

if !dir2.FileExist(filepath.Join(d.SnapDir(), t.Name())) {
if !dir.FileExist(filepath.Join(d.SnapDir(), t.Name())) {
continue
}

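
The filterWriter in this diff captures per-file sections from the torrent client's status dump when a download makes no progress. A minimal sketch of the same idea follows, with made-up file names and status text; unlike the committed filterWriter, it does not buffer a partial trailing line across Write calls.

package main

import (
	"bufio"
	"fmt"
	"strings"
)

// sectionFilter keeps only the status sections whose header line matches a
// tracked file name; a blank line closes the current section.
type sectionFilter struct {
	files   map[string][]byte
	current string
}

func (f *sectionFilter) Write(p []byte) (int, error) {
	scanner := bufio.NewScanner(strings.NewReader(string(p)))
	for scanner.Scan() {
		line := scanner.Text()
		switch {
		case f.current != "" && line == "":
			f.current = "" // blank line ends the current section
		case f.current != "":
			f.files[f.current] = append(f.files[f.current], line+"\n"...)
		default:
			if _, ok := f.files[line]; ok {
				f.current = line // a tracked file name starts a new section
			}
		}
	}
	return len(p), nil
}

func main() {
	filter := &sectionFilter{files: map[string][]byte{"v1-006000-006500-bodies.seg": nil}}

	// Made-up text standing in for what torrentClient.WriteStatus would produce.
	status := "v1-006000-006500-bodies.seg\n  peers: 0\n  progress: 12.3%\n\nother.seg\n  peers: 4\n"
	filter.Write([]byte(status))

	for file, section := range filter.files {
		fmt.Printf("%s:\n%s", file, section)
	}
}

Because the filter satisfies io.Writer, it can be handed directly to a status dumper such as torrent.Client.WriteStatus, which is what getProgressStatus does above.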