Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
fix: Remove batching from peer broker
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Sep 11, 2019
1 parent fb45343 commit a319ef1
Showing 1 changed file with 0 additions and 104 deletions.
104 changes: 0 additions & 104 deletions peerbroker/peerbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"sync"
"time"

logging "github.com/ipfs/go-log"
Expand Down Expand Up @@ -71,7 +70,6 @@ type PeerBroker struct {

// Don't access/modify outside of run loop
sources map[WantSource]struct{}
btchr *batcher
}

// New initializes a new PeerBroker for a given context.
Expand All @@ -84,7 +82,6 @@ func New(ctx context.Context, wantManager WantManager) *PeerBroker {
cancel: cancel,
wantManager: wantManager,
sources: make(map[WantSource]struct{}),
btchr: newBatcher(ctx, wantManager),
}
}

Expand Down Expand Up @@ -128,7 +125,6 @@ func (pb *PeerBroker) Startup() {

// Shutdown ends processing for the PeerBroker.
func (pb *PeerBroker) Shutdown() {
pb.btchr.shutdown()
pb.cancel()

for s := range pb.sources {
Expand All @@ -150,125 +146,25 @@ func (pb *PeerBroker) run() {
}

func (pb *PeerBroker) checkMatch() {
// log.Warningf(" checkMatch (%d sources)\n", len(pb.sources))
gotWant := true
for gotWant {
gotWant = false
cnt := 0
for s := range pb.sources {
peers := pb.wantManager.AvailablePeers()
// fmt.Printf(" avail peers (%d)\n", len(peers))
ask := s.MatchWantPeer(peers)
// fmt.Printf(" MatchWantPeer %v\n", w)
if ask != nil {
// log.Warningf(" got ask %s", ask)
gotWant = true
cnt++

// pb.btchr.addRequest(ask.Peer, batchWant{ask.Cid, ask.WantHaves, s.ID()})
// log.Warningf(" want-blk->%s want-haves->%s: %s\n", ask.Peer, ask.PeerHaves, ask.Cid.String()[2:8])
pb.wantManager.WantBlocks(pb.ctx, ask.Peer, s.ID(), []cid.Cid{ask.Cid}, ask.WantHaves)

for _, p := range ask.PeerHaves {
// log.Warningf(" ask.PeerHaves %s %s\n", p, ask.Cid.String()[2:8])
// pb.btchr.addRequest(p, batchWant{cid.Cid{}, []cid.Cid{ask.Cid}, s.ID()})
pb.wantManager.WantBlocks(pb.ctx, p, s.ID(), []cid.Cid{}, []cid.Cid{ask.Cid})
}
}
}
}
// log.Warningf(" checkMatch done (gotWant: %t)\n", gotWant)
// fmt.Printf(" checkMatch [done]\n")
// for s, pw := range batches {
// for p, b := range pw {
// if len(b) > 0 {
// pb.sendBatch(pb.ctx, s, p, b)
// }
// }
// }
}

type batcher struct {
sync.Mutex
ctx context.Context
wantManager WantManager
batches map[peer.ID]*batchInfo
}

type batchInfo struct {
timer *time.Timer
wants []batchWant
}

type batchWant struct {
Cid cid.Cid
WantHaves []cid.Cid
SesId uint64
}

func newBatcher(ctx context.Context, wm WantManager) *batcher {
return &batcher{
ctx: ctx,
batches: make(map[peer.ID]*batchInfo),
wantManager: wm,
}
}

func (b *batcher) shutdown() {
for p := range b.batches {
b.batches[p].timer.Stop()
delete(b.batches, p)
}
}

func (b *batcher) addRequest(p peer.ID, w batchWant) {
b.Lock()
defer b.Unlock()

bi, ok := b.batches[p]
if !ok {
bi = &batchInfo{
wants: make([]batchWant, 0, 1),
timer: time.AfterFunc(peerBatchDebounce, func() {
b.sendBatch(p)
}),
}
b.batches[p] = bi
}
bi.wants = append(bi.wants, w)

if len(bi.wants) == maxPeerBatchSize {
b.sendBatch(p)
}
}

func (b *batcher) sendBatch(p peer.ID) {
b.Lock()
defer b.Unlock()

b.batches[p].timer.Stop()

cids := cid.NewSet()
wantHaves := cid.NewSet()
batchWants := b.batches[p].wants
for _, b := range batchWants {
if b.Cid.Defined() {
cids.Add(b.Cid)
}
}
for _, b := range batchWants {
for _, c := range b.WantHaves {
if !cids.Has(c) {
wantHaves.Add(c)
}
}
}

// TODO: Respect session id
sesid := uint64(1)
b.wantManager.WantBlocks(b.ctx, p, sesid, cids.Keys(), wantHaves.Keys())

delete(b.batches, p)
}

type registerSourceMessage struct {
Expand Down

0 comments on commit a319ef1

Please sign in to comment.