Skip to content

Commit

Permalink
Address review items:
Browse files Browse the repository at this point in the history
* Rebase btcsuite#499
* Fix tests by removing hardcoded useragent, protocol version

Moved flag fields from stats to Peer

Address review items:

* Remove defer in simple case
* Start err text with lower case
* Fixed data race in TestOutboundPeer
* Declare isValidBIP0111 before use

Remove addrMgr in example_test.go

Address review items:

* Use services flag 0 in example
* Remove defer in Add/Remove listeners
* Remove todo for alerts

Refactor to replace add/remove listeners with funcs

Use bytes.Buffer writer in log tests.

This prevents undesired log output to stdout while running the tests.

Move a couple of funcs for consistent ordering.

Use 0 for services in tests since not full nodes.

peer: generate and use unique nonce at package level.

This changes the logic around the nonce to make it handled by the
package rather than the caller.  This makes sense because the real
access pattern is not the same as what the tests need to do and it
simplifies the logic for the caller.

In particular, all peers that are created by this package, whether they
are inbound or outbound need to have the same "factory" nonce under any
realistic scenario outside of tests.  This is because the entire point
of the nonce is to be used to detect self connections.

This change also means the example had to be changed because it wasn't
really representative of the proper way a caller will interact with this
package due to requiring different nonces.  As a result, the example was
simplified to only showcase an outbound peer.

Don't log or send reject message due to disconnect.

Remove trace logs that aren't particularly helpful.

Allow the peer.Config.NewestBlock function to be nil.

lookupFunc not used in example_test.go - move it

Finish TODO to switch queueWg to queueQuit chan.

Remove duplicate negotiate print.

peer: Unique nonce per peer.

Rather than only generating a single nonce at init time, generate a
unique nonce for each pushed version message and keep track of them in a
limited and concurrent safe most-recently-used nonce map so the self
connection detection still works.

Remove unused mruinvmap bits from btcd main dir.

Access to fields in trace stmt need mutex.

Move comment about peer addr inside function.

Callers don't need to know about this.

Don't convert fields in state snapshot in peer.

The caller might want to do things with this information and
pre-formatting it for the RPC server would prevent that.  Instead,
return the fields using their native types in the snapshot and then
convert them in the RPC server as needed.

Explicitly comment concurrent safe funcs.

Also use a style consistent with the rest of the code base.

Add some more details in a few comments.

Make peer.Config.Listeners concrete instead of pointer.

This removes the need for checking against nil everywhere and still
allows is not to be specified since the zero value will contain all nil
pointers for the callbacks.

Rename listeners and remove On{Read,Write}.

Bring back OnRead/OnWrite with better comments.

Use new listener names in example.

Update server for new listeners

Fix build issues with go vet

Take chain params in config instead of net + regtest flag.

Set to testnet by default if not specified.

Make copy of peer config so caller can't modify after.

Use  global request map to avoid duplicate getdata

Address review items:

* Re-order peer entry in docs
* Move PingTime in assignment
* Cleanup peer tests

Restore TestPeerConnection

Added peer listener tests

Reuse peer config

Restore done peers

Make the Config.ProtocolVersion comments more explicit.

Consistent case in trace statement.

Improve comments on peer.Config fields.

Also reorder them to more naturally group their function.

Add handler for MsgMerkleBlock.

Various cleanup.

- Improve and correct some comments
- Correct a log print
- Add log print for disconnect due to not receiving initial version msg
- Code consistency with spacing and comments

Remove outdated docs
  • Loading branch information
