Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

downloader: added downloaded torrents notifier #11850

Merged
merged 21 commits into from
Sep 5, 2024
63 changes: 63 additions & 0 deletions erigon-lib/direct/downloader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package direct

import (
"context"
"io"

proto_downloader "github.com/erigontech/erigon-lib/gointerfaces/downloaderproto"
"google.golang.org/grpc"
Expand Down Expand Up @@ -51,3 +52,65 @@ func (c *DownloaderClient) SetLogPrefix(ctx context.Context, in *proto_downloade
// Completed forwards the request to the in-process downloader server and
// returns its reply unchanged. opts is accepted only to satisfy the gRPC client
// signature and is not used.
func (c *DownloaderClient) Completed(ctx context.Context, in *proto_downloader.CompletedRequest, opts ...grpc.CallOption) (*proto_downloader.CompletedReply, error) {
	reply, err := c.server.Completed(ctx, in)
	return reply, err
}

// TorrentCompleted subscribes to torrent-completion notifications from the
// in-process downloader server.
//
// The server handler runs in its own goroutine and writes into a buffered
// channel through a DownloadeSubscribeS; the returned DownloadeSubscribeC reads
// from that same channel. A non-nil error returned by the handler is handed to
// the sender's Err method so the receiver can observe it. opts is accepted for
// signature compatibility and is not used. The returned error is always nil.
func (c *DownloaderClient) TorrentCompleted(ctx context.Context, in *proto_downloader.TorrentCompletedRequest, opts ...grpc.CallOption) (proto_downloader.Downloader_TorrentCompletedClient, error) {
	replies := make(chan *downloadedReply, 1<<16)
	sender := &DownloadeSubscribeS{ch: replies, ctx: ctx}
	receiver := &DownloadeSubscribeC{ch: replies, ctx: ctx}

	go func() {
		handlerErr := c.server.TorrentCompleted(in, sender)
		sender.Err(handlerErr)
	}()

	return receiver, nil
}

// DownloadeSubscribeC is the receiving (client) end of an in-process
// TorrentCompleted stream: Recv reads replies from ch.
type DownloadeSubscribeC struct {
// ch carries replies and errors to Recv.
ch chan *downloadedReply
// ctx is the context the subscription was created with; Context returns it.
ctx context.Context
// Embedded so the type carries the grpc.ClientStream method set.
grpc.ClientStream
}

// Recv blocks until the next notification is available and returns it.
//
// It returns io.EOF when the stream context is canceled (before or while
// waiting), when the channel has been closed, or when a nil item is received.
// Otherwise it returns the reply and error carried by the received item.
func (c *DownloadeSubscribeC) Recv() (*proto_downloader.TorrentCompletedReply, error) {
	// Check first so that a canceled stream deterministically reports EOF even
	// if buffered items are still pending.
	if c.ctx.Err() != nil {
		return nil, io.EOF
	}

	// Wait on both the channel and the context: a plain receive would block
	// forever if the context is canceled while no item is pending.
	select {
	case <-c.ctx.Done():
		return nil, io.EOF
	case m, ok := <-c.ch:
		if !ok || m == nil {
			return nil, io.EOF
		}
		return m.r, m.err
	}
}
// Context returns the context stored in the stream.
func (c *DownloadeSubscribeC) Context() context.Context { return c.ctx }

// DownloadeSubscribeS is the sending (server) end of an in-process
// TorrentCompleted stream: Send and Err write into ch.
type DownloadeSubscribeS struct {
// ch receives replies and errors; Send closes it and sets it to nil once ctx is canceled.
ch chan *downloadedReply
// ctx is the context the subscription was created with; Context returns it.
ctx context.Context
// Embedded so the type carries the grpc.ServerStream method set.
grpc.ServerStream
}

// downloadedReply is one item passed over the in-process stream channel.
// Send fills only r; Err fills only err.
type downloadedReply struct {
// r is the notification payload.
r *proto_downloader.TorrentCompletedReply
// err is an error to surface from Recv.
err error
}

// Send delivers m to the receiving end of the in-process stream.
//
// If the stream context is already canceled, or becomes canceled while the
// send is waiting for buffer space, the reply channel is closed (at most once)
// and the context error is returned. Otherwise m is queued and nil is returned.
//
// NOTE(review): s.ch is read and reset without synchronization, so concurrent
// calls to Send/Err on the same stream are not safe — confirm callers serialize.
func (s *DownloadeSubscribeS) Send(m *proto_downloader.TorrentCompletedReply) error {
	if s.ctx.Err() == nil {
		// Select on cancellation too: a plain send would block forever when
		// the buffer is full and the receiver has gone away.
		select {
		case s.ch <- &downloadedReply{r: m}:
			return nil
		case <-s.ctx.Done():
			// Canceled while waiting; fall through to the cleanup below.
		}
	}

	// Close the channel exactly once so a blocked receiver wakes up; clearing
	// the field first guards against a second close on later calls.
	if s.ch != nil {
		ch := s.ch
		s.ch = nil
		close(ch)
	}
	return s.ctx.Err()
}
// Context returns the context stored in the stream.
func (s *DownloadeSubscribeS) Context() context.Context { return s.ctx }
// Err forwards a non-nil error to the receiving end of the stream so that Recv
// can return it. A nil err is ignored.
//
// If the stream context is canceled the error may be dropped instead: Send
// closes the channel and sets s.ch to nil on cancellation, and sending on a nil
// channel would otherwise block this goroutine forever.
func (s *DownloadeSubscribeS) Err(err error) {
	if err == nil {
		return
	}

	select {
	case s.ch <- &downloadedReply{err: err}:
	case <-s.ctx.Done():
		// The stream is finished; nobody is left to receive the error.
	}
}
47 changes: 45 additions & 2 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/erigontech/erigon-lib/diagnostics"
"github.com/erigontech/erigon-lib/downloader/downloadercfg"
"github.com/erigontech/erigon-lib/downloader/snaptype"
prototypes "github.com/erigontech/erigon-lib/gointerfaces/typesproto"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/mdbx"
"github.com/erigontech/erigon-lib/log/v3"
Expand Down Expand Up @@ -97,8 +98,15 @@ type Downloader struct {

stuckFileDetailedLogs bool

logPrefix string
startTime time.Time
logPrefix string
startTime time.Time
broadcast func(name string, hash *prototypes.H160)
Copy link
Contributor

@mh0lt mh0lt Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you rename this to notifyCompleted - you don't need an extra wrapper to call a function

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

completedTorrents map[string]completedTorrentInfo
}

// completedTorrentInfo describes one finished torrent as stored in
// Downloader.completedTorrents.
type completedTorrentInfo struct {
// path is set to the torrent name by torrentCompleted.
path string
// hash is the torrent info-hash in its protobuf H160 form.
hash *prototypes.H160
}

type downloadInfo struct {
Expand Down Expand Up @@ -345,6 +353,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosi
downloading: map[string]*downloadInfo{},
webseedsDiscover: discover,
logPrefix: "",
completedTorrents: make(map[string]completedTorrentInfo),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This duplicates the info in downloading above and the info held locally in the local maps here:

https://github.com/erigontech/erigon/blob/ba451aac8b75bd851155e95ecfdab6fd5e8ec08b/erigon-lib/downloader/downloader.go#L889C3-L889C12

could you rationalize this so we end up with a single collection which is held by the downloader which keeps the whole download lifecycle.

This is a rationalization which is well overdue - seems like as you are adding another collection it would be a good time to make this change. Note that you may need to protect access to this map with the downloader mutex

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not do this as at some point we are deleting from the complete array.

delete(complete, status.name)

This means it may not contain all completed parts, and the complete array may be used for other things, so I decided to create a separate collection.

I think such changes should be done as a separate task, as it would be too many changes to the logic in the same PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is every time we add new collections it makes the code gradually more complex - I don't think the change is that big. You can just add a status with the following states

complete
checking
failed
waiting

to the downloadInfo struct, then you only need one map. If you want to add more info you need to do the refactor. I'm ok if you do 2 PRs, one with the refactor and the second adding the notifier - but I don't think we should make the code change like this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to simplify the change you can just do complete.

At the moment you are setting 2 variables with an external function doing the same thing here:

complete[t.Name()] = struct{}{}
...
d.torrentCompleted(t.Name(), t.InfoHash())

I don't think you should add this code duplication.

}
d.webseeds.SetTorrent(d.torrentFS, snapLock.Downloads, cfg.DownloadTorrentFilesFromWebseed)

Expand Down Expand Up @@ -906,6 +915,7 @@ func (d *Downloader) mainLoop(silent bool) error {
for _, t := range torrents {
if _, ok := complete[t.Name()]; ok {
clist = append(clist, t.Name())
d.torrentCompleted(t.Name(), t.InfoHash())
continue
}

Expand Down Expand Up @@ -959,6 +969,7 @@ func (d *Downloader) mainLoop(silent bool) error {
} else {
clist = append(clist, t.Name())
complete[t.Name()] = struct{}{}
d.torrentCompleted(t.Name(), t.InfoHash())
continue
}
}
Expand Down Expand Up @@ -1025,6 +1036,8 @@ func (d *Downloader) mainLoop(silent bool) error {
d.lock.Unlock()
complete[t.Name()] = struct{}{}
clist = append(clist, t.Name())
d.torrentCompleted(t.Name(), t.InfoHash())

continue
}

Expand Down Expand Up @@ -2762,6 +2775,7 @@ func (d *Downloader) BuildTorrentFilesIfNeed(ctx context.Context, chain string,
_, err := BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs, d.torrentFS, chain, ignore, false)
return err
}

func (d *Downloader) Stats() AggStats {
d.lock.RLock()
defer d.lock.RUnlock()
Expand Down Expand Up @@ -2946,3 +2960,32 @@ func calculateTime(amountLeft, rate uint64) string {
// Completed returns the Completed flag of the downloader's stats.
// NOTE(review): d.stats is read here without taking d.lock, whereas Stats()
// takes d.lock.RLock — confirm whether this read needs the same protection.
func (d *Downloader) Completed() bool {
return d.stats.Completed
}

// torrentCompleted records a finished torrent and, the first time a given name
// is reported, notifies subscribers about it.
//
// The record is kept so that subscribers attaching later can be told about
// files that were already downloaded. The downloader lock is held for the whole
// call, including the notification.
func (d *Downloader) torrentCompleted(tName string, tHash metainfo.Hash) {
	d.lock.Lock()
	defer d.lock.Unlock()

	protoHash := InfoHashes2Proto(tHash)

	// Callers may report the same torrent more than once; only the first
	// report is announced.
	_, alreadyKnown := d.completedTorrents[tName]
	if !alreadyKnown {
		d.notifyCompleted(tName, protoHash)
	}

	// The entry is (re)written on every call.
	d.completedTorrents[tName] = completedTorrentInfo{path: tName, hash: protoHash}
}

// notifyCompleted tells gRPC subscribers that the torrent tName with info-hash
// tHash has finished downloading.
//
// The broadcast callback is only installed by NewGrpcServer, so a Downloader
// running without a gRPC server has none; in that case this is a no-op instead
// of a nil function call panic.
func (d *Downloader) notifyCompleted(tName string, tHash *prototypes.H160) {
	if d.broadcast == nil {
		return
	}
	d.broadcast(tName, tHash)
}

// getCompletedTorrents returns a snapshot of the completed torrents, keyed by
// torrent name.
//
// A copy is returned rather than the internal map: callers iterate the result
// after the read lock has been released, and ranging over the live map while
// torrentCompleted writes to it would be a concurrent map read and write.
func (d *Downloader) getCompletedTorrents() map[string]completedTorrentInfo {
	d.lock.RLock()
	defer d.lock.RUnlock()

	snapshot := make(map[string]completedTorrentInfo, len(d.completedTorrents))
	for name, info := range d.completedTorrents {
		snapshot[name] = info
	}
	return snapshot
}
56 changes: 54 additions & 2 deletions erigon-lib/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"os"
"path/filepath"
"slices"
"sync"
"time"

"github.com/anacrolix/torrent/metainfo"
Expand All @@ -38,12 +40,20 @@ var (
)

func NewGrpcServer(d *Downloader) (*GrpcServer, error) {
return &GrpcServer{d: d}, nil
svr := &GrpcServer{
d: d,
}

d.broadcast = svr.broadcast

return svr, nil
}

type GrpcServer struct {
proto_downloader.UnimplementedDownloaderServer
d *Downloader
d *Downloader
mu sync.RWMutex
subscribers []proto_downloader.Downloader_TorrentCompletedServer
}

func (s *GrpcServer) ProhibitNewDownloads(ctx context.Context, req *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) {
Expand Down Expand Up @@ -129,6 +139,10 @@ func Proto2InfoHash(in *prototypes.H160) metainfo.Hash {
return gointerfaces.ConvertH160toAddress(in)
}

// InfoHashes2Proto converts a torrent info-hash into its protobuf H160
// representation using gointerfaces.ConvertAddressToH160.
func InfoHashes2Proto(in metainfo.Hash) *prototypes.H160 {
return gointerfaces.ConvertAddressToH160(in)
}

func (s *GrpcServer) SetLogPrefix(ctx context.Context, request *proto_downloader.SetLogPrefixRequest) (*emptypb.Empty, error) {
s.d.SetLogPrefix(request.Prefix)

Expand All @@ -138,3 +152,41 @@ func (s *GrpcServer) SetLogPrefix(ctx context.Context, request *proto_downloader
// Completed replies with the value of Downloader.Completed. ctx and request are
// not used, and the returned error is always nil.
func (s *GrpcServer) Completed(ctx context.Context, request *proto_downloader.CompletedRequest) (*proto_downloader.CompletedReply, error) {
return &proto_downloader.CompletedReply{Completed: s.d.Completed()}, nil
}

// TorrentCompleted registers stream as a subscriber for torrent-completion
// notifications and replays every torrent that has already completed to it.
//
// The replay is sent to the new stream only. Routing it through broadcast
// would re-deliver the whole history to every existing subscriber each time
// someone new subscribes. req is not used. The returned error is always nil.
func (s *GrpcServer) TorrentCompleted(req *proto_downloader.TorrentCompletedRequest, stream proto_downloader.Downloader_TorrentCompletedServer) error {
	// Register before replaying so a torrent finishing during the replay is
	// delivered (possibly twice) rather than missed.
	s.mu.Lock()
	s.subscribers = append(s.subscribers, stream)
	s.mu.Unlock()

	for _, info := range s.d.getCompletedTorrents() {
		reply := &proto_downloader.TorrentCompletedReply{
			Name: info.path,
			Hash: info.hash,
		}
		if err := stream.Send(reply); err != nil {
			// The stream is no longer usable, so stop replaying. The error is
			// deliberately not returned, matching the previous behaviour of
			// ignoring Send failures; broadcast drops dead subscribers later.
			break
		}
	}

	return nil
}

// broadcast sends a completion notification for the torrent name/hash to every
// registered subscriber and removes subscribers that can no longer receive.
//
// A subscriber is removed when its context is done or when Send returns an
// error. The write lock is taken because the subscriber slice is modified;
// it also keeps two broadcasts from calling Send on the same stream at once.
func (s *GrpcServer) broadcast(name string, hash *prototypes.H160) {
	s.mu.Lock()
	defer s.mu.Unlock()

	var dead []int

	for i, sub := range s.subscribers {
		if sub.Context().Err() != nil {
			dead = append(dead, i)
			continue
		}

		// Each subscriber gets its own message value.
		err := sub.Send(&proto_downloader.TorrentCompletedReply{
			Name: name,
			Hash: hash,
		})
		if err != nil {
			dead = append(dead, i)
		}
	}

	// Delete from the back so the remaining recorded indices stay valid.
	// The end index is exclusive, hence idx+1.
	for i := len(dead) - 1; i >= 0; i-- {
		idx := dead[i]
		s.subscribers = slices.Delete(s.subscribers, idx, idx+1)
	}
}
5 changes: 5 additions & 0 deletions erigon-lib/downloader/downloadergrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,8 @@ func String2Proto(in string) *prototypes.H160 {
copy(infoHash[:], inHex)
return gointerfaces.ConvertAddressToH160(infoHash)
}

// Proto2String converts a protobuf H160 value to an address with
// gointerfaces.ConvertH160toAddress and returns its bytes as a hex string.
func Proto2String(in *prototypes.H160) string {
	infoHash := gointerfaces.ConvertH160toAddress(in)
	encoded := hex.EncodeToString(infoHash[:])
	return encoded
}
Loading
Loading