diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go
index 5cf28c96d427..149996b3a6b5 100644
--- a/exchange/bitswap/bitswap.go
+++ b/exchange/bitswap/bitswap.go
@@ -3,19 +3,19 @@ package bitswap
 
 import (
+	"math"
 	"sync"
 	"time"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
-
 	blocks "github.com/jbenet/go-ipfs/blocks"
 	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
 	exchange "github.com/jbenet/go-ipfs/exchange"
+	decision "github.com/jbenet/go-ipfs/exchange/bitswap/decision"
 	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
 	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
 	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
-	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
-	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
+	wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
 	peer "github.com/jbenet/go-ipfs/peer"
 	u "github.com/jbenet/go-ipfs/util"
 	eventlog "github.com/jbenet/go-ipfs/util/eventlog"
@@ -24,18 +24,25 @@ import (
 
 var log = eventlog.Logger("bitswap")
 
-// Number of providers to request for sending a wantlist to
-// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
-const maxProvidersPerRequest = 3
+const (
+	// Number of providers to request for sending a wantlist to
+	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
+	maxProvidersPerRequest = 3
+	providerRequestTimeout = time.Second * 10
+	hasBlockTimeout        = time.Second * 15
+	sizeBatchRequestChan   = 32
+	// kMaxPriority is the max priority as defined by the bitswap protocol
+	kMaxPriority = math.MaxInt32
+)
 
-var providerRequestTimeout = time.Second * 10
-var hasBlockTimeout = time.Second * 15
-var rebroadcastDelay = time.Second * 10
+var (
+	rebroadcastDelay = time.Second * 10
+)
 
-// New initializes a BitSwap instance that communicates over the
-// provided BitSwapNetwork. This function registers the returned instance as
-// the network delegate.
-// Runs until context is cancelled
+// New initializes a BitSwap instance that communicates over the provided
+// BitSwapNetwork. This function registers the returned instance as the network
+// delegate.
+// Runs until context is cancelled.
 func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, routing bsnet.Routing,
 	bstore blockstore.Blockstore, nice bool) exchange.Interface {
 
@@ -52,11 +59,11 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout
 		blockstore:    bstore,
 		cancelFunc:    cancelFunc,
 		notifications: notif,
-		ledgermanager: strategy.NewLedgerManager(bstore, ctx),
+		engine:        decision.NewEngine(ctx, bstore),
 		routing:       routing,
 		sender:        network,
-		wantlist:      wl.New(),
-		batchRequests: make(chan []u.Key, 32),
+		wantlist:      wantlist.NewThreadSafe(),
+		batchRequests: make(chan []u.Key, sizeBatchRequestChan),
 	}
 	network.SetDelegate(bs)
 	go bs.clientWorker(ctx)
@@ -85,13 +92,9 @@ type bitswap struct {
 	// have more than a single block in the set
 	batchRequests chan []u.Key
 
-	// strategy makes decisions about how to interact with partners.
-	// TODO: strategy commented out until we have a use for it again
-	//strategy strategy.Strategy
+	engine *decision.Engine
 
-	ledgermanager *strategy.LedgerManager
-
-	wantlist *wl.Wantlist
+	wantlist *wantlist.ThreadSafe
 
 	// cancelFunc signals cancellation to the bitswap event loop
 	cancelFunc func()
@@ -159,8 +162,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
 	}
 	bs.wantlist.Remove(blk.Key())
 	bs.notifications.Publish(blk)
-	child, _ := context.WithTimeout(ctx, hasBlockTimeout)
-	return bs.routing.Provide(child, blk.Key())
+	return bs.routing.Provide(ctx, blk.Key())
 }
 
 func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
@@ -169,7 +171,7 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
 	}
 	message := bsmsg.New()
 	for _, wanted := range bs.wantlist.Entries() {
-		message.AddEntry(wanted.Value, wanted.Priority)
+		message.AddEntry(wanted.Key, wanted.Priority)
 	}
 	wg := sync.WaitGroup{}
 	for peerToQuery := range peers {
@@ -193,21 +195,21 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
 			// FIXME ensure accounting is handled correctly when
 			// communication fails. May require slightly different API to
 			// get better guarantees. May need shared sequence numbers.
-			bs.ledgermanager.MessageSent(p, message)
+			bs.engine.MessageSent(p, message)
 		}(peerToQuery)
 	}
 	wg.Wait()
 	return nil
 }
 
-func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) {
+func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantlist.ThreadSafe) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
 	message := bsmsg.New()
 	message.SetFull(true)
 	for _, e := range bs.wantlist.Entries() {
-		message.AddEntry(e.Value, e.Priority)
+		message.AddEntry(e.Key, e.Priority)
 	}
 
 	ps := pset.NewPeerSet()
@@ -226,7 +228,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wan
 				bs.send(ctx, prov, message)
 			}
 		}
-		}(e.Value)
+		}(e.Key)
 	}
 	wg.Wait()
 }
@@ -236,18 +238,8 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
 		select {
 		case <-ctx.Done():
 			return
-		case task := <-bs.ledgermanager.GetTaskChan():
-			block, err := bs.blockstore.Get(task.Key)
-			if err != nil {
-				log.Errorf("Expected to have block %s, but it was not found!", task.Key)
-				continue
-			}
-
-			message := bsmsg.New()
-			message.AddBlock(block)
-			// TODO: maybe add keys from our wantlist?
-
-			bs.send(ctx, task.Target, message)
+		case envelope := <-bs.engine.Outbox():
+			bs.send(ctx, envelope.Peer, envelope.Message)
 		}
 	}
 }
@@ -272,7 +264,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
 			continue
 		}
 		for i, k := range ks {
-			bs.wantlist.Add(k, len(ks)-i)
+			bs.wantlist.Add(k, kMaxPriority-i)
 		}
 		// NB: send want list to providers for the first peer in this list.
 		//     the assumption is made that the providers of the first key in
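A note on the priority change above, with an illustration (not part of the patch; hypothetical keys). The old code used len(ks)-i, so a later, larger batch could outrank an earlier one; counting down from kMaxPriority keeps every batch at the top of the scale while preserving in-batch ordering:

    ks := []u.Key{"a", "b", "c"} // hypothetical keys
    for i, k := range ks {
        bs.wantlist.Add(k, kMaxPriority-i) // MaxInt32, MaxInt32-1, MaxInt32-2
    }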
@@ -312,32 +304,36 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
 
 	// This call records changes to wantlists, blocks received,
 	// and number of bytes transferred.
-	bs.ledgermanager.MessageReceived(p, incoming)
+	bs.engine.MessageReceived(p, incoming)
 	// TODO: this is bad, and could be easily abused.
 	// Should only track *useful* messages in ledger
 
-	var blkeys []u.Key
 	for _, block := range incoming.Blocks() {
-		blkeys = append(blkeys, block.Key())
-		if err := bs.HasBlock(ctx, block); err != nil {
+		hasBlockCtx, _ := context.WithTimeout(ctx, hasBlockTimeout)
+		if err := bs.HasBlock(hasBlockCtx, block); err != nil {
 			log.Error(err)
 		}
 	}
-	if len(blkeys) > 0 {
-		bs.cancelBlocks(ctx, blkeys)
+	var keys []u.Key
+	for _, block := range incoming.Blocks() {
+		keys = append(keys, block.Key())
 	}
+	bs.cancelBlocks(ctx, keys)
 
 	// TODO: consider changing this function to not return anything
 	return nil, nil
 }
 
 func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
+	if len(bkeys) < 1 {
+		return
+	}
 	message := bsmsg.New()
 	message.SetFull(false)
 	for _, k := range bkeys {
 		message.Cancel(k)
 	}
-	for _, p := range bs.ledgermanager.Peers() {
+	for _, p := range bs.engine.Peers() {
 		err := bs.send(ctx, p, message)
 		if err != nil {
 			log.Errorf("Error sending message: %s", err)
@@ -357,7 +353,7 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage
 	if err := bs.sender.SendMessage(ctx, p, m); err != nil {
 		return err
 	}
-	return bs.ledgermanager.MessageSent(p, m)
+	return bs.engine.MessageSent(p, m)
 }
 
 func (bs *bitswap) Close() error {
diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go
new file mode 100644
index 000000000000..ea453943731b
--- /dev/null
+++ b/exchange/bitswap/decision/engine.go
@@ -0,0 +1,224 @@
+package decision
+
+import (
+	"sync"
+
+	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
+	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
+	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
+	peer "github.com/jbenet/go-ipfs/peer"
+	u "github.com/jbenet/go-ipfs/util"
+)
+
+// TODO consider taking responsibility for other types of requests. For
+// example, there could be a |cancelQueue| for all of the cancellation
+// messages that need to go out. There could also be a |wantlistQueue| for
+// the local peer's wantlists. Alternatively, these could all be bundled
+// into a single, intelligent global queue that efficiently
+// batches/combines and takes all of these into consideration.
+//
+// Right now, messages go onto the network for four reasons:
+// 1. an initial `sendwantlist` message to a provider of the first key in a request
+// 2. a periodic full sweep of `sendwantlist` messages to all providers
+// 3. upon receipt of blocks, a `cancel` message to all peers
+// 4. draining the priority queue of `blockrequests` from peers
+//
+// Presently, only `blockrequests` are handled by the decision engine.
+// However, there is an opportunity to give it more responsibility! If the
+// decision engine is given responsibility for all of the others, it can
+// intelligently decide how to combine requests efficiently.
+//
+// Some examples of what would be possible:
+//
+// * when sending out the wantlists, include `cancel` requests
+// * when handling `blockrequests`, include `sendwantlist` and `cancel` as appropriate
+// * when handling `cancel`, if we recently received a wanted block from a
+//   peer, include a partial wantlist that contains a few other high priority
+//   blocks
+//
+// In a sense, if we treat the decision engine as a black box, it could do
+// whatever it sees fit to produce desired outcomes (get wanted keys
+// quickly, maintain good relationships with peers, etc).
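For orientation while reading the new file: the engine's only output is the Outbox channel of Envelopes. The consuming side already appears in bitswap.go above; here it is in isolation as a minimal sketch (ctx, bs.engine, and bs.send are taken from that file):

    for {
        select {
        case <-ctx.Done():
            return
        // Each Envelope pairs a recipient peer with a ready-built message.
        case envelope := <-bs.engine.Outbox():
            bs.send(ctx, envelope.Peer, envelope.Message)
        }
    }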
+
+var log = u.Logger("engine")
+
+const (
+	sizeOutboxChan = 4
+)
+
+// Envelope contains a message for a Peer
+type Envelope struct {
+	// Peer is the intended recipient
+	Peer peer.Peer
+	// Message is the payload
+	Message bsmsg.BitSwapMessage
+}
+
+type Engine struct {
+	// peerRequestQueue is a priority queue of requests received from peers.
+	// Requests are popped from the queue, packaged up, and placed in the
+	// outbox.
+	peerRequestQueue *taskQueue
+
+	// FIXME it's a bit odd for the client and the worker to both share memory
+	// (both modify the peerRequestQueue) and also to communicate over the
+	// workSignal channel. consider sending requests over the channel and
+	// allowing the worker to have exclusive access to the peerRequestQueue. In
+	// that case, no lock would be required.
+	workSignal chan struct{}
+
+	// outbox contains outgoing messages to peers
+	outbox chan Envelope
+
+	bs bstore.Blockstore
+
+	lock sync.RWMutex // protects the fields immediately below
+	// ledgerMap lists Ledgers by their Partner key.
+	ledgerMap map[u.Key]*ledger
+}
+
+func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
+	e := &Engine{
+		ledgerMap:        make(map[u.Key]*ledger),
+		bs:               bs,
+		peerRequestQueue: newTaskQueue(),
+		outbox:           make(chan Envelope, sizeOutboxChan),
+		workSignal:       make(chan struct{}),
+	}
+	go e.taskWorker(ctx)
+	return e
+}
+
+func (e *Engine) taskWorker(ctx context.Context) {
+	for {
+		nextTask := e.peerRequestQueue.Pop()
+		if nextTask == nil {
+			// No tasks in the list?
+			// Wait until there are!
+			select {
+			case <-ctx.Done():
+				return
+			case <-e.workSignal:
+			}
+			continue
+		}
+		block, err := e.bs.Get(nextTask.Entry.Key)
+		if err != nil {
+			log.Warning("engine: task exists to send block, but block is not in blockstore")
+			continue
+		}
+		// construct message here so we can make decisions about any additional
+		// information we may want to include at this time.
+		m := bsmsg.New()
+		m.AddBlock(block)
+		// TODO: maybe add keys from our wantlist?
+		select {
+		case <-ctx.Done():
+			return
+		case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
+		}
+	}
+}
+
+func (e *Engine) Outbox() <-chan Envelope {
+	return e.outbox
+}
+
+// Peers returns a slice of Peers with whom the local node has active sessions
+func (e *Engine) Peers() []peer.Peer {
+	e.lock.RLock()
+	defer e.lock.RUnlock()
+
+	response := make([]peer.Peer, 0)
+	for _, ledger := range e.ledgerMap {
+		response = append(response, ledger.Partner)
+	}
+	return response
+}
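MessageReceived (below) wakes the task worker with a non-blocking send on workSignal. The pattern in isolation, as a sketch with a hypothetical helper name:

    // wake never blocks the caller: either the worker is parked on
    // <-workSignal and receives the token, or the default case drops
    // the signal. Repeated wake-ups coalesce; the worker observes the
    // new tasks on its next Pop.
    func wake(workSignal chan<- struct{}) {
        select {
        case workSignal <- struct{}{}:
        default:
        }
    }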
+
+// MessageReceived performs book-keeping. Returns error if passed invalid
+// arguments.
+func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
+	newWorkExists := false
+	defer func() {
+		if newWorkExists {
+			// Signal task generation to restart (if stopped!)
+			select {
+			case e.workSignal <- struct{}{}:
+			default:
+			}
+		}
+	}()
+	e.lock.Lock()
+	defer e.lock.Unlock()
+
+	l := e.findOrCreate(p)
+	if m.Full() {
+		l.wantList = wl.New()
+	}
+	for _, entry := range m.Wantlist() {
+		if entry.Cancel {
+			l.CancelWant(entry.Key)
+			e.peerRequestQueue.Remove(entry.Key, p)
+		} else {
+			l.Wants(entry.Key, entry.Priority)
+			if exists, err := e.bs.Has(entry.Key); err == nil && exists {
+				newWorkExists = true
+				e.peerRequestQueue.Push(entry.Entry, p)
+			}
+		}
+	}
+
+	for _, block := range m.Blocks() {
+		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
+		l.ReceivedBytes(len(block.Data))
+		for _, l := range e.ledgerMap {
+			if l.WantListContains(block.Key()) {
+				newWorkExists = true
+				e.peerRequestQueue.Push(wl.Entry{Key: block.Key(), Priority: 1}, l.Partner)
+			}
+		}
+	}
+	return nil
+}
+
+// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
+// race conditions where I send a message, but MessageSent gets handled after
+// MessageReceived. The information in the local wantlist could become
+// inconsistent. Would need to ensure that Sends and acknowledgement of the
+// send happen atomically
+
+func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
+	e.lock.Lock()
+	defer e.lock.Unlock()
+
+	l := e.findOrCreate(p)
+	for _, block := range m.Blocks() {
+		l.SentBytes(len(block.Data))
+		l.wantList.Remove(block.Key())
+		e.peerRequestQueue.Remove(block.Key(), p)
+	}
+
+	return nil
+}
+
+func (e *Engine) numBytesSentTo(p peer.Peer) uint64 {
+	// NB not threadsafe
+	return e.findOrCreate(p).Accounting.BytesSent
+}
+
+func (e *Engine) numBytesReceivedFrom(p peer.Peer) uint64 {
+	// NB not threadsafe
+	return e.findOrCreate(p).Accounting.BytesRecv
+}
+
+// findOrCreate lazily instantiates a ledger for the given partner
+func (e *Engine) findOrCreate(p peer.Peer) *ledger {
+	l, ok := e.ledgerMap[p.Key()]
+	if !ok {
+		l = newLedger(p)
+		e.ledgerMap[p.Key()] = l
+	}
+	return l
+}
diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go
new file mode 100644
index 000000000000..148937573033
--- /dev/null
+++ b/exchange/bitswap/decision/engine_test.go
@@ -0,0 +1,93 @@
+package decision
+
+import (
+	"strings"
+	"testing"
+
+	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
+	sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
+	blocks "github.com/jbenet/go-ipfs/blocks"
+	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
+	message "github.com/jbenet/go-ipfs/exchange/bitswap/message"
+	peer "github.com/jbenet/go-ipfs/peer"
+	testutil "github.com/jbenet/go-ipfs/util/testutil"
+)
+
+type peerAndEngine struct {
+	peer.Peer
+	Engine *Engine
+}
+
+func newPeerAndEngine(idStr string) peerAndEngine {
+	return peerAndEngine{
+		Peer: testutil.NewPeerWithIDString(idStr),
+		Engine: NewEngine(context.TODO(),
+			blockstore.NewBlockstore(sync.MutexWrap(ds.NewMapDatastore()))),
+	}
+}
+
+func TestConsistentAccounting(t *testing.T) {
+	sender := newPeerAndEngine("Ernie")
+	receiver := newPeerAndEngine("Bert")
+
+	// Send messages from Ernie to Bert
+	for i := 0; i < 1000; i++ {
+
+		m := message.New()
+		content := []string{"this", "is", "message", "i"}
+		m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
+
+		sender.Engine.MessageSent(receiver.Peer, m)
+		receiver.Engine.MessageReceived(sender.Peer, m)
+	}
+
+	// Ensure sender records the change
+	if sender.Engine.numBytesSentTo(receiver.Peer) == 0 {
+		t.Fatal("Sent bytes were not recorded")
+	}
+
+	// Ensure sender and receiver have the same values
+	if sender.Engine.numBytesSentTo(receiver.Peer) != receiver.Engine.numBytesReceivedFrom(sender.Peer) {
+		t.Fatal("Inconsistent book-keeping. Strategies don't agree")
+	}
+
+	// Ensure sender didn't record receiving anything. And that the receiver
+	// didn't record sending anything
+	if receiver.Engine.numBytesSentTo(sender.Peer) != 0 || sender.Engine.numBytesReceivedFrom(receiver.Peer) != 0 {
+		t.Fatal("Bert didn't send bytes to Ernie")
+	}
+}
+
+func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
+
+	sanfrancisco := newPeerAndEngine("sf")
+	seattle := newPeerAndEngine("sea")
+
+	m := message.New()
+
+	sanfrancisco.Engine.MessageSent(seattle.Peer, m)
+	seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
+
+	if seattle.Peer.Key() == sanfrancisco.Peer.Key() {
+		t.Fatal("Sanity Check: Peers have same Key!")
+	}
+
+	if !peerIsPartner(seattle.Peer, sanfrancisco.Engine) {
+		t.Fatal("Peer wasn't added as a Partner")
+	}
+
+	if !peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
+		t.Fatal("Peer wasn't added as a Partner")
+	}
+}
+
+func peerIsPartner(p peer.Peer, e *Engine) bool {
+	for _, partner := range e.Peers() {
+		if partner.Key() == p.Key() {
+			return true
+		}
+	}
+	return false
+}
diff --git a/exchange/bitswap/strategy/ledger.go b/exchange/bitswap/decision/ledger.go
similarity index 99%
rename from exchange/bitswap/strategy/ledger.go
rename to exchange/bitswap/decision/ledger.go
index 649c1e73e14d..eea87af1fcf6 100644
--- a/exchange/bitswap/strategy/ledger.go
+++ b/exchange/bitswap/decision/ledger.go
@@ -1,4 +1,4 @@
-package strategy
+package decision
 
 import (
 	"time"
diff --git a/exchange/bitswap/decision/taskqueue.go b/exchange/bitswap/decision/taskqueue.go
new file mode 100644
index 000000000000..a76c56e9ba30
--- /dev/null
+++ b/exchange/bitswap/decision/taskqueue.go
@@ -0,0 +1,84 @@
+package decision
+
+import (
+	"sync"
+
+	wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
+	peer "github.com/jbenet/go-ipfs/peer"
+	u "github.com/jbenet/go-ipfs/util"
+)
+
+// TODO: at some point, the strategy needs to plug in here
+// to help decide how to sort tasks (on add) and how to select
+// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
+type taskQueue struct {
+	// TODO: make this into a priority queue
+	lock    sync.Mutex
+	tasks   []*task
+	taskmap map[string]*task
+}
+
+func newTaskQueue() *taskQueue {
+	return &taskQueue{
+		taskmap: make(map[string]*task),
+	}
+}
+
+type task struct {
+	Entry  wantlist.Entry
+	Target peer.Peer
+	Trash  bool
+}
+
+// Push currently adds a new task to the end of the list
+func (tl *taskQueue) Push(entry wantlist.Entry, to peer.Peer) {
+	tl.lock.Lock()
+	defer tl.lock.Unlock()
+	if task, ok := tl.taskmap[taskKey(to, entry.Key)]; ok {
+		// TODO: when priority queue is implemented,
+		//       rearrange this task
+		task.Entry.Priority = entry.Priority
+		return
+	}
+	task := &task{
+		Entry:  entry,
+		Target: to,
+	}
+	tl.tasks = append(tl.tasks, task)
+	tl.taskmap[taskKey(to, entry.Key)] = task
+}
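Removal in this queue is lazy: Remove (further down) only flags a task as Trash, and Pop discards flagged tasks when they reach the front. A usage sketch with hypothetical keys k1, k2 and peer p, illustrative only:

    q := newTaskQueue()
    q.Push(wantlist.Entry{Key: k1, Priority: 1}, p)
    q.Push(wantlist.Entry{Key: k2, Priority: 2}, p)
    q.Remove(k1, p) // marks the (p, k1) task as Trash; the slice is untouched
    t := q.Pop()    // discards the trashed (p, k1) task, returns the (p, k2) task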
+
+// Pop 'pops' the next task to be performed. Returns nil if no task exists.
+func (tl *taskQueue) Pop() *task {
+	tl.lock.Lock()
+	defer tl.lock.Unlock()
+	var out *task
+	for len(tl.tasks) > 0 {
+		// TODO: instead of zero, use exponential distribution
+		//       it will help reduce the chance of receiving
+		//       the same block from multiple peers
+		out = tl.tasks[0]
+		tl.tasks = tl.tasks[1:]
+		delete(tl.taskmap, taskKey(out.Target, out.Entry.Key))
+		if out.Trash {
+			continue // discarding tasks that have been removed
+		}
+		break // and return |out|
+	}
+	return out
+}
+
+// Remove lazily removes a task from the queue
+func (tl *taskQueue) Remove(k u.Key, p peer.Peer) {
+	tl.lock.Lock()
+	t, ok := tl.taskmap[taskKey(p, k)]
+	if ok {
+		t.Trash = true
+	}
+	tl.lock.Unlock()
+}
+
+// taskKey returns a key that uniquely identifies a task.
+func taskKey(p peer.Peer, k u.Key) string {
+	return string(p.Key() + k)
+}
diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go
index 832eb7b29a84..e2346ed8d7e2 100644
--- a/exchange/bitswap/message/message.go
+++ b/exchange/bitswap/message/message.go
@@ -4,6 +4,7 @@ import (
 	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
 	blocks "github.com/jbenet/go-ipfs/blocks"
 	pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb"
+	wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
 	netmsg "github.com/jbenet/go-ipfs/net/message"
 	nm "github.com/jbenet/go-ipfs/net/message"
 	peer "github.com/jbenet/go-ipfs/peer"
@@ -16,7 +17,7 @@ import (
 type BitSwapMessage interface {
 	// Wantlist returns a slice of unique keys that represent data wanted by
 	// the sender.
-	Wantlist() []*Entry
+	Wantlist() []Entry
 
 	// Blocks returns a slice of unique blocks
 	Blocks() []*blocks.Block
@@ -45,7 +46,7 @@ type Exportable interface {
 
 type impl struct {
 	full     bool
-	wantlist map[u.Key]*Entry
+	wantlist map[u.Key]Entry
 	blocks   map[u.Key]*blocks.Block // map to detect duplicates
 }
 
@@ -56,15 +57,14 @@ func New() BitSwapMessage {
 func newMsg() *impl {
 	return &impl{
 		blocks:   make(map[u.Key]*blocks.Block),
-		wantlist: make(map[u.Key]*Entry),
+		wantlist: make(map[u.Key]Entry),
 		full:     true,
 	}
 }
 
 type Entry struct {
-	Key      u.Key
-	Priority int
-	Cancel   bool
+	wantlist.Entry
+	Cancel bool
 }
 
 func newMessageFromProto(pbm pb.Message) BitSwapMessage {
@@ -88,8 +88,8 @@ func (m *impl) Full() bool {
 	return m.full
 }
 
-func (m *impl) Wantlist() []*Entry {
-	var out []*Entry
+func (m *impl) Wantlist() []Entry {
+	var out []Entry
 	for _, e := range m.wantlist {
 		out = append(out, e)
 	}
@@ -118,10 +118,12 @@ func (m *impl) addEntry(k u.Key, priority int, cancel bool) {
 		e.Priority = priority
 		e.Cancel = cancel
 	} else {
-		m.wantlist[k] = &Entry{
-			Key:      k,
-			Priority: priority,
-			Cancel:   cancel,
+		m.wantlist[k] = Entry{
+			Entry: wantlist.Entry{
+				Key:      k,
+				Priority: priority,
+			},
+			Cancel: cancel,
 		}
 	}
 }
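Because message.Entry now embeds wantlist.Entry, Key and Priority are promoted fields: callers keep writing entry.Key, and the engine can hand the embedded value straight to the queue (the entry.Entry expression in engine.go above). A self-contained sketch of the pattern with stand-in types:

    type wlEntry struct { // stands in for wantlist.Entry
        Key      string
        Priority int
    }

    type msgEntry struct { // stands in for message.Entry
        wlEntry // embedded: Key and Priority are promoted
        Cancel  bool
    }

    e := msgEntry{wlEntry: wlEntry{Key: "k", Priority: 7}}
    _ = e.Key     // promoted field access
    _ = e.wlEntry // the whole embedded value, as in entry.Entry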
"github.com/jbenet/go-ipfs/exchange/bitswap/message" - wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" -) - -var log = u.Logger("strategy") - -// LedgerMap lists Ledgers by their Partner key. -type ledgerMap map[peerKey]*ledger - -// FIXME share this externally -type peerKey u.Key - -type LedgerManager struct { - lock sync.RWMutex - ledgerMap ledgerMap - bs bstore.Blockstore - tasklist *TaskList - taskOut chan *Task - workSignal chan struct{} -} - -func NewLedgerManager(bs bstore.Blockstore, ctx context.Context) *LedgerManager { - lm := &LedgerManager{ - ledgerMap: make(ledgerMap), - bs: bs, - tasklist: NewTaskList(), - taskOut: make(chan *Task, 4), - workSignal: make(chan struct{}), - } - go lm.taskWorker(ctx) - return lm -} - -func (lm *LedgerManager) taskWorker(ctx context.Context) { - for { - nextTask := lm.tasklist.Pop() - if nextTask == nil { - // No tasks in the list? - // Wait until there are! - select { - case <-ctx.Done(): - return - case <-lm.workSignal: - } - continue - } - - select { - case <-ctx.Done(): - return - case lm.taskOut <- nextTask: - } - } -} - -func (lm *LedgerManager) GetTaskChan() <-chan *Task { - return lm.taskOut -} - -// Returns a slice of Peers with whom the local node has active sessions -func (lm *LedgerManager) Peers() []peer.Peer { - lm.lock.RLock() - defer lm.lock.RUnlock() - - response := make([]peer.Peer, 0) - for _, ledger := range lm.ledgerMap { - response = append(response, ledger.Partner) - } - return response -} - -// BlockIsWantedByPeer returns true if peer wants the block given by this -// key -func (lm *LedgerManager) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool { - lm.lock.RLock() - defer lm.lock.RUnlock() - - ledger := lm.findOrCreate(p) - return ledger.WantListContains(k) -} - -// MessageReceived performs book-keeping. Returns error if passed invalid -// arguments. -func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { - lm.lock.Lock() - defer lm.lock.Unlock() - - l := lm.findOrCreate(p) - if m.Full() { - l.wantList = wl.New() - } - for _, e := range m.Wantlist() { - if e.Cancel { - l.CancelWant(e.Key) - lm.tasklist.Cancel(e.Key, p) - } else { - l.Wants(e.Key, e.Priority) - lm.tasklist.Push(e.Key, e.Priority, p) - - // Signal task generation to restart (if stopped!) - select { - case lm.workSignal <- struct{}{}: - default: - } - } - } - - for _, block := range m.Blocks() { - // FIXME extract blocks.NumBytes(block) or block.NumBytes() method - l.ReceivedBytes(len(block.Data)) - for _, l := range lm.ledgerMap { - if l.WantListContains(block.Key()) { - lm.tasklist.Push(block.Key(), 1, l.Partner) - - // Signal task generation to restart (if stopped!) - select { - case lm.workSignal <- struct{}{}: - default: - } - - } - } - } - return nil -} - -// TODO add contents of m.WantList() to my local wantlist? NB: could introduce -// race conditions where I send a message, but MessageSent gets handled after -// MessageReceived. The information in the local wantlist could become -// inconsistent. 
-// send happen atomically
-
-func (lm *LedgerManager) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
-	lm.lock.Lock()
-	defer lm.lock.Unlock()
-
-	l := lm.findOrCreate(p)
-	for _, block := range m.Blocks() {
-		l.SentBytes(len(block.Data))
-		l.wantList.Remove(block.Key())
-		lm.tasklist.Cancel(block.Key(), p)
-	}
-
-	return nil
-}
-
-func (lm *LedgerManager) NumBytesSentTo(p peer.Peer) uint64 {
-	lm.lock.RLock()
-	defer lm.lock.RUnlock()
-
-	return lm.findOrCreate(p).Accounting.BytesSent
-}
-
-func (lm *LedgerManager) NumBytesReceivedFrom(p peer.Peer) uint64 {
-	lm.lock.RLock()
-	defer lm.lock.RUnlock()
-
-	return lm.findOrCreate(p).Accounting.BytesRecv
-}
-
-// ledger lazily instantiates a ledger
-func (lm *LedgerManager) findOrCreate(p peer.Peer) *ledger {
-	l, ok := lm.ledgerMap[peerKey(p.Key())]
-	if !ok {
-		l = newLedger(p)
-		lm.ledgerMap[peerKey(p.Key())] = l
-	}
-	return l
-}
diff --git a/exchange/bitswap/strategy/ledgermanager_test.go b/exchange/bitswap/strategy/ledgermanager_test.go
deleted file mode 100644
index f2a98cb77431..000000000000
--- a/exchange/bitswap/strategy/ledgermanager_test.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package strategy
-
-import (
-	"strings"
-	"testing"
-
-	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
-
-	blocks "github.com/jbenet/go-ipfs/blocks"
-	message "github.com/jbenet/go-ipfs/exchange/bitswap/message"
-	peer "github.com/jbenet/go-ipfs/peer"
-	testutil "github.com/jbenet/go-ipfs/util/testutil"
-)
-
-type peerAndLedgermanager struct {
-	peer.Peer
-	ls *LedgerManager
-}
-
-func newPeerAndLedgermanager(idStr string) peerAndLedgermanager {
-	return peerAndLedgermanager{
-		Peer: testutil.NewPeerWithIDString(idStr),
-		//Strategy: New(true),
-		ls: NewLedgerManager(nil, context.TODO()),
-	}
-}
-
-func TestConsistentAccounting(t *testing.T) {
-	sender := newPeerAndLedgermanager("Ernie")
-	receiver := newPeerAndLedgermanager("Bert")
-
-	// Send messages from Ernie to Bert
-	for i := 0; i < 1000; i++ {
-
-		m := message.New()
-		content := []string{"this", "is", "message", "i"}
-		m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
-
-		sender.ls.MessageSent(receiver.Peer, m)
-		receiver.ls.MessageReceived(sender.Peer, m)
-	}
-
-	// Ensure sender records the change
-	if sender.ls.NumBytesSentTo(receiver.Peer) == 0 {
-		t.Fatal("Sent bytes were not recorded")
-	}
-
-	// Ensure sender and receiver have the same values
-	if sender.ls.NumBytesSentTo(receiver.Peer) != receiver.ls.NumBytesReceivedFrom(sender.Peer) {
-		t.Fatal("Inconsistent book-keeping. Strategies don't agree")
-	}
-
-	// Ensure sender didn't record receving anything. And that the receiver
-	// didn't record sending anything
-	if receiver.ls.NumBytesSentTo(sender.Peer) != 0 || sender.ls.NumBytesReceivedFrom(receiver.Peer) != 0 {
-		t.Fatal("Bert didn't send bytes to Ernie")
-	}
-}
-
-func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) {
-	beggar := newPeerAndLedgermanager("can't be chooser")
-	chooser := newPeerAndLedgermanager("chooses JIF")
-
-	block := blocks.NewBlock([]byte("data wanted by beggar"))
-
-	messageFromBeggarToChooser := message.New()
-	messageFromBeggarToChooser.AddEntry(block.Key(), 1)
-
-	chooser.ls.MessageReceived(beggar.Peer, messageFromBeggarToChooser)
-	// for this test, doesn't matter if you record that beggar sent
-
-	if !chooser.ls.BlockIsWantedByPeer(block.Key(), beggar.Peer) {
-		t.Fatal("chooser failed to record that beggar wants block")
-	}
-}
-
-func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
-
-	sanfrancisco := newPeerAndLedgermanager("sf")
-	seattle := newPeerAndLedgermanager("sea")
-
-	m := message.New()
-
-	sanfrancisco.ls.MessageSent(seattle.Peer, m)
-	seattle.ls.MessageReceived(sanfrancisco.Peer, m)
-
-	if seattle.Peer.Key() == sanfrancisco.Peer.Key() {
-		t.Fatal("Sanity Check: Peers have same Key!")
-	}
-
-	if !peerIsPartner(seattle.Peer, sanfrancisco.ls) {
-		t.Fatal("Peer wasn't added as a Partner")
-	}
-
-	if !peerIsPartner(sanfrancisco.Peer, seattle.ls) {
-		t.Fatal("Peer wasn't added as a Partner")
-	}
-}
-
-func peerIsPartner(p peer.Peer, ls *LedgerManager) bool {
-	for _, partner := range ls.Peers() {
-		if partner.Key() == p.Key() {
-			return true
-		}
-	}
-	return false
-}
diff --git a/exchange/bitswap/strategy/tasklist.go b/exchange/bitswap/strategy/tasklist.go
deleted file mode 100644
index f0a1b7d0056d..000000000000
--- a/exchange/bitswap/strategy/tasklist.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package strategy
-
-import (
-	peer "github.com/jbenet/go-ipfs/peer"
-	u "github.com/jbenet/go-ipfs/util"
-)
-
-// TODO: at some point, the strategy needs to plug in here
-// to help decide how to sort tasks (on add) and how to select
-// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
-type TaskList struct {
-	tasks   []*Task
-	taskmap map[u.Key]*Task
-}
-
-func NewTaskList() *TaskList {
-	return &TaskList{
-		taskmap: make(map[u.Key]*Task),
-	}
-}
-
-type Task struct {
-	Key           u.Key
-	Target        peer.Peer
-	theirPriority int
-}
-
-// Push currently adds a new task to the end of the list
-// TODO: make this into a priority queue
-func (tl *TaskList) Push(block u.Key, priority int, to peer.Peer) {
-	if task, ok := tl.taskmap[to.Key()+block]; ok {
-		// TODO: when priority queue is implemented,
-		//       rearrange this Task
-		task.theirPriority = priority
-		return
-	}
-	task := &Task{
-		Key:           block,
-		Target:        to,
-		theirPriority: priority,
-	}
-	tl.tasks = append(tl.tasks, task)
-	tl.taskmap[to.Key()+block] = task
-}
-
-// Pop returns the next task to be performed by bitswap the task is then
-// removed from the list
-func (tl *TaskList) Pop() *Task {
-	var out *Task
-	for len(tl.tasks) > 0 {
-		// TODO: instead of zero, use exponential distribution
-		//       it will help reduce the chance of receiving
-		//       the same block from multiple peers
-		out = tl.tasks[0]
-		tl.tasks = tl.tasks[1:]
-		delete(tl.taskmap, out.Target.Key()+out.Key)
-		// Filter out blocks that have been cancelled
-		if out.theirPriority >= 0 {
-			break
-		}
-	}
-
-	return out
-}
-
-// Cancel lazily cancels the sending of a block to a given peer
-func (tl *TaskList) Cancel(k u.Key, p peer.Peer) {
-	t, ok := tl.taskmap[p.Key()+k]
-	if ok {
-		t.theirPriority = -1
-	}
-}
diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go
index e20bb4457468..aa58ee155f5b 100644
--- a/exchange/bitswap/wantlist/wantlist.go
+++ b/exchange/bitswap/wantlist/wantlist.go
@@ -6,71 +6,103 @@ import (
 	"sync"
 )
 
+type ThreadSafe struct {
+	lk sync.RWMutex
+	Wantlist
+}
+
+// not threadsafe
 type Wantlist struct {
-	lk  sync.RWMutex
-	set map[u.Key]*Entry
+	set map[u.Key]Entry
+}
+
+type Entry struct {
+	// TODO consider making entries immutable so they can be shared safely and
+	// slices can be copied efficiently.
+	Key      u.Key
+	Priority int
+}
+
+type entrySlice []Entry
+
+func (es entrySlice) Len() int           { return len(es) }
+func (es entrySlice) Swap(i, j int)      { es[i], es[j] = es[j], es[i] }
+func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priority }
+
+func NewThreadSafe() *ThreadSafe {
+	return &ThreadSafe{
+		Wantlist: *New(),
+	}
 }
 
 func New() *Wantlist {
 	return &Wantlist{
-		set: make(map[u.Key]*Entry),
+		set: make(map[u.Key]Entry),
 	}
 }
 
-type Entry struct {
-	Value    u.Key
-	Priority int
+func (w *ThreadSafe) Add(k u.Key, priority int) {
+	// TODO rm defer for perf
+	w.lk.Lock()
+	defer w.lk.Unlock()
+	w.Wantlist.Add(k, priority)
 }
 
-func (w *Wantlist) Add(k u.Key, priority int) {
+func (w *ThreadSafe) Remove(k u.Key) {
+	// TODO rm defer for perf
 	w.lk.Lock()
 	defer w.lk.Unlock()
+	w.Wantlist.Remove(k)
+}
+
+func (w *ThreadSafe) Contains(k u.Key) bool {
+	// TODO rm defer for perf
+	w.lk.RLock()
+	defer w.lk.RUnlock()
+	return w.Wantlist.Contains(k)
+}
+
+func (w *ThreadSafe) Entries() []Entry {
+	w.lk.RLock()
+	defer w.lk.RUnlock()
+	return w.Wantlist.Entries()
+}
+
+func (w *ThreadSafe) SortedEntries() []Entry {
+	w.lk.RLock()
+	defer w.lk.RUnlock()
+	return w.Wantlist.SortedEntries()
+}
+
+func (w *Wantlist) Add(k u.Key, priority int) {
 	if _, ok := w.set[k]; ok {
 		return
 	}
-	w.set[k] = &Entry{
-		Value:    k,
+	w.set[k] = Entry{
+		Key:      k,
 		Priority: priority,
 	}
 }
 
 func (w *Wantlist) Remove(k u.Key) {
-	w.lk.Lock()
-	defer w.lk.Unlock()
 	delete(w.set, k)
 }
 
 func (w *Wantlist) Contains(k u.Key) bool {
-	w.lk.RLock()
-	defer w.lk.RUnlock()
 	_, ok := w.set[k]
 	return ok
 }
 
-type entrySlice []*Entry
-
-func (es entrySlice) Len() int           { return len(es) }
-func (es entrySlice) Swap(i, j int)      { es[i], es[j] = es[j], es[i] }
-func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priority }
-
-func (w *Wantlist) Entries() []*Entry {
-	w.lk.RLock()
-	defer w.lk.RUnlock()
-
+func (w *Wantlist) Entries() []Entry {
 	var es entrySlice
-
 	for _, e := range w.set {
 		es = append(es, e)
 	}
-	sort.Sort(es)
 	return es
 }
 
-func (w *Wantlist) SortedEntries() []*Entry {
-	w.lk.RLock()
-	defer w.lk.RUnlock()
+func (w *Wantlist) SortedEntries() []Entry {
 	var es entrySlice
-
 	for _, e := range w.set {
 		es = append(es, e)
 	}
diff --git a/util/peerset/peerset.go b/util/peerset/peerset.go
index e969b4a4b4ea..9b132b2352c3 100644
--- a/util/peerset/peerset.go
+++ b/util/peerset/peerset.go
@@ -7,7 +7,7 @@ import (
 
 // PeerSet is a threadsafe set of peers
 type PeerSet struct {
-	ps   map[string]bool
+	ps   map[string]bool // FIXME can be map[string]struct{}
 	lk   sync.RWMutex
 	size int
}
@@ -22,7 +22,7 @@ func NewPeerSet() *PeerSet {
 func NewLimitedPeerSet(size int) *PeerSet {
 	ps := new(PeerSet)
 	ps.ps = make(map[string]bool)
-	ps.size = -1
+	ps.size = size
 	return ps
 }
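Two notes on the peerset hunks: ps.size = size fixes a real bug in NewLimitedPeerSet, which previously discarded its argument and always stored -1 (unlimited); and the FIXME points at the usual empty-struct set idiom, sketched below (illustrative only, not part of this patch):

    type set struct {
        lk sync.RWMutex
        ps map[string]struct{} // struct{} values occupy zero bytes
    }

    func (s *set) add(k string) {
        s.lk.Lock()
        defer s.lk.Unlock()
        s.ps[k] = struct{}{}
    }

    func (s *set) contains(k string) bool {
        s.lk.RLock()
        defer s.lk.RUnlock()
        _, ok := s.ps[k]
        return ok
    }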