Skip to content

Commit

Permalink
Merge pull request #459 from jbenet/bitswap-rounds-CR-by-btc
Browse files Browse the repository at this point in the history
Bitswap Rounds CR suggestions #2
  • Loading branch information
whyrusleeping committed Dec 17, 2014
2 parents b1b2dc5 + aaa46cc commit 00f746b
Show file tree
Hide file tree
Showing 12 changed files with 525 additions and 453 deletions.
96 changes: 46 additions & 50 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
package bitswap

import (
"math"
"sync"
"time"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange"
decision "github.com/jbenet/go-ipfs/exchange/bitswap/decision"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
Expand All @@ -24,18 +24,25 @@ import (

var log = eventlog.Logger("bitswap")

// Number of providers to request for sending a wantlist to
// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
const maxProvidersPerRequest = 3
const (
// Number of providers to request for sending a wantlist to
// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
maxProvidersPerRequest = 3
providerRequestTimeout = time.Second * 10
hasBlockTimeout = time.Second * 15
sizeBatchRequestChan = 32
// kMaxPriority is the max priority as defined by the bitswap protocol
kMaxPriority = math.MaxInt32
)

var providerRequestTimeout = time.Second * 10
var hasBlockTimeout = time.Second * 15
var rebroadcastDelay = time.Second * 10
var (
rebroadcastDelay = time.Second * 10
)

// New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as
// the network delegate.
// Runs until context is cancelled
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, routing bsnet.Routing,
bstore blockstore.Blockstore, nice bool) exchange.Interface {

Expand All @@ -52,11 +59,11 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout
blockstore: bstore,
cancelFunc: cancelFunc,
notifications: notif,
ledgermanager: strategy.NewLedgerManager(bstore, ctx),
engine: decision.NewEngine(ctx, bstore),
routing: routing,
sender: network,
wantlist: wl.New(),
batchRequests: make(chan []u.Key, 32),
wantlist: wantlist.NewThreadSafe(),
batchRequests: make(chan []u.Key, sizeBatchRequestChan),
}
network.SetDelegate(bs)
go bs.clientWorker(ctx)
Expand Down Expand Up @@ -85,13 +92,9 @@ type bitswap struct {
// have more than a single block in the set
batchRequests chan []u.Key

// strategy makes decisions about how to interact with partners.
// TODO: strategy commented out until we have a use for it again
//strategy strategy.Strategy
engine *decision.Engine

ledgermanager *strategy.LedgerManager

wantlist *wl.Wantlist
wantlist *wantlist.ThreadSafe

// cancelFunc signals cancellation to the bitswap event loop
cancelFunc func()
Expand Down Expand Up @@ -159,8 +162,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
}
bs.wantlist.Remove(blk.Key())
bs.notifications.Publish(blk)
child, _ := context.WithTimeout(ctx, hasBlockTimeout)
return bs.routing.Provide(child, blk.Key())
return bs.routing.Provide(ctx, blk.Key())
}

func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
Expand All @@ -169,7 +171,7 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
}
message := bsmsg.New()
for _, wanted := range bs.wantlist.Entries() {
message.AddEntry(wanted.Value, wanted.Priority)
message.AddEntry(wanted.Key, wanted.Priority)
}
wg := sync.WaitGroup{}
for peerToQuery := range peers {
Expand All @@ -193,21 +195,21 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
// FIXME ensure accounting is handled correctly when
// communication fails. May require slightly different API to
// get better guarantees. May need shared sequence numbers.
bs.ledgermanager.MessageSent(p, message)
bs.engine.MessageSent(p, message)
}(peerToQuery)
}
wg.Wait()
return nil
}

func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) {
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantlist.ThreadSafe) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

message := bsmsg.New()
message.SetFull(true)
for _, e := range bs.wantlist.Entries() {
message.AddEntry(e.Value, e.Priority)
message.AddEntry(e.Key, e.Priority)
}

ps := pset.NewPeerSet()
Expand All @@ -226,7 +228,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wan
bs.send(ctx, prov, message)
}
}
}(e.Value)
}(e.Key)
}
wg.Wait()
}
Expand All @@ -236,18 +238,8 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
select {
case <-ctx.Done():
return
case task := <-bs.ledgermanager.GetTaskChan():
block, err := bs.blockstore.Get(task.Key)
if err != nil {
log.Errorf("Expected to have block %s, but it was not found!", task.Key)
continue
}

message := bsmsg.New()
message.AddBlock(block)
// TODO: maybe add keys from our wantlist?

bs.send(ctx, task.Target, message)
case envelope := <-bs.engine.Outbox():
bs.send(ctx, envelope.Peer, envelope.Message)
}
}
}
Expand All @@ -272,7 +264,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
continue
}
for i, k := range ks {
bs.wantlist.Add(k, len(ks)-i)
bs.wantlist.Add(k, kMaxPriority-i)
}
// NB: send want list to providers for the first peer in this list.
// the assumption is made that the providers of the first key in
Expand Down Expand Up @@ -312,32 +304,36 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm

// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
bs.ledgermanager.MessageReceived(p, incoming)
bs.engine.MessageReceived(p, incoming)
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger

var blkeys []u.Key
for _, block := range incoming.Blocks() {
blkeys = append(blkeys, block.Key())
if err := bs.HasBlock(ctx, block); err != nil {
hasBlockCtx, _ := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, block); err != nil {
log.Error(err)
}
}
if len(blkeys) > 0 {
bs.cancelBlocks(ctx, blkeys)
var keys []u.Key
for _, block := range incoming.Blocks() {
keys = append(keys, block.Key())
}
bs.cancelBlocks(ctx, keys)

// TODO: consider changing this function to not return anything
return nil, nil
}

func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
if len(bkeys) < 1 {
return
}
message := bsmsg.New()
message.SetFull(false)
for _, k := range bkeys {
message.Cancel(k)
}
for _, p := range bs.ledgermanager.Peers() {
for _, p := range bs.engine.Peers() {
err := bs.send(ctx, p, message)
if err != nil {
log.Errorf("Error sending message: %s", err)
Expand All @@ -357,7 +353,7 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage
if err := bs.sender.SendMessage(ctx, p, m); err != nil {
return err
}
return bs.ledgermanager.MessageSent(p, m)
return bs.engine.MessageSent(p, m)
}

func (bs *bitswap) Close() error {
Expand Down
Loading

0 comments on commit 00f746b

Please sign in to comment.