diff --git a/peerbroker/peerbroker.go b/peerbroker/peerbroker.go index ce3911b0..3ce356d2 100644 --- a/peerbroker/peerbroker.go +++ b/peerbroker/peerbroker.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "sync" "time" logging "github.com/ipfs/go-log" @@ -71,7 +70,6 @@ type PeerBroker struct { // Don't access/modify outside of run loop sources map[WantSource]struct{} - btchr *batcher } // New initializes a new PeerBroker for a given context. @@ -84,7 +82,6 @@ func New(ctx context.Context, wantManager WantManager) *PeerBroker { cancel: cancel, wantManager: wantManager, sources: make(map[WantSource]struct{}), - btchr: newBatcher(ctx, wantManager), } } @@ -128,7 +125,6 @@ func (pb *PeerBroker) Startup() { // Shutdown ends processing for the PeerBroker. func (pb *PeerBroker) Shutdown() { - pb.btchr.shutdown() pb.cancel() for s := range pb.sources { @@ -150,125 +146,25 @@ func (pb *PeerBroker) run() { } func (pb *PeerBroker) checkMatch() { - // log.Warningf(" checkMatch (%d sources)\n", len(pb.sources)) gotWant := true for gotWant { gotWant = false cnt := 0 for s := range pb.sources { peers := pb.wantManager.AvailablePeers() - // fmt.Printf(" avail peers (%d)\n", len(peers)) ask := s.MatchWantPeer(peers) - // fmt.Printf(" MatchWantPeer %v\n", w) if ask != nil { - // log.Warningf(" got ask %s", ask) gotWant = true cnt++ - // pb.btchr.addRequest(ask.Peer, batchWant{ask.Cid, ask.WantHaves, s.ID()}) - // log.Warningf(" want-blk->%s want-haves->%s: %s\n", ask.Peer, ask.PeerHaves, ask.Cid.String()[2:8]) pb.wantManager.WantBlocks(pb.ctx, ask.Peer, s.ID(), []cid.Cid{ask.Cid}, ask.WantHaves) for _, p := range ask.PeerHaves { - // log.Warningf(" ask.PeerHaves %s %s\n", p, ask.Cid.String()[2:8]) - // pb.btchr.addRequest(p, batchWant{cid.Cid{}, []cid.Cid{ask.Cid}, s.ID()}) pb.wantManager.WantBlocks(pb.ctx, p, s.ID(), []cid.Cid{}, []cid.Cid{ask.Cid}) } } } } - // log.Warningf(" checkMatch done (gotWant: %t)\n", gotWant) - // fmt.Printf(" checkMatch [done]\n") - // for s, pw := range batches { - // for p, b := range pw { - // if len(b) > 0 { - // pb.sendBatch(pb.ctx, s, p, b) - // } - // } - // } -} - -type batcher struct { - sync.Mutex - ctx context.Context - wantManager WantManager - batches map[peer.ID]*batchInfo -} - -type batchInfo struct { - timer *time.Timer - wants []batchWant -} - -type batchWant struct { - Cid cid.Cid - WantHaves []cid.Cid - SesId uint64 -} - -func newBatcher(ctx context.Context, wm WantManager) *batcher { - return &batcher{ - ctx: ctx, - batches: make(map[peer.ID]*batchInfo), - wantManager: wm, - } -} - -func (b *batcher) shutdown() { - for p := range b.batches { - b.batches[p].timer.Stop() - delete(b.batches, p) - } -} - -func (b *batcher) addRequest(p peer.ID, w batchWant) { - b.Lock() - defer b.Unlock() - - bi, ok := b.batches[p] - if !ok { - bi = &batchInfo{ - wants: make([]batchWant, 0, 1), - timer: time.AfterFunc(peerBatchDebounce, func() { - b.sendBatch(p) - }), - } - b.batches[p] = bi - } - bi.wants = append(bi.wants, w) - - if len(bi.wants) == maxPeerBatchSize { - b.sendBatch(p) - } -} - -func (b *batcher) sendBatch(p peer.ID) { - b.Lock() - defer b.Unlock() - - b.batches[p].timer.Stop() - - cids := cid.NewSet() - wantHaves := cid.NewSet() - batchWants := b.batches[p].wants - for _, b := range batchWants { - if b.Cid.Defined() { - cids.Add(b.Cid) - } - } - for _, b := range batchWants { - for _, c := range b.WantHaves { - if !cids.Has(c) { - wantHaves.Add(c) - } - } - } - - // TODO: Respect session id - sesid := uint64(1) - b.wantManager.WantBlocks(b.ctx, p, sesid, cids.Keys(), wantHaves.Keys()) - - delete(b.batches, p) } type registerSourceMessage struct {