Skip to content

Commit

Permalink
Better downloader balancing
Browse files Browse the repository at this point in the history
  • Loading branch information
xssnick committed Apr 20, 2024
1 parent b339934 commit d0d5f02
Showing 1 changed file with 37 additions and 25 deletions.
62 changes: 37 additions & 25 deletions storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/xssnick/tonutils-go/tvm/cell"
"math"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -60,7 +59,7 @@ type torrentDownloader struct {

torrent *Torrent

mx sync.RWMutex
mx sync.Mutex

globalCtx context.Context
downloadCancel func()
Expand Down Expand Up @@ -478,53 +477,66 @@ func (s *storagePeer) downloadPiece(ctx context.Context, id uint32) (*Piece, err
return &piece, nil
}

var DownloadMaxInflight = int32(20)

// DownloadPieceDetailed - same as DownloadPiece, but also returns proof data
func (t *torrentDownloader) DownloadPieceDetailed(ctx context.Context, pieceIndex uint32) (piece []byte, proof []byte, peer []byte, peerAddr string, err error) {
skip := map[string]*storagePeer{}
for {
peers := t.torrent.GetPeers()

var nodes = make([]*storagePeer, 0, len(peers))
for _, node := range peers {
if skip[string(node.peer.nodeId)] != nil {
continue
}
var bestNode *storagePeer

t.mx.Lock()
{
for _, node := range peers {
if skip[string(node.peer.nodeId)] != nil {
continue
}

inf := atomic.LoadInt32(&node.peer.inflight)
if inf >= DownloadMaxInflight {
continue
}

node.peer.piecesMx.RLock()
hasPiece := node.peer.hasPieces[pieceIndex]
node.peer.piecesMx.RUnlock()
if bestNode != nil && atomic.LoadInt32(&bestNode.inflight) < inf {
continue
}

node.peer.piecesMx.RLock()
hasPiece := node.peer.hasPieces[pieceIndex]
node.peer.piecesMx.RUnlock()

if hasPiece {
bestNode = node.peer
}
}

if hasPiece {
nodes = append(nodes, node.peer)
if bestNode != nil {
atomic.AddInt32(&bestNode.inflight, 1)
}
}
t.mx.Unlock()

if len(nodes) == 0 {
if bestNode == nil {
select {
case <-ctx.Done():
return nil, nil, nil, "", ctx.Err()
case <-time.After(250 * time.Millisecond):
case <-time.After(5 * time.Millisecond):
skip = map[string]*storagePeer{}
// no nodes, wait
}
continue
}

// use most free node
sort.Slice(nodes, func(i, j int) bool {
p1 := atomic.LoadInt32(&nodes[i].inflight) + (5 * atomic.LoadInt32(&nodes[i].fails))
p2 := atomic.LoadInt32(&nodes[j].inflight) + (5 * atomic.LoadInt32(&nodes[j].fails))
return p1 < p2
})

node := nodes[0]
pc, err := node.downloadPiece(ctx, pieceIndex)
pc, err := bestNode.downloadPiece(ctx, pieceIndex)
atomic.AddInt32(&bestNode.inflight, -1)
if err != nil {
skip[string(node.nodeId)] = node
skip[string(bestNode.nodeId)] = bestNode
continue
}

return pc.Data, pc.Proof, node.nodeId, node.nodeAddr, nil
return pc.Data, pc.Proof, bestNode.nodeId, bestNode.nodeAddr, nil
}
}

Expand Down

0 comments on commit d0d5f02

Please sign in to comment.