Improved downloader webseed performance #10715

Merged: 62 commits, Jul 1, 2024
62 commits
6bc97f1
updated torrent mod to alpha 11
mh0lt May 22, 2024
dbbcaff
Merge branch 'main' into dl_webseed_halt_fixes
mh0lt May 22, 2024
10bf2b0
remove single file test
mh0lt May 24, 2024
e24f644
inc from not to
mh0lt May 24, 2024
0ee38e7
updated torrent mod to alpha-12
mh0lt May 25, 2024
2d66e29
Merge branch 'main' into dl_webseed_halt_fixes
mh0lt May 25, 2024
0e238a4
added active count to roundtrip
mh0lt May 26, 2024
cabd6dc
change default hashers to numcpu's - 2
mh0lt May 29, 2024
510be13
force disable peice check
mh0lt May 29, 2024
2907048
Merge branch 'main' into dl_webseed_halt_fixes
mh0lt Jun 9, 2024
a880d1a
post merge fixes
mh0lt Jun 9, 2024
1699e50
add additional torrent download if its still unavailable
mh0lt Jun 9, 2024
14a0940
add completed bitmap processing
mh0lt Jun 11, 2024
7e63f02
remove import
mh0lt Jun 11, 2024
3ad794c
added flush persistence
mh0lt Jun 11, 2024
ada1a49
remove temp test code
mh0lt Jun 12, 2024
c0757ac
updated mods for erigontech/torrent release
mh0lt Jun 12, 2024
3e1df0e
Merge branch 'main' into dl_webseed_halt_fixes
mh0lt Jun 12, 2024
62452c0
update mods
mh0lt Jun 13, 2024
c0bab52
Merge branch 'main' into dl_webseed_halt_fixes
mh0lt Jun 13, 2024
0155fa2
tidied mods
mh0lt Jun 13, 2024
bdecbc4
Merge branch 'main' into dl_webseed_halt_fixes
AskAlexSharov Jun 14, 2024
4f52777
merge main
AskAlexSharov Jun 15, 2024
6696ee4
Merge branch 'main' into dl_webseed_halt_fixes
AskAlexSharov Jun 15, 2024
997cb9d
save
AskAlexSharov Jun 17, 2024
0a9c87a
Merge branch 'main' into dl_webseed_halt_fixes
AskAlexSharov Jun 17, 2024
35eae22
Merge branch 'main' into dl_webseed_halt_fixes
AskAlexSharov Jun 17, 2024
43be9e8
merge main
AskAlexSharov Jun 19, 2024
a282e57
Merge branch 'main' into dl_webseed_halt_fixes
AskAlexSharov Jun 22, 2024
e301135
copy stats atomics to avoid race reports
mh0lt Jun 22, 2024
1d983b3
remove db writer race
mh0lt Jun 23, 2024
67a41c9
fix race reading stats
mh0lt Jun 23, 2024
952d0d3
update mods to latest torrent lib
mh0lt Jun 24, 2024
a22bb13
Merge branch 'main' into dl_webseed_halt_fixes
mh0lt Jun 24, 2024
c26df75
Merge branch 'main' into dl_webseed_halt_fixes
AskAlexSharov Jun 25, 2024
e1fa6a3
fix deadlock after merge main
AskAlexSharov Jun 25, 2024
1e1a5ab
Merge branch 'main' into dl_webseed_halt_fixes
AskAlexSharov Jun 25, 2024
d8c6514
Merge branch 'main' into dl_webseed_halt_fixes
AskAlexSharov Jun 26, 2024
182597a
Merge branch 'main' into dl_webseed_halt_fixes
mh0lt Jun 26, 2024
b888d16
updated piece completion processing
mh0lt Jun 26, 2024
8c811af
Merge branch 'main' into dl_webseed_halt_fixes
AskAlexSharov Jun 27, 2024
fdf25b6
Merge branch 'main' into dl_webseed_halt_fixes
mh0lt Jun 27, 2024
819eaac
Merge branch 'dl_webseed_halt_fixes' of https://github.com/ledgerwatc…
mh0lt Jun 27, 2024
08f697b
updated torrent with stable memory pool
mh0lt Jun 28, 2024
9b9adbe
updated erigontech/torrent to v1.54.2-alpha-20
mh0lt Jun 29, 2024
a7501f1
Merge branch 'main' into dl_webseed_halt_fixes
mh0lt Jun 29, 2024
61e667e
updated erigontech/torrent to v1.54.2-alpha-21
mh0lt Jun 29, 2024
b0481b6
put printer
mh0lt Jun 29, 2024
5890ab2
flush printer
mh0lt Jun 30, 2024
9f26fb3
remove flushed printer
mh0lt Jun 30, 2024
614ee7c
flush printer
mh0lt Jun 30, 2024
8ca9ce1
added flush set
mh0lt Jun 30, 2024
fa30ca3
added flush set
mh0lt Jun 30, 2024
ac1e246
added flush set
mh0lt Jun 30, 2024
dae0642
add map
mh0lt Jun 30, 2024
b708cfa
remove printers
mh0lt Jun 30, 2024
ffa610f
Merge branch 'main' into dl_webseed_halt_fixes
AskAlexSharov Jul 1, 2024
9a4f6fa
Merge branch 'main' into dl_webseed_halt_fixes
mh0lt Jul 1, 2024
0232b85
Merge branch 'dl_webseed_halt_fixes' of https://github.com/ledgerwatc…
mh0lt Jul 1, 2024
b3675f4
updated erigontech/torrent to v1.54.2-alpha-22
mh0lt Jul 1, 2024
66b2282
updated erigontech/torrent to v1.54.2-alpha-23
mh0lt Jul 1, 2024
fe0fd6d
updated erigontech/torrent to v1.54.2-alpha-24
mh0lt Jul 1, 2024
11 changes: 6 additions & 5 deletions erigon-lib/diagnostics/stages.go
@@ -102,11 +102,12 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) {
  case <-rootCtx.Done():
  return
  case info := <-ch:
- d.mu.Lock()
- d.SetStagesList(info.StagesList)
- d.mu.Unlock()
-
- d.saveSyncStagesToDB()
+ func() {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ d.SetStagesList(info.StagesList)
+ d.saveSyncStagesToDB()
+ }()
  }
  }
  }()
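The hunk above moves the save into the same critical section as the update, and wraps both in a closure so that `defer` releases the mutex at the end of each loop iteration rather than at function exit. A minimal sketch of that pattern follows. `stageStore`, `update`, and `snapshot` are hypothetical stand-ins, not Erigon types, and the in-memory `saved` slice stands in for `saveSyncStagesToDB`.

```go
package main

import (
	"fmt"
	"sync"
)

// stageStore is a hypothetical stand-in for DiagnosticClient: it keeps the
// current stage list and a "persisted" copy behind one mutex.
type stageStore struct {
	mu     sync.Mutex
	stages []string
	saved  []string
}

// update sets the list and persists it inside a single critical section.
// The closure only exists to give defer a scope that ends right here, which
// matters when the caller is a long-running select loop.
func (s *stageStore) update(list []string) {
	func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.stages = list
		s.saved = append([]string(nil), list...) // stand-in for saveSyncStagesToDB
	}()
}

// snapshot returns copies of both lists under the same lock.
func (s *stageStore) snapshot() (stages, saved []string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]string(nil), s.stages...), append([]string(nil), s.saved...)
}

func main() {
	s := &stageStore{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.update([]string{fmt.Sprintf("stage-%d", i)})
		}(i)
	}
	wg.Wait()

	// Because update and save share one critical section, the two copies
	// always describe the same update.
	stages, saved := s.snapshot()
	fmt.Println(stages[0] == saved[0])
}
```

With the pre-change ordering (unlock, then save), a second update could land between the two steps and the persisted copy could lag the in-memory one.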
13 changes: 7 additions & 6 deletions erigon-lib/downloader/downloader.go
@@ -288,7 +288,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosi

  cfg.ClientConfig.WebTransport = requestHandler

- db, c, m, torrentClient, err := openClient(ctx, cfg.Dirs.Downloader, cfg.Dirs.Snap, cfg.ClientConfig, cfg.MdbxWriteMap)
+ db, c, m, torrentClient, err := openClient(ctx, cfg.Dirs.Downloader, cfg.Dirs.Snap, cfg.ClientConfig, cfg.MdbxWriteMap, logger)
  if err != nil {
  return nil, fmt.Errorf("openClient: %w", err)
  }
@@ -838,6 +838,7 @@ func (d *Downloader) mainLoop(silent bool) error {
  if ok && err == nil {
  _, _, err = addTorrentFile(d.ctx, ts, d.torrentClient, d.db, d.webseeds)
  if err != nil {
+ d.logger.Warn("[snapshots] addTorrentFile from webseed", "err", err)
  continue

    Review comment (Collaborator): log.Debug the err
    Reply (Contributor Author): Done

  }
  }
@@ -1092,10 +1093,10 @@ func (d *Downloader) mainLoop(silent bool) error {
  }
  }

- d.lock.RLock()
+ d.lock.Lock()
  downloadingLen := len(d.downloading)
  d.stats.Downloading = int32(downloadingLen)
- d.lock.RUnlock()
+ d.lock.Unlock()

  // the call interval of the loop (elapsed sec) used to get slots/sec for
  // calculating the number of files to download based on the loop speed
@@ -2139,7 +2140,7 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
  }

  if !stats.Completed {
- logger.Debug("[snapshots] info",
+ logger.Debug("[snapshots] download info",
  "len", len(torrents),
  "webTransfers", webTransfers,
  "torrent", torrentInfo,
@@ -2717,7 +2718,7 @@ func (d *Downloader) StopSeeding(hash metainfo.Hash) error {

  func (d *Downloader) TorrentClient() *torrent.Client { return d.torrentClient }

- func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientConfig, writeMap bool) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) {
+ func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientConfig, writeMap bool, logger log.Logger) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) {
  dbCfg := mdbx.NewMDBX(log.New()).
  Label(kv.DownloaderDB).
  WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DownloaderTablesCfg }).
@@ -2735,7 +2736,7 @@ func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientC
  return nil, nil, nil, nil, fmt.Errorf("torrentcfg.openClient: %w", err)
  }
  //c, err = NewMdbxPieceCompletion(db)
- c, err = NewMdbxPieceCompletionBatch(db)
+ c, err = NewMdbxPieceCompletion(db, logger)
  if err != nil {
  return nil, nil, nil, nil, fmt.Errorf("torrentcfg.NewMdbxPieceCompletion: %w", err)
  }
3 changes: 2 additions & 1 deletion erigon-lib/downloader/downloadercfg/downloadercfg.go
@@ -17,6 +17,7 @@
  package downloadercfg

  import (
+ "github.com/ledgerwatch/erigon-lib/common/dbg"
  "net"
  "net/url"
  "os"
@@ -68,7 +69,7 @@ func Default() *torrent.ClientConfig {
  // better don't increase because erigon periodically producing "new seedable files" - and adding them to downloader.
  // it must not impact chain tip sync - so, limit resources to minimum by default.
  // but when downloader is started as a separated process - rise it to max
- //torrentConfig.PieceHashersPerTorrent = max(1, runtime.NumCPU()-1)
+ torrentConfig.PieceHashersPerTorrent = dbg.EnvInt("DL_HASHERS", min(16, max(2, runtime.NumCPU()-2)))

  torrentConfig.MinDialTimeout = 6 * time.Second //default: 3s
  torrentConfig.HandshakesTimeout = 8 * time.Second //default: 4s
165 changes: 101 additions & 64 deletions erigon-lib/downloader/mdbx_piece_completion.go
@@ -19,12 +19,15 @@ package downloader
  import (
  "context"
  "encoding/binary"
+ "sync"

+ "github.com/RoaringBitmap/roaring"
  "github.com/anacrolix/torrent/metainfo"
  "github.com/anacrolix/torrent/storage"
  "github.com/anacrolix/torrent/types/infohash"
  "github.com/ledgerwatch/erigon-lib/kv"
  "github.com/ledgerwatch/erigon-lib/kv/mdbx"
+ "github.com/ledgerwatch/erigon-lib/log/v3"
  )

  const (
@@ -33,17 +36,37 @@ const (
  )

  type mdbxPieceCompletion struct {
- db kv.RwDB
+ db *mdbx.MdbxKV
+ mu sync.RWMutex
+ completed map[infohash.T]*roaring.Bitmap
+ flushed map[infohash.T]*roaring.Bitmap
+ logger log.Logger
  }

  var _ storage.PieceCompletion = (*mdbxPieceCompletion)(nil)

- func NewMdbxPieceCompletion(db kv.RwDB) (ret storage.PieceCompletion, err error) {
- ret = &mdbxPieceCompletion{db: db}
+ func NewMdbxPieceCompletion(db kv.RwDB, logger log.Logger) (ret storage.PieceCompletion, err error) {
+ ret = &mdbxPieceCompletion{
+ db: db.(*mdbx.MdbxKV),
+ logger: logger,
+ completed: map[infohash.T]*roaring.Bitmap{},
+ flushed: map[infohash.T]*roaring.Bitmap{}}
  return
  }

- func (m mdbxPieceCompletion) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) {
+ func (m *mdbxPieceCompletion) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) {
+ m.mu.RLock()
+ if completed, ok := m.completed[pk.InfoHash]; ok {
+ if completed.Contains(uint32(pk.Index)) {
+ m.mu.RUnlock()
+ return storage.Completion{
+ Complete: true,
+ Ok: true,
+ }, nil
+ }
+ }
+ m.mu.RUnlock()
+
  err = m.db.View(context.Background(), func(tx kv.Tx) error {
  var key [infohash.Size + 4]byte
  copy(key[:], pk.InfoHash[:])
@@ -66,11 +89,14 @@ func (m mdbxPieceCompletion) Get(pk metainfo.PieceKey) (cn storage.Completion, e
  return
  }

- func (m mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
+ func (m *mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
  if c, err := m.Get(pk); err == nil && c.Ok && c.Complete == b {
  return nil
  }

+ m.mu.Lock()
+ defer m.mu.Unlock()
+
  var tx kv.RwTx
  var err error
  // On power-off recent "no-sync" txs may be lost.
@@ -84,91 +110,102 @@ func (m mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
  // 1K fsyncs/2minutes it's quite expensive, but even on cloud (high latency) drive it allow download 100mb/s
  // and Erigon doesn't do anything when downloading snapshots
  if b {
- tx, err = m.db.BeginRwNosync(context.Background())
- if err != nil {
- return err
+ completed, ok := m.completed[pk.InfoHash]
+
+ if !ok {
+ completed = &roaring.Bitmap{}
+ m.completed[pk.InfoHash] = completed
  }
- } else {
- tx, err = m.db.BeginRw(context.Background())
- if err != nil {
- return err
+
+ completed.Add(uint32(pk.Index))
+
+ if flushed, ok := m.flushed[pk.InfoHash]; !ok || !flushed.Contains(uint32(pk.Index)) {
+ return nil
  }
  }
+
+ tx, err = m.db.BeginRw(context.Background())
+ if err != nil {
+ return err
+ }
+
  defer tx.Rollback()

- var key [infohash.Size + 4]byte
- copy(key[:], pk.InfoHash[:])
- binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))
+ err = putCompletion(tx, pk.InfoHash, uint32(pk.Index), b)

- v := []byte(incomplete)
- if b {
- v = []byte(complete)
- }
- err = tx.Put(kv.BittorrentCompletion, key[:], v)
  if err != nil {
  return err
  }

  return tx.Commit()
  }

- func (m *mdbxPieceCompletion) Close() error {
- m.db.Close()
- return nil
- }
+ func putCompletion(tx kv.RwTx, infoHash infohash.T, index uint32, c bool) error {
+ var key [infohash.Size + 4]byte
+ copy(key[:], infoHash[:])
+ binary.BigEndian.PutUint32(key[infohash.Size:], index)

- type mdbxPieceCompletionBatch struct {
- db *mdbx.MdbxKV
+ v := []byte(incomplete)
+ if c {
+ v = []byte(complete)
+ }
+ //fmt.Println("PUT", infoHash, index, c)
+ return tx.Put(kv.BittorrentCompletion, key[:], v)
  }

- var _ storage.PieceCompletion = (*mdbxPieceCompletionBatch)(nil)
+ func (m *mdbxPieceCompletion) Flushed(infoHash infohash.T, flushed *roaring.Bitmap) {
+ m.mu.Lock()
+ defer m.mu.Unlock()

- func NewMdbxPieceCompletionBatch(db kv.RwDB) (ret storage.PieceCompletion, err error) {
- ret = &mdbxPieceCompletionBatch{db: db.(*mdbx.MdbxKV)}
- return
+ tx, err := m.db.BeginRw(context.Background())
+
+ if err != nil {
+ m.logger.Warn("[snapshots] failed to flush piece completions", "hash", infoHash, err, err)
+ return
+ }
+
+ defer tx.Rollback()
+
+ m.putFlushed(tx, infoHash, flushed)
+
+ err = tx.Commit()
+
+ if err != nil {
+ m.logger.Warn("[snapshots] failed to flush piece completions", "hash", infoHash, err, err)
+ }
  }

- func (m *mdbxPieceCompletionBatch) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) {
- err = m.db.View(context.Background(), func(tx kv.Tx) error {
- var key [infohash.Size + 4]byte
- copy(key[:], pk.InfoHash[:])
- binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))
- cn.Ok = true
- v, err := tx.GetOne(kv.BittorrentCompletion, key[:])
- if err != nil {
- return err
- }
- switch string(v) {
- case complete:
- cn.Complete = true
- case incomplete:
- cn.Complete = false
- default:
- cn.Ok = false
+ func (m *mdbxPieceCompletion) putFlushed(tx kv.RwTx, infoHash infohash.T, flushed *roaring.Bitmap) {
+ if completed, ok := m.completed[infoHash]; ok {
+ setters := flushed.Clone()
+ setters.And(completed)
+
+ if setters.GetCardinality() > 0 {
+ setters.Iterate(func(piece uint32) bool {
+ // TODO deal with error (? don't remove from bitset ?)
+ _ = putCompletion(tx, infoHash, piece, true)
+ return true
+ })
  }
- return nil
- })
- return
- }

- func (m *mdbxPieceCompletionBatch) Set(pk metainfo.PieceKey, b bool) error {
- if c, err := m.Get(pk); err == nil && c.Ok && c.Complete == b {
- return nil
+ completed.AndNot(setters)
+
+ if completed.IsEmpty() {
+ delete(m.completed, infoHash)
+ }
  }
- var key [infohash.Size + 4]byte
- copy(key[:], pk.InfoHash[:])
- binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))

- v := []byte(incomplete)
- if b {
- v = []byte(complete)
+ allFlushed, ok := m.flushed[infoHash]
+
+ if !ok {
+ allFlushed = &roaring.Bitmap{}
+ m.flushed[infoHash] = allFlushed
  }
- return m.db.Batch(func(tx kv.RwTx) error {
- return tx.Put(kv.BittorrentCompletion, key[:], v)
- })
+
+ allFlushed.Or(flushed)
  }

- func (m *mdbxPieceCompletionBatch) Close() error {
+ func (m *mdbxPieceCompletion) Close() error {
  m.db.Close()
  return nil
  }
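The rewritten `mdbxPieceCompletion` keeps completed pieces in memory and writes them to the database only once the torrent library reports the piece data as flushed. The sketch below illustrates that two-stage flow under simplifying assumptions: plain maps stand in for `roaring.Bitmap`, a `persisted` map stands in for the MDBX table, and string keys stand in for `infohash.T`. All names (`completionCache`, `SetComplete`, `IsComplete`) are hypothetical; only `Flushed` echoes a method name from the diff, and its signature here differs.

```go
package main

import (
	"fmt"
	"sync"
)

// pieceSet is a plain-map stand-in for roaring.Bitmap.
type pieceSet map[uint32]struct{}

// completionCache is an illustrative model of the struct in the diff.
type completionCache struct {
	mu        sync.RWMutex
	completed map[string]pieceSet // complete in memory, not yet durable
	flushed   map[string]pieceSet // pieces whose data has reached disk
	persisted map[string]pieceSet // stand-in for the MDBX completion table
}

func newCompletionCache() *completionCache {
	return &completionCache{
		completed: map[string]pieceSet{},
		flushed:   map[string]pieceSet{},
		persisted: map[string]pieceSet{},
	}
}

// add inserts piece into sets[hash], creating the set on first use.
func add(sets map[string]pieceSet, hash string, piece uint32) {
	if sets[hash] == nil {
		sets[hash] = pieceSet{}
	}
	sets[hash][piece] = struct{}{}
}

// has reports whether piece is in sets[hash]; a missing set reads as empty.
func has(sets map[string]pieceSet, hash string, piece uint32) bool {
	_, ok := sets[hash][piece]
	return ok
}

// SetComplete records the piece in memory. It is persisted immediately only
// when its data is already known to be flushed; otherwise it waits.
func (c *completionCache) SetComplete(hash string, piece uint32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	add(c.completed, hash, piece)
	if has(c.flushed, hash, piece) {
		add(c.persisted, hash, piece)
	}
}

// IsComplete checks the in-memory set first, then the persisted table.
func (c *completionCache) IsComplete(hash string, piece uint32) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return has(c.completed, hash, piece) || has(c.persisted, hash, piece)
}

// Flushed persists every pending piece named in flushed, drops it from the
// pending set, and remembers all flushed pieces. It returns how many pieces
// were persisted by this call.
func (c *completionCache) Flushed(hash string, flushed []uint32) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	n := 0
	for _, p := range flushed {
		if has(c.completed, hash, p) {
			add(c.persisted, hash, p)
			delete(c.completed[hash], p)
			n++
		}
		add(c.flushed, hash, p)
	}
	if len(c.completed[hash]) == 0 {
		delete(c.completed, hash)
	}
	return n
}

func main() {
	c := newCompletionCache()
	c.SetComplete("abc", 1)
	c.SetComplete("abc", 2)
	fmt.Println(c.IsComplete("abc", 1), len(c.persisted["abc"]))
	fmt.Println(c.Flushed("abc", []uint32{1, 3}), len(c.persisted["abc"]))
}
```

The apparent trade-off is that a crash before a flush loses only in-memory completion marks, so affected pieces would need to be re-verified, while the per-piece write transactions of the previous implementation are avoided. The sketch also omits the error handling that the diff's `putFlushed` marks with a TODO.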
28 changes: 2 additions & 26 deletions erigon-lib/downloader/mdbx_piece_completion_test.go
@@ -20,6 +20,7 @@ import (
  "testing"

  "github.com/ledgerwatch/erigon-lib/kv/memdb"
+ "github.com/ledgerwatch/erigon-lib/log/v3"
  "github.com/stretchr/testify/assert"
  "github.com/stretchr/testify/require"

@@ -29,32 +30,7 @@ import (

  func TestMdbxPieceCompletion(t *testing.T) {
  db := memdb.NewTestDownloaderDB(t)
- pc, err := NewMdbxPieceCompletion(db)
- require.NoError(t, err)
- defer pc.Close()
-
- pk := metainfo.PieceKey{}
-
- b, err := pc.Get(pk)
- require.NoError(t, err)
- assert.False(t, b.Ok)
-
- require.NoError(t, pc.Set(pk, false))
-
- b, err = pc.Get(pk)
- require.NoError(t, err)
- assert.Equal(t, storage.Completion{Complete: false, Ok: true}, b)
-
- require.NoError(t, pc.Set(pk, true))
-
- b, err = pc.Get(pk)
- require.NoError(t, err)
- assert.Equal(t, storage.Completion{Complete: true, Ok: true}, b)
- }
-
- func TestMdbxPieceCompletionBatch(t *testing.T) {
- db := memdb.NewTestDownloaderDB(t)
- pc, err := NewMdbxPieceCompletionBatch(db)
+ pc, err := NewMdbxPieceCompletion(db, log.New())
  require.NoError(t, err)
  defer pc.Close()

8 changes: 4 additions & 4 deletions erigon-lib/go.mod
@@ -55,8 +55,9 @@ require (
  github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d // indirect
  github.com/opencontainers/runtime-spec v1.2.0 // indirect
  github.com/pion/udp v0.1.4 // indirect
- golang.org/x/mod v0.18.0 // indirect
- golang.org/x/tools v0.22.0 // indirect
+ golang.org/x/mod v0.17.0 // indirect
+ golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
+ google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect
  modernc.org/libc v1.50.4 // indirect
  modernc.org/memory v1.8.0 // indirect
  modernc.org/sqlite v1.29.8 // indirect
@@ -142,15 +143,14 @@ require (
  go.uber.org/goleak v1.3.0 // indirect
  golang.org/x/net v0.26.0
  golang.org/x/text v0.16.0 // indirect
- google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
  gopkg.in/yaml.v3 v3.0.1 // indirect
  modernc.org/mathutil v1.6.0 // indirect
  rsc.io/tmplfunc v0.0.3 // indirect
  zombiezen.com/go/sqlite v0.13.1 // indirect
  )

  replace (
- github.com/anacrolix/torrent => github.com/erigontech/torrent v1.54.2-alpha-10
+ github.com/anacrolix/torrent => github.com/erigontech/torrent v1.54.2-alpha-24
  github.com/holiman/bloomfilter/v2 => github.com/AskAlexSharov/bloomfilter/v2 v2.0.8
  github.com/tidwall/btree => github.com/AskAlexSharov/btree v1.6.2
  )