diff --git a/bitswap.go b/bitswap.go index b3e472d2..cfaee4a3 100644 --- a/bitswap.go +++ b/bitswap.go @@ -307,7 +307,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks return out, nil } -// CancelWant removes a given key from the wantlist +// CancelWant removes a given key from the wantlist. func (bs *Bitswap) CancelWants(cids []cid.Cid, ses uint64) { if len(cids) == 0 { return @@ -363,7 +363,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error { return nil } -// SessionsForBlock returns a slice of all sessions that may be interested in the given cid +// SessionsForBlock returns a slice of all sessions that may be interested in the given cid. func (bs *Bitswap) SessionsForBlock(c cid.Cid) []*Session { var out []*Session bs.sm.IterateSessions(func(session exchange.Fetcher) { @@ -442,14 +442,14 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) { } } -// Connected/Disconnected warns bitswap about peer connections +// Connected/Disconnected warns bitswap about peer connections. func (bs *Bitswap) PeerConnected(p peer.ID) { initialWants := bs.wm.CurrentBroadcastWants() bs.pm.Connected(p, initialWants) bs.engine.PeerConnected(p) } -// Connected/Disconnected warns bitswap about peer connections +// Connected/Disconnected warns bitswap about peer connections. func (bs *Bitswap) PeerDisconnected(p peer.ID) { bs.pm.Disconnected(p) bs.engine.PeerDisconnected(p) diff --git a/decision/engine.go b/decision/engine.go index 90155a1d..384c7c69 100644 --- a/decision/engine.go +++ b/decision/engine.go @@ -56,12 +56,12 @@ const ( maxMessageSize = 512 * 1024 ) -// Envelope contains a message for a Peer +// Envelope contains a message for a Peer. type Envelope struct { - // Peer is the intended recipient + // Peer is the intended recipient. Peer peer.ID - // Message is the payload + // Message is the payload. Message bsmsg.BitSwapMessage // A callback to notify the decision queue that the task is complete @@ -206,7 +206,7 @@ func (e *Engine) Outbox() <-chan (<-chan *Envelope) { return e.outbox } -// Returns a slice of Peers with whom the local node has active sessions +// Peers returns a slice of Peers with whom the local node has active sessions. func (e *Engine) Peers() []peer.ID { e.lock.Lock() defer e.lock.Unlock() diff --git a/decision/peer_request_queue.go b/decision/peer_request_queue.go index c02329fc..c7aaf553 100644 --- a/decision/peer_request_queue.go +++ b/decision/peer_request_queue.go @@ -45,7 +45,7 @@ type prq struct { frozen map[peer.ID]*activePartner } -// Push currently adds a new peerRequestTask to the end of the list +// Push currently adds a new peerRequestTask to the end of the list. func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) { tl.lock.Lock() defer tl.lock.Unlock() @@ -140,7 +140,7 @@ func (tl *prq) Pop() *peerRequestTask { return out } -// Remove removes a task from the queue +// Remove removes a task from the queue. func (tl *prq) Remove(k cid.Cid, p peer.ID) { tl.lock.Lock() t, ok := tl.taskMap[taskEntryKey{p, k}] @@ -210,12 +210,12 @@ type peerRequestTask struct { index int // book-keeping field used by the pq container } -// Index implements pq.Elem +// Index implements pq.Elem. func (t *peerRequestTask) Index() int { return t.index } -// SetIndex implements pq.Elem +// SetIndex implements pq.Elem. func (t *peerRequestTask) SetIndex(i int) { t.index = i } @@ -307,7 +307,7 @@ func partnerCompare(a, b pq.Elem) bool { return pa.active < pb.active } -// StartTask signals that a task was started for this partner +// StartTask signals that a task was started for this partner. func (p *activePartner) StartTask(k cid.Cid) { p.activelk.Lock() p.activeBlocks.Add(k) @@ -315,7 +315,7 @@ func (p *activePartner) StartTask(k cid.Cid) { p.activelk.Unlock() } -// TaskDone signals that a task was completed for this partner +// TaskDone signals that a task was completed for this partner. func (p *activePartner) TaskDone(k cid.Cid) { p.activelk.Lock() p.activeBlocks.Remove(k) @@ -326,12 +326,12 @@ func (p *activePartner) TaskDone(k cid.Cid) { p.activelk.Unlock() } -// Index implements pq.Elem +// Index implements pq.Elem. func (p *activePartner) Index() int { return p.index } -// SetIndex implements pq.Elem +// SetIndex implements pq.Elem. func (p *activePartner) SetIndex(i int) { p.index = i } diff --git a/message/message.go b/message/message.go index 3289507d..2b538a2f 100644 --- a/message/message.go +++ b/message/message.go @@ -21,7 +21,7 @@ type BitSwapMessage interface { // the sender. Wantlist() []Entry - // Blocks returns a slice of unique blocks + // Blocks returns a slice of unique blocks. Blocks() []blocks.Block // AddEntry adds an entry to the Wantlist. diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index bed0cd55..294bad19 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -15,13 +15,13 @@ import ( var log = logging.Logger("bitswap") // MessageNetwork is any network that can connect peers and generate a message -// sender +// sender. type MessageNetwork interface { ConnectTo(context.Context, peer.ID) error NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) } -// MessageQueue implements queuee of want messages to send to peers +// MessageQueue implements queue of want messages to send to peers. type MessageQueue struct { p peer.ID @@ -38,7 +38,7 @@ type MessageQueue struct { done chan struct{} } -// New creats a new MessageQueues +// New creats a new MessageQueue. func New(p peer.ID, network MessageNetwork) *MessageQueue { return &MessageQueue{ done: make(chan struct{}), @@ -50,19 +50,19 @@ func New(p peer.ID, network MessageNetwork) *MessageQueue { } } -// RefIncrement increments the refcount for a message queue +// RefIncrement increments the refcount for a message queue. func (mq *MessageQueue) RefIncrement() { mq.refcnt++ } // RefDecrement decrements the refcount for a message queue and returns true -// if the refcount is now 0 +// if the refcount is now 0. func (mq *MessageQueue) RefDecrement() bool { mq.refcnt-- return mq.refcnt > 0 } -// AddMessage adds new entries to an outgoing message for a given session +// AddMessage adds new entries to an outgoing message for a given session. func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) { if !mq.addEntries(entries, ses) { return @@ -74,7 +74,7 @@ func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) { } // Startup starts the processing of messages, and creates an initial message -// based on the given initial wantlist +// based on the given initial wantlist. func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) { // new peer, we will want to give them our full wantlist @@ -93,7 +93,7 @@ func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist. } -// Shutdown stops the processing of messages for a message queue +// Shutdown stops the processing of messages for a message queue. func (mq *MessageQueue) Shutdown() { close(mq.done) } diff --git a/network/interface.go b/network/interface.go index 6c325b1c..2d2c9b19 100644 --- a/network/interface.go +++ b/network/interface.go @@ -19,7 +19,7 @@ var ( ProtocolBitswap protocol.ID = "/ipfs/bitswap/1.1.0" ) -// BitSwapNetwork provides network connectivity for BitSwap sessions +// BitSwapNetwork provides network connectivity for BitSwap sessions. type BitSwapNetwork interface { // SendMessage sends a BitSwap message to a peer. @@ -49,7 +49,7 @@ type MessageSender interface { Reset() error } -// Implement Receiver to receive messages from the BitSwapNetwork +// Implement Receiver to receive messages from the BitSwapNetwork. type Receiver interface { ReceiveMessage( ctx context.Context, @@ -58,16 +58,16 @@ type Receiver interface { ReceiveError(error) - // Connected/Disconnected warns bitswap about peer connections + // Connected/Disconnected warns bitswap about peer connections. PeerConnected(peer.ID) PeerDisconnected(peer.ID) } type Routing interface { - // FindProvidersAsync returns a channel of providers for the given key + // FindProvidersAsync returns a channel of providers for the given key. FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID - // Provide provides the key to the network + // Provide provides the key to the network. Provide(context.Context, cid.Cid) error } diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index f6c04e35..da2a4b4c 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -26,7 +26,7 @@ var log = logging.Logger("bitswap_network") var sendMessageTimeout = time.Minute * 10 -// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host +// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host. func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork { bitswapNetwork := impl{ host: host, @@ -149,7 +149,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { return bsnet.host.Connect(ctx, pstore.PeerInfo{ID: p}) } -// FindProvidersAsync returns a channel of providers for the given key +// FindProvidersAsync returns a channel of providers for the given key. func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { // Since routing queries are expensive, give bitswap the peers to which we diff --git a/peermanager/peermanager.go b/peermanager/peermanager.go index 379fd4bd..30145cc5 100644 --- a/peermanager/peermanager.go +++ b/peermanager/peermanager.go @@ -16,7 +16,7 @@ var ( metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} ) -// PeerQueue provides a queer of messages to be sent for a single peer +// PeerQueue provides a queer of messages to be sent for a single peer. type PeerQueue interface { RefIncrement() RefDecrement() bool @@ -25,14 +25,14 @@ type PeerQueue interface { Shutdown() } -// PeerQueueFactory provides a function that will create a PeerQueue +// PeerQueueFactory provides a function that will create a PeerQueue. type PeerQueueFactory func(p peer.ID) PeerQueue type peerMessage interface { handle(pm *PeerManager) } -// PeerManager manages a pool of peers and sends messages to peers in the pool +// PeerManager manages a pool of peers and sends messages to peers in the pool. type PeerManager struct { // sync channel for Run loop peerMessages chan peerMessage @@ -45,7 +45,7 @@ type PeerManager struct { cancel func() } -// New creates a new PeerManager, given a context and a peerQueueFactory +// New creates a new PeerManager, given a context and a peerQueueFactory. func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { ctx, cancel := context.WithCancel(ctx) return &PeerManager{ @@ -57,7 +57,7 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { } } -// ConnectedPeers returns a list of peers this PeerManager is managing +// ConnectedPeers returns a list of peers this PeerManager is managing. func (pm *PeerManager) ConnectedPeers() []peer.ID { resp := make(chan []peer.ID) pm.peerMessages <- &getPeersMessage{resp} @@ -65,7 +65,7 @@ func (pm *PeerManager) ConnectedPeers() []peer.ID { } // Connected is called to add a new peer to the pool, and send it an initial set -// of wants +// of wants. func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) { select { case pm.peerMessages <- &connectPeerMessage{p, initialEntries}: @@ -73,7 +73,7 @@ func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) { } } -// Disconnected is called to remove a peer from the pool +// Disconnected is called to remove a peer from the pool. func (pm *PeerManager) Disconnected(p peer.ID) { select { case pm.peerMessages <- &disconnectPeerMessage{p}: @@ -81,8 +81,8 @@ func (pm *PeerManager) Disconnected(p peer.ID) { } } -// SendMessage is called to send a message to all or some peers in the pool -// if targets is nil, it sends to all +// SendMessage is called to send a message to all or some peers in the pool; +// if targets is nil, it sends to all. func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) { select { case pm.peerMessages <- &sendPeerMessage{entries: entries, targets: targets, from: from}: @@ -91,12 +91,12 @@ func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, fr } // Startup enables the run loop for the PeerManager - no processing will occur -// if startup is not called +// if startup is not called. func (pm *PeerManager) Startup() { go pm.run() } -// Shutdown shutsdown processing for the PeerManager +// Shutdown shutsdown processing for the PeerManager. func (pm *PeerManager) Shutdown() { pm.cancel() } diff --git a/session.go b/session.go index cd5f645a..39748e40 100644 --- a/session.go +++ b/session.go @@ -20,7 +20,7 @@ const activeWantsLimit = 16 // Session holds state for an individual bitswap transfer operation. // This allows bitswap to make smarter decisions about who to send wantlist -// info to, and who to request blocks from +// info to, and who to request blocks from. type Session struct { ctx context.Context tofetch *cidQueue @@ -51,7 +51,7 @@ type Session struct { } // NewSession creates a new bitswap session whose lifetime is bounded by the -// given context +// given context. func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher { s := &Session{ activePeers: make(map[peer.ID]struct{}), @@ -302,7 +302,7 @@ func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks. return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants) } -// GetBlock fetches a single block +// GetBlock fetches a single block. func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) { return getBlock(parent, k, s.GetBlocks) } diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index 1ebee2fd..e0e8dec4 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -48,7 +48,7 @@ func (sm *SessionManager) GetNextSessionID() uint64 { type IterateSessionFunc func(session exchange.Fetcher) // IterateSessions loops through all managed sessions and applies the given -// IterateSessionFunc +// IterateSessionFunc. func (sm *SessionManager) IterateSessions(iterate IterateSessionFunc) { sm.sessLk.Lock() defer sm.sessLk.Unlock() diff --git a/testnet/internet_latency_delay_generator.go b/testnet/internet_latency_delay_generator.go index d1fd3ae1..25b9f5b8 100644 --- a/testnet/internet_latency_delay_generator.go +++ b/testnet/internet_latency_delay_generator.go @@ -10,7 +10,7 @@ import ( var sharedRNG = rand.New(rand.NewSource(time.Now().UnixNano())) // InternetLatencyDelayGenerator generates three clusters of delays, -// typical of the type of peers you would encounter on the interenet +// typical of the type of peers you would encounter on the interenet. // Given a base delay time T, the wait time generated will be either: // 1. A normalized distribution around the base time // 2. A normalized distribution around the base time plus a "medium" delay @@ -18,9 +18,9 @@ var sharedRNG = rand.New(rand.NewSource(time.Now().UnixNano())) // The size of the medium & large delays are determined when the generator // is constructed, as well as the relative percentages with which delays fall // into each of the three different clusters, and the standard deviation for -// the normalized distribution +// the normalized distribution. // This can be used to generate a number of scenarios typical of latency -// distribution among peers on the internet +// distribution among peers on the internet. func InternetLatencyDelayGenerator( mediumDelay time.Duration, largeDelay time.Duration, diff --git a/testnet/virtual.go b/testnet/virtual.go index 7d192117..d5a77494 100644 --- a/testnet/virtual.go +++ b/testnet/virtual.go @@ -146,7 +146,7 @@ func (nc *networkClient) Stats() bsnet.NetworkStats { } } -// FindProvidersAsync returns a channel of providers for the given key +// FindProvidersAsync returns a channel of providers for the given key. func (nc *networkClient) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { // NB: this function duplicates the PeerInfo -> ID transformation in the @@ -200,7 +200,7 @@ func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet. }, nil } -// Provide provides the key to the network +// Provide provides the key to the network. func (nc *networkClient) Provide(ctx context.Context, k cid.Cid) error { return nc.routing.Provide(ctx, k, true) } diff --git a/testutil/testutil.go b/testutil/testutil.go index f768f40d..9cfb3891 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -11,7 +11,7 @@ import ( var blockGenerator = blocksutil.NewBlockGenerator() var prioritySeq int -// GenerateCids produces n content identifiers +// GenerateCids produces n content identifiers. func GenerateCids(n int) []cid.Cid { cids := make([]cid.Cid, 0, n) for i := 0; i < n; i++ { @@ -21,7 +21,7 @@ func GenerateCids(n int) []cid.Cid { return cids } -// GenerateWantlist makes a populated wantlist +// GenerateWantlist makes a populated wantlist. func GenerateWantlist(n int, ses uint64) *wantlist.ThreadSafe { wl := wantlist.NewThreadSafe() for i := 0; i < n; i++ { @@ -32,7 +32,7 @@ func GenerateWantlist(n int, ses uint64) *wantlist.ThreadSafe { return wl } -// GenerateMessageEntries makes fake bitswap message entries +// GenerateMessageEntries makes fake bitswap message entries. func GenerateMessageEntries(n int, isCancel bool) []*bsmsg.Entry { bsmsgs := make([]*bsmsg.Entry, 0, n) for i := 0; i < n; i++ { @@ -48,7 +48,7 @@ func GenerateMessageEntries(n int, isCancel bool) []*bsmsg.Entry { var peerSeq int -// GeneratePeers creates n peer ids +// GeneratePeers creates n peer ids. func GeneratePeers(n int) []peer.ID { peerIds := make([]peer.ID, 0, n) for i := 0; i < n; i++ { @@ -61,13 +61,13 @@ func GeneratePeers(n int) []peer.ID { var nextSession uint64 -// GenerateSessionID make a unit session identifier +// GenerateSessionID make a unit session identifier. func GenerateSessionID() uint64 { nextSession++ return uint64(nextSession) } -// ContainsPeer returns true if a peer is found n a list of peers +// ContainsPeer returns true if a peer is found n a list of peers. func ContainsPeer(peers []peer.ID, p peer.ID) bool { for _, n := range peers { if p == n { diff --git a/wantlist/wantlist.go b/wantlist/wantlist.go index 83130072..947c964d 100644 --- a/wantlist/wantlist.go +++ b/wantlist/wantlist.go @@ -28,7 +28,7 @@ type Entry struct { Trash bool } -// NewRefEntry creates a new reference tracked wantlist entry +// NewRefEntry creates a new reference tracked wantlist entry. func NewRefEntry(c cid.Cid, p int) *Entry { return &Entry{ Cid: c, @@ -59,10 +59,10 @@ func New() *Wantlist { // by the session ID 'ses'. if a cid is added under multiple session IDs, then // it must be removed by each of those sessions before it is no longer 'in the // wantlist'. Calls to Add are idempotent given the same arguments. Subsequent -// calls with different values for priority will not update the priority +// calls with different values for priority will not update the priority. // TODO: think through priority changes here // Add returns true if the cid did not exist in the wantlist before this call -// (even if it was under a different session) +// (even if it was under a different session). func (w *ThreadSafe) Add(c cid.Cid, priority int, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() @@ -114,7 +114,7 @@ func (w *ThreadSafe) Remove(c cid.Cid, ses uint64) bool { } // Contains returns true if the given cid is in the wantlist tracked by one or -// more sessions +// more sessions. func (w *ThreadSafe) Contains(k cid.Cid) (*Entry, bool) { w.lk.RLock() defer w.lk.RUnlock() diff --git a/wantmanager/wantmanager.go b/wantmanager/wantmanager.go index 3dcff166..bf14ea71 100644 --- a/wantmanager/wantmanager.go +++ b/wantmanager/wantmanager.go @@ -21,7 +21,7 @@ const ( ) // WantSender sends changes out to the network as they get added to the wantlist -// managed by the WantManager +// managed by the WantManager. type WantSender interface { SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) } @@ -32,7 +32,7 @@ type wantMessage interface { // WantManager manages a global want list. It tracks two seperate want lists - // one for all wants, and one for wants that are specifically broadcast to the -// internet +// internet. type WantManager struct { // channel requests to the run loop // to get predictable behavior while running this in a go routine @@ -50,7 +50,7 @@ type WantManager struct { wantlistGauge metrics.Gauge } -// New initializes a new WantManager for a given context +// New initializes a new WantManager for a given context. func New(ctx context.Context) *WantManager { ctx, cancel := context.WithCancel(ctx) wantlistGauge := metrics.NewCtx(ctx, "wantlist_total", @@ -65,56 +65,56 @@ func New(ctx context.Context) *WantManager { } } -// SetDelegate specifies who will send want changes out to the internet +// SetDelegate specifies who will send want changes out to the internet. func (wm *WantManager) SetDelegate(wantSender WantSender) { wm.wantSender = wantSender } -// WantBlocks adds the given cids to the wantlist, tracked by the given session +// WantBlocks adds the given cids to the wantlist, tracked by the given session. func (wm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) { log.Infof("want blocks: %s", ks) wm.addEntries(ctx, ks, peers, false, ses) } -// CancelWants removes the given cids from the wantlist, tracked by the given session +// CancelWants removes the given cids from the wantlist, tracked by the given session. func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) { wm.addEntries(context.Background(), ks, peers, true, ses) } -// IsWanted returns whether a CID is currently wanted +// IsWanted returns whether a CID is currently wanted. func (wm *WantManager) IsWanted(c cid.Cid) bool { resp := make(chan bool) wm.wantMessages <- &isWantedMessage{c, resp} return <-resp } -// CurrentWants returns the list of current wants +// CurrentWants returns the list of current wants. func (wm *WantManager) CurrentWants() []*wantlist.Entry { resp := make(chan []*wantlist.Entry) wm.wantMessages <- ¤tWantsMessage{resp} return <-resp } -// CurrentBroadcastWants returns the current list of wants that are broadcasts +// CurrentBroadcastWants returns the current list of wants that are broadcasts. func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry { resp := make(chan []*wantlist.Entry) wm.wantMessages <- ¤tBroadcastWantsMessage{resp} return <-resp } -// WantCount returns the total count of wants +// WantCount returns the total count of wants. func (wm *WantManager) WantCount() int { resp := make(chan int) wm.wantMessages <- &wantCountMessage{resp} return <-resp } -// Startup starts processing for the WantManager +// Startup starts processing for the WantManager. func (wm *WantManager) Startup() { go wm.run() } -// Shutdown ends processing for the want manager +// Shutdown ends processing for the want manager. func (wm *WantManager) Shutdown() { wm.cancel() }