tuxcanfly committed Oct 2, 2015
1 parent 3efd23a commit febd15d
Show file tree
Hide file tree
Showing 13 changed files with 1,273 additions and 1,546 deletions.
83 changes: 55 additions & 28 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,16 @@ func (p *peerInfo) relayTxDisabled() bool {
// It is safe for concurrent access.
func (p *peerInfo) addKnownAddress(addr string) {
p.knownAddrMutex.Lock()
defer p.knownAddrMutex.Unlock()

p.knownAddresses[addr] = struct{}{}
p.knownAddrMutex.Unlock()
}

// addressKnown returns true if the given address is known to the peer.
// It is safe for concurrent access.
func (p *peerInfo) addressKnown(addr string) bool {
p.knownAddrMutex.Lock()
defer p.knownAddrMutex.Unlock()

_, exists := p.knownAddresses[addr]
p.knownAddrMutex.Unlock()
return exists
}

Expand All @@ -117,7 +115,7 @@ type headersMsg struct {

// donePeerMsg signifies a newly disconnected peer to the block handler.
type donePeerMsg struct {
peer *peer.Peer
pInfo *peerInfo
}

// txMsg packages a bitcoin tx message and the peer it came from together
Expand Down Expand Up @@ -241,6 +239,8 @@ type blockManager struct {
started int32
shutdown int32
blockChain *blockchain.BlockChain
requestedTxns map[wire.ShaHash]struct{}
requestedBlocks map[wire.ShaHash]struct{}
progressLogger *blockProgressLogger
receivedLogBlocks int64
receivedLogTx int64
Expand Down Expand Up @@ -286,7 +286,7 @@ func (b *blockManager) peerInfo(p *peer.Peer) (*peerInfo, error) {

pInfo, ok := b.peers[p]
if !ok {
return nil, fmt.Errorf("Missing peer info for %s", p)
return nil, fmt.Errorf("missing peer info for %s", p)
}
return pInfo, nil
}
Expand Down Expand Up @@ -482,21 +482,32 @@ func (b *blockManager) handleNewPeerMsg(peers *list.List, p *peer.Peer) {
// removes the peer as a candidate for syncing and in the case where it was
// the current sync peer, attempts to select a new best peer to sync from. It
// is invoked from the syncHandler goroutine.
func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer.Peer) {
func (b *blockManager) handleDonePeerMsg(peers *list.List, dmsg *donePeerMsg) {
p := dmsg.pInfo.Peer

// Remove the peer from the list of candidate peers.
for e := peers.Front(); e != nil; e = e.Next() {
if e.Value == p {
peers.Remove(e)
break
}
}

bmgrLog.Infof("Lost peer %s", p)

// Remove peer from the global peer map so that requested tx and blocks
// will be fetched from elsewhere next time.
b.peersMtx.Lock()
delete(b.peers, p)
b.peersMtx.Unlock()
// Remove requested transactions from the global map so that they will
// be fetched from elsewhere next time we get an inv.
for k := range dmsg.pInfo.requestedTxns {
delete(b.requestedTxns, k)
}

// Remove requested blocks from the global map so that they will be
// fetched from elsewhere next time we get an inv.
// TODO(oga) we could possibly here check which peers have these blocks
// and request them now to speed things up a little.
for k := range dmsg.pInfo.requestedBlocks {
delete(b.requestedBlocks, k)
}

// Attempt to find a new peer to sync from if the quitting peer is the
// sync peer. Also, reset the headers-first state if in headers-first
Expand Down Expand Up @@ -542,6 +553,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// we'll retry next time we get an inv.
txHash := tmsg.tx.Sha()
delete(tmsg.pInfo.requestedTxns, *txHash)
delete(b.requestedTxns, *txHash)

if err != nil {
// When the error is a rule error, it means the transaction was
Expand Down Expand Up @@ -632,10 +644,11 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
}
}

// Remove block from request map. Either chain will know about it and
// Remove block from request maps. Either chain will know about it and
// so we shouldn't have any more instances of trying to fetch it, or we
// will fail the insert and thus we'll retry next time we get an inv.
delete(bmsg.pInfo.requestedBlocks, *blockSha)
delete(b.requestedBlocks, *blockSha)

// Process the block to include validation, best chain selection, orphan
// handling, etc.
Expand Down Expand Up @@ -751,10 +764,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// request more blocks using the header list when the request queue is
// getting short.
if !isCheckpointBlock {
peerRequestedBlocks := len(bmsg.pInfo.requestedBlocks)
if b.startHeader != nil &&
peerRequestedBlocks < minInFlightBlocks {
b.fetchHeaderBlocks(bmsg.pInfo)
len(bmsg.pInfo.requestedBlocks) < minInFlightBlocks {
b.fetchHeaderBlocks()
}
return
}
Expand Down Expand Up @@ -797,7 +809,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {

// fetchHeaderBlocks creates and sends a request to the syncPeer for the next
// list of blocks to be downloaded based on the current list of headers.
func (b *blockManager) fetchHeaderBlocks(pInfo *peerInfo) {
func (b *blockManager) fetchHeaderBlocks() {
// Nothing to do if there is no start header.
if b.startHeader == nil {
bmgrLog.Warnf("fetchHeaderBlocks called with no start header")
Expand All @@ -824,6 +836,12 @@ func (b *blockManager) fetchHeaderBlocks(pInfo *peerInfo) {
"fetch: %v", err)
}
if !haveInv {
b.requestedBlocks[*node.sha] = struct{}{}
pInfo, err := b.peerInfo(b.syncPeer)
if err != nil {
bmgrLog.Errorf("%v", err)
continue
}
pInfo.requestedBlocks[*node.sha] = struct{}{}
gdmsg.AddInvVect(iv)
numRequested++
Expand Down Expand Up @@ -922,7 +940,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
bmgrLog.Infof("Received %v block headers: Fetching blocks",
b.headerList.Len())
b.progressLogger.SetLastLogTime(time.Now())
b.fetchHeaderBlocks(hmsg.pInfo)
b.fetchHeaderBlocks()
return
}

Expand Down Expand Up @@ -1101,7 +1119,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
case wire.InvTypeBlock:
// Request the block if there is not already a pending
// request.
if _, exists := imsg.pInfo.requestedBlocks[iv.Hash]; !exists {
if _, exists := b.requestedBlocks[iv.Hash]; !exists {
b.requestedBlocks[iv.Hash] = struct{}{}
imsg.pInfo.requestedBlocks[iv.Hash] = struct{}{}
gdmsg.AddInvVect(iv)
numRequested++
Expand All @@ -1110,7 +1129,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
case wire.InvTypeTx:
// Request the transaction if there is not already a
// pending request.
if _, exists := imsg.pInfo.requestedTxns[iv.Hash]; !exists {
if _, exists := b.requestedTxns[iv.Hash]; !exists {
b.requestedTxns[iv.Hash] = struct{}{}
imsg.pInfo.requestedTxns[iv.Hash] = struct{}{}
gdmsg.AddInvVect(iv)
numRequested++
Expand Down Expand Up @@ -1165,7 +1185,7 @@ out:
b.handleHeadersMsg(msg)

case *donePeerMsg:
b.handleDonePeerMsg(candidatePeers, msg.peer)
b.handleDonePeerMsg(candidatePeers, msg)

case getSyncPeerMsg:
msg.reply <- b.syncPeer
Expand Down Expand Up @@ -1426,12 +1446,17 @@ func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, p *peer.Peer) {

// DonePeer informs the blockmanager that a peer has disconnected.
func (b *blockManager) DonePeer(p *peer.Peer) {
pInfo, err := b.peerInfo(p)
if err != nil {
bmgrLog.Errorf("%v", err)
return
}
// Ignore if we are shutting down.
if atomic.LoadInt32(&b.shutdown) != 0 {
return
}

b.msgChan <- &donePeerMsg{peer: p}
b.msgChan <- &donePeerMsg{pInfo: pInfo}
}

// Start begins the core block handler which processes block and inv messages.
Expand Down Expand Up @@ -1536,12 +1561,14 @@ func newBlockManager(s *server) (*blockManager, error) {
}

bm := blockManager{
server: s,
progressLogger: newBlockProgressLogger("Processed", bmgrLog),
msgChan: make(chan interface{}, cfg.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
peers: make(map[*peer.Peer]*peerInfo),
server: s,
requestedTxns: make(map[wire.ShaHash]struct{}),
requestedBlocks: make(map[wire.ShaHash]struct{}),
progressLogger: newBlockProgressLogger("Processed", bmgrLog),
msgChan: make(chan interface{}, cfg.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
peers: make(map[*peer.Peer]*peerInfo),
}
bm.progressLogger = newBlockProgressLogger("Processed", bmgrLog)
bm.blockChain = blockchain.New(s.db, s.chainParams, bm.handleNotifyMsg)
Expand Down
4 changes: 2 additions & 2 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ information.
for the underlying JSON-RPC command and return values
* [wire](https://github.com/btcsuite/btcd/tree/master/wire) - Implements the
Bitcoin wire protocol
* [peer](https://github.com/btcsuite/btcd/tree/master/peer) -
Provides a common base for creating and managing Bitcoin network peers.
* [blockchain](https://github.com/btcsuite/btcd/tree/master/blockchain) -
Implements Bitcoin block handling and chain selection rules
* [txscript](https://github.com/btcsuite/btcd/tree/master/txscript) -
Expand All @@ -214,5 +216,3 @@ information.
Provides a database interface for the Bitcoin block chain
* [btcutil](https://github.com/btcsuite/btcutil) - Provides Bitcoin-specific
convenience functions and types
* [peer](https://github.com/btcsuite/btcd/tree/master/peer) -
Provides a common base for creating and managing Bitcoin network peers.
108 changes: 0 additions & 108 deletions mruinvmap.go

This file was deleted.

Loading

0 comments on commit febd15d

Please sign in to comment.