diff --git a/blockmanager.go b/blockmanager.go index eb4cdad8d53..34fa74de915 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -79,18 +79,16 @@ func (p *peerInfo) relayTxDisabled() bool { // It is safe for concurrent access. func (p *peerInfo) addKnownAddress(addr string) { p.knownAddrMutex.Lock() - defer p.knownAddrMutex.Unlock() - p.knownAddresses[addr] = struct{}{} + p.knownAddrMutex.Unlock() } // addressKnown returns true if the given address is known to the peer. // It is safe for concurrent access. func (p *peerInfo) addressKnown(addr string) bool { p.knownAddrMutex.Lock() - defer p.knownAddrMutex.Unlock() - _, exists := p.knownAddresses[addr] + p.knownAddrMutex.Unlock() return exists } @@ -117,7 +115,7 @@ type headersMsg struct { // donePeerMsg signifies a newly disconnected peer to the block handler. type donePeerMsg struct { - peer *peer.Peer + pInfo *peerInfo } // txMsg packages a bitcoin tx message and the peer it came from together @@ -241,6 +239,8 @@ type blockManager struct { started int32 shutdown int32 blockChain *blockchain.BlockChain + requestedTxns map[wire.ShaHash]struct{} + requestedBlocks map[wire.ShaHash]struct{} progressLogger *blockProgressLogger receivedLogBlocks int64 receivedLogTx int64 @@ -286,7 +286,7 @@ func (b *blockManager) peerInfo(p *peer.Peer) (*peerInfo, error) { pInfo, ok := b.peers[p] if !ok { - return nil, fmt.Errorf("Missing peer info for %s", p) + return nil, fmt.Errorf("missing peer info for %s", p) } return pInfo, nil } @@ -482,7 +482,9 @@ func (b *blockManager) handleNewPeerMsg(peers *list.List, p *peer.Peer) { // removes the peer as a candidate for syncing and in the case where it was // the current sync peer, attempts to select a new best peer to sync from. It // is invoked from the syncHandler goroutine. -func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer.Peer) { +func (b *blockManager) handleDonePeerMsg(peers *list.List, dmsg *donePeerMsg) { + p := dmsg.pInfo.Peer + // Remove the peer from the list of candidate peers. for e := peers.Front(); e != nil; e = e.Next() { if e.Value == p { @@ -490,13 +492,22 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer.Peer) { break } } + bmgrLog.Infof("Lost peer %s", p) - // Remove peer from the global peer map so that requested tx and blocks - // will be fetched from elsewhere next time. - b.peersMtx.Lock() - delete(b.peers, p) - b.peersMtx.Unlock() + // Remove requested transactions from the global map so that they will + // be fetched from elsewhere next time we get an inv. + for k := range dmsg.pInfo.requestedTxns { + delete(b.requestedTxns, k) + } + + // Remove requested blocks from the global map so that they will be + // fetched from elsewhere next time we get an inv. + // TODO(oga) we could possibly here check which peers have these blocks + // and request them now to speed things up a little. + for k := range dmsg.pInfo.requestedBlocks { + delete(b.requestedBlocks, k) + } // Attempt to find a new peer to sync from if the quitting peer is the // sync peer. Also, reset the headers-first state if in headers-first @@ -542,6 +553,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // we'll retry next time we get an inv. txHash := tmsg.tx.Sha() delete(tmsg.pInfo.requestedTxns, *txHash) + delete(b.requestedTxns, *txHash) if err != nil { // When the error is a rule error, it means the transaction was @@ -632,10 +644,11 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { } } - // Remove block from request map. Either chain will know about it and + // Remove block from request maps. Either chain will know about it and // so we shouldn't have any more instances of trying to fetch it, or we // will fail the insert and thus we'll retry next time we get an inv. delete(bmsg.pInfo.requestedBlocks, *blockSha) + delete(b.requestedBlocks, *blockSha) // Process the block to include validation, best chain selection, orphan // handling, etc. @@ -751,10 +764,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // request more blocks using the header list when the request queue is // getting short. if !isCheckpointBlock { - peerRequestedBlocks := len(bmsg.pInfo.requestedBlocks) if b.startHeader != nil && - peerRequestedBlocks < minInFlightBlocks { - b.fetchHeaderBlocks(bmsg.pInfo) + len(bmsg.pInfo.requestedBlocks) < minInFlightBlocks { + b.fetchHeaderBlocks() } return } @@ -797,7 +809,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // fetchHeaderBlocks creates and sends a request to the syncPeer for the next // list of blocks to be downloaded based on the current list of headers. -func (b *blockManager) fetchHeaderBlocks(pInfo *peerInfo) { +func (b *blockManager) fetchHeaderBlocks() { // Nothing to do if there is no start header. if b.startHeader == nil { bmgrLog.Warnf("fetchHeaderBlocks called with no start header") @@ -824,6 +836,12 @@ func (b *blockManager) fetchHeaderBlocks(pInfo *peerInfo) { "fetch: %v", err) } if !haveInv { + b.requestedBlocks[*node.sha] = struct{}{} + pInfo, err := b.peerInfo(b.syncPeer) + if err != nil { + bmgrLog.Errorf("%v", err) + continue + } pInfo.requestedBlocks[*node.sha] = struct{}{} gdmsg.AddInvVect(iv) numRequested++ @@ -922,7 +940,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { bmgrLog.Infof("Received %v block headers: Fetching blocks", b.headerList.Len()) b.progressLogger.SetLastLogTime(time.Now()) - b.fetchHeaderBlocks(hmsg.pInfo) + b.fetchHeaderBlocks() return } @@ -1101,7 +1119,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { case wire.InvTypeBlock: // Request the block if there is not already a pending // request. - if _, exists := imsg.pInfo.requestedBlocks[iv.Hash]; !exists { + if _, exists := b.requestedBlocks[iv.Hash]; !exists { + b.requestedBlocks[iv.Hash] = struct{}{} imsg.pInfo.requestedBlocks[iv.Hash] = struct{}{} gdmsg.AddInvVect(iv) numRequested++ @@ -1110,7 +1129,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { case wire.InvTypeTx: // Request the transaction if there is not already a // pending request. - if _, exists := imsg.pInfo.requestedTxns[iv.Hash]; !exists { + if _, exists := b.requestedTxns[iv.Hash]; !exists { + b.requestedTxns[iv.Hash] = struct{}{} imsg.pInfo.requestedTxns[iv.Hash] = struct{}{} gdmsg.AddInvVect(iv) numRequested++ @@ -1165,7 +1185,7 @@ out: b.handleHeadersMsg(msg) case *donePeerMsg: - b.handleDonePeerMsg(candidatePeers, msg.peer) + b.handleDonePeerMsg(candidatePeers, msg) case getSyncPeerMsg: msg.reply <- b.syncPeer @@ -1426,12 +1446,17 @@ func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, p *peer.Peer) { // DonePeer informs the blockmanager that a peer has disconnected. func (b *blockManager) DonePeer(p *peer.Peer) { + pInfo, err := b.peerInfo(p) + if err != nil { + bmgrLog.Errorf("%v", err) + return + } // Ignore if we are shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { return } - b.msgChan <- &donePeerMsg{peer: p} + b.msgChan <- &donePeerMsg{pInfo: pInfo} } // Start begins the core block handler which processes block and inv messages. @@ -1536,12 +1561,14 @@ func newBlockManager(s *server) (*blockManager, error) { } bm := blockManager{ - server: s, - progressLogger: newBlockProgressLogger("Processed", bmgrLog), - msgChan: make(chan interface{}, cfg.MaxPeers*3), - headerList: list.New(), - quit: make(chan struct{}), - peers: make(map[*peer.Peer]*peerInfo), + server: s, + requestedTxns: make(map[wire.ShaHash]struct{}), + requestedBlocks: make(map[wire.ShaHash]struct{}), + progressLogger: newBlockProgressLogger("Processed", bmgrLog), + msgChan: make(chan interface{}, cfg.MaxPeers*3), + headerList: list.New(), + quit: make(chan struct{}), + peers: make(map[*peer.Peer]*peerInfo), } bm.progressLogger = newBlockProgressLogger("Processed", bmgrLog) bm.blockChain = blockchain.New(s.db, s.chainParams, bm.handleNotifyMsg) diff --git a/docs/README.md b/docs/README.md index 4137591bc5a..39dc17b28c9 100644 --- a/docs/README.md +++ b/docs/README.md @@ -203,6 +203,8 @@ information. for the underlying JSON-RPC command and return values * [wire](https://github.com/btcsuite/btcd/tree/master/wire) - Implements the Bitcoin wire protocol + * [peer](https://github.com/btcsuite/btcd/tree/master/peer) - + Provides a common base for creating and managing Bitcoin network peers. * [blockchain](https://github.com/btcsuite/btcd/tree/master/blockchain) - Implements Bitcoin block handling and chain selection rules * [txscript](https://github.com/btcsuite/btcd/tree/master/txscript) - @@ -214,5 +216,3 @@ information. Provides a database interface for the Bitcoin block chain * [btcutil](https://github.com/btcsuite/btcutil) - Provides Bitcoin-specific convenience functions and types - * [peer](https://github.com/btcsuite/btcd/tree/master/peer) - - Provides a common base for creating and managing Bitcoin network peers. diff --git a/mruinvmap.go b/mruinvmap.go deleted file mode 100644 index f2e1f6ab006..00000000000 --- a/mruinvmap.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright (c) 2013-2015 The btcsuite developers -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. - -package main - -import ( - "bytes" - "container/list" - "fmt" - - "github.com/btcsuite/btcd/wire" -) - -// MruInventoryMap provides a map that is limited to a maximum number of items -// with eviction for the oldest entry when the limit is exceeded. -type MruInventoryMap struct { - invMap map[wire.InvVect]*list.Element // nearly O(1) lookups - invList *list.List // O(1) insert, update, delete - limit uint -} - -// String returns the map as a human-readable string. -func (m MruInventoryMap) String() string { - lastEntryNum := len(m.invMap) - 1 - curEntry := 0 - buf := bytes.NewBufferString("[") - for iv := range m.invMap { - buf.WriteString(fmt.Sprintf("%v", iv)) - if curEntry < lastEntryNum { - buf.WriteString(", ") - } - curEntry++ - } - buf.WriteString("]") - - return fmt.Sprintf("<%d>%s", m.limit, buf.String()) -} - -// Exists returns whether or not the passed inventory item is in the map. -func (m *MruInventoryMap) Exists(iv *wire.InvVect) bool { - if _, exists := m.invMap[*iv]; exists { - return true - } - return false -} - -// Add adds the passed inventory to the map and handles eviction of the oldest -// item if adding the new item would exceed the max limit. Adding an existing -// item makes it the most recently used item. -func (m *MruInventoryMap) Add(iv *wire.InvVect) { - // When the limit is zero, nothing can be added to the map, so just - // return. - if m.limit == 0 { - return - } - - // When the entry already exists move it to the front of the list - // thereby marking it most recently used. - if node, exists := m.invMap[*iv]; exists { - m.invList.MoveToFront(node) - return - } - - // Evict the least recently used entry (back of the list) if the the new - // entry would exceed the size limit for the map. Also reuse the list - // node so a new one doesn't have to be allocated. - if uint(len(m.invMap))+1 > m.limit { - node := m.invList.Back() - lru := node.Value.(*wire.InvVect) - - // Evict least recently used item. - delete(m.invMap, *lru) - - // Reuse the list node of the item that was just evicted for the - // new item. - node.Value = iv - m.invList.MoveToFront(node) - m.invMap[*iv] = node - return - } - - // The limit hasn't been reached yet, so just add the new item. - node := m.invList.PushFront(iv) - m.invMap[*iv] = node - return -} - -// Delete deletes the passed inventory item from the map (if it exists). -func (m *MruInventoryMap) Delete(iv *wire.InvVect) { - if node, exists := m.invMap[*iv]; exists { - m.invList.Remove(node) - delete(m.invMap, *iv) - } -} - -// NewMruInventoryMap returns a new inventory map that is limited to the number -// of entries specified by limit. When the number of entries exceeds the limit, -// the oldest (least recently used) entry will be removed to make room for the -// new entry. -func NewMruInventoryMap(limit uint) *MruInventoryMap { - m := MruInventoryMap{ - invMap: make(map[wire.InvVect]*list.Element), - invList: list.New(), - limit: limit, - } - return &m -} diff --git a/mruinvmap_test.go b/mruinvmap_test.go deleted file mode 100644 index 10a2d9ee4d7..00000000000 --- a/mruinvmap_test.go +++ /dev/null @@ -1,169 +0,0 @@ -// Copyright (c) 2013-2015 The btcsuite developers -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. - -package main - -import ( - "crypto/rand" - "fmt" - "testing" - - "github.com/btcsuite/btcd/wire" -) - -// TestMruInventoryMap ensures the MruInventoryMap behaves as expected including -// limiting, eviction of least-recently used entries, specific entry removal, -// and existence tests. -func TestMruInventoryMap(t *testing.T) { - // Create a bunch of fake inventory vectors to use in testing the mru - // inventory code. - numInvVects := 10 - invVects := make([]*wire.InvVect, 0, numInvVects) - for i := 0; i < numInvVects; i++ { - hash := &wire.ShaHash{byte(i)} - iv := wire.NewInvVect(wire.InvTypeBlock, hash) - invVects = append(invVects, iv) - } - - tests := []struct { - name string - limit int - }{ - {name: "limit 0", limit: 0}, - {name: "limit 1", limit: 1}, - {name: "limit 5", limit: 5}, - {name: "limit 7", limit: 7}, - {name: "limit one less than available", limit: numInvVects - 1}, - {name: "limit all available", limit: numInvVects}, - } - -testLoop: - for i, test := range tests { - // Create a new mru inventory map limited by the specified test - // limit and add all of the test inventory vectors. This will - // cause evicition since there are more test inventory vectors - // than the limits. - mruInvMap := NewMruInventoryMap(uint(test.limit)) - for j := 0; j < numInvVects; j++ { - mruInvMap.Add(invVects[j]) - } - - // Ensure the limited number of most recent entries in the - // inventory vector list exist. - for j := numInvVects - 1; j >= numInvVects-test.limit; j-- { - if !mruInvMap.Exists(invVects[j]) { - t.Errorf("Exists #%d (%s) entry %s does not "+ - "exist", i, test.name, *invVects[j]) - continue testLoop - } - } - - // Ensure the entries before the limited number of most recent - // entries in the inventory vector list do not exist. - for j := numInvVects - test.limit - 1; j >= 0; j-- { - if mruInvMap.Exists(invVects[j]) { - t.Errorf("Exists #%d (%s) entry %s exists", i, - test.name, *invVects[j]) - continue testLoop - } - } - - // Readd the entry that should currently be the least-recently - // used entry so it becomes the most-recently used entry, then - // force an eviction by adding an entry that doesn't exist and - // ensure the evicted entry is the new least-recently used - // entry. - // - // This check needs at least 2 entries. - if test.limit > 1 { - origLruIndex := numInvVects - test.limit - mruInvMap.Add(invVects[origLruIndex]) - - iv := wire.NewInvVect(wire.InvTypeBlock, - &wire.ShaHash{0x00, 0x01}) - mruInvMap.Add(iv) - - // Ensure the original lru entry still exists since it - // was updated and should've have become the mru entry. - if !mruInvMap.Exists(invVects[origLruIndex]) { - t.Errorf("MRU #%d (%s) entry %s does not exist", - i, test.name, *invVects[origLruIndex]) - continue testLoop - } - - // Ensure the entry that should've become the new lru - // entry was evicted. - newLruIndex := origLruIndex + 1 - if mruInvMap.Exists(invVects[newLruIndex]) { - t.Errorf("MRU #%d (%s) entry %s exists", i, - test.name, *invVects[newLruIndex]) - continue testLoop - } - } - - // Delete all of the entries in the inventory vector list, - // including those that don't exist in the map, and ensure they - // no longer exist. - for j := 0; j < numInvVects; j++ { - mruInvMap.Delete(invVects[j]) - if mruInvMap.Exists(invVects[j]) { - t.Errorf("Delete #%d (%s) entry %s exists", i, - test.name, *invVects[j]) - continue testLoop - } - } - } -} - -// TestMruInventoryMapStringer tests the stringized output for the -// MruInventoryMap type. -func TestMruInventoryMapStringer(t *testing.T) { - // Create a couple of fake inventory vectors to use in testing the mru - // inventory stringer code. - hash1 := &wire.ShaHash{0x01} - hash2 := &wire.ShaHash{0x02} - iv1 := wire.NewInvVect(wire.InvTypeBlock, hash1) - iv2 := wire.NewInvVect(wire.InvTypeBlock, hash2) - - // Create new mru inventory map and add the inventory vectors. - mruInvMap := NewMruInventoryMap(uint(2)) - mruInvMap.Add(iv1) - mruInvMap.Add(iv2) - - // Ensure the stringer gives the expected result. Since map iteration - // is not ordered, either entry could be first, so account for both - // cases. - wantStr1 := fmt.Sprintf("<%d>[%s, %s]", 2, *iv1, *iv2) - wantStr2 := fmt.Sprintf("<%d>[%s, %s]", 2, *iv2, *iv1) - gotStr := mruInvMap.String() - if gotStr != wantStr1 && gotStr != wantStr2 { - t.Fatalf("unexpected string representation - got %q, want %q "+ - "or %q", gotStr, wantStr1, wantStr2) - } -} - -// BenchmarkMruInventoryList performs basic benchmarks on the most recently -// used inventory handling. -func BenchmarkMruInventoryList(b *testing.B) { - // Create a bunch of fake inventory vectors to use in benchmarking - // the mru inventory code. - b.StopTimer() - numInvVects := 100000 - invVects := make([]*wire.InvVect, 0, numInvVects) - for i := 0; i < numInvVects; i++ { - hashBytes := make([]byte, wire.HashSize) - rand.Read(hashBytes) - hash, _ := wire.NewShaHash(hashBytes) - iv := wire.NewInvVect(wire.InvTypeBlock, hash) - invVects = append(invVects, iv) - } - b.StartTimer() - - // Benchmark the add plus evicition code. - limit := 20000 - mruInvMap := NewMruInventoryMap(uint(limit)) - for i := 0; i < b.N; i++ { - mruInvMap.Add(invVects[i%numInvVects]) - } -} diff --git a/peer/example_test.go b/peer/example_test.go index 8ef8f3d94eb..17de2d8dc05 100644 --- a/peer/example_test.go +++ b/peer/example_test.go @@ -5,139 +5,107 @@ package peer_test import ( - "errors" "fmt" - "log" "net" "time" - "github.com/btcsuite/btcd/addrmgr" + "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" ) -// lookupFunc is a callback which resolves IPs from the provided host string. -// In this example, a standard "ip:port" hostname is used, therefore this func -// is not implemented. -func lookupFunc(host string) ([]net.IP, error) { - return nil, errors.New("not implemented") -} - -// newestSha returns the latest known block to this peer. -// In this example, it returns a hard-coded hash and height. -func newestSha() (*wire.ShaHash, int32, error) { - hashStr := "14a0810ac680a3eb3f82edc878cea25ec41d6b790744e5daeef" - hash, err := wire.NewShaHashFromStr(hashStr) - if err != nil { - return nil, 0, err +// mockRemotePeer creates a basic inbound peer listening on the simnet port for +// use with Example_peerConnection. It does not return until the listner is +// active. +func mockRemotePeer() error { + // Configure peer to act as a simnet node that offers no services. + peerCfg := &peer.Config{ + UserAgentName: "peer", // User agent name to advertise. + UserAgentVersion: "1.0.0", // User agent version to advertise. + ChainParams: &chaincfg.SimNetParams, } - return hash, 234439, nil -} -// This example demonstrates initializing both inbound and outbound peers. An -// inbound peer listening on simnet port i.e 18555 is started first, then an -// outbound peer is connected to it. -// Peers negotiate protocol by exchanging version and verack messages. For -// demonstration, a simple handler for version message is attached to both -// peers. -func Example_peerConnection() { - addrMgr := addrmgr.New("test", lookupFunc) - // Configure peers to act as a simnet full node. - peerCfg := &peer.Config{ - // Way to get the latest known block to this peer. - NewestBlock: newestSha, - // Way to get the most appropriate local address. - BestLocalAddress: addrMgr.GetBestLocalAddress, - // User agent details to advertise. - UserAgentName: "peer", - UserAgentVersion: "1.0", - // Network and service flag to use. - Net: wire.SimNet, - Services: wire.SFNodeNetwork, + // Accept connections on the simnet port. + listener, err := net.Listen("tcp", "127.0.0.1:18555") + if err != nil { + return err } - // Chan to sync the outbound and inbound peers. - listening := make(chan error) go func() { - // Accept connections on the simnet port. - l1, err := net.Listen("tcp", "127.0.0.1:18555") + conn, err := listener.Accept() if err != nil { - listening <- err + fmt.Printf("Accept: error %v\n", err) return } - // Signal that we are listening for connections. - listening <- nil - c1, err := l1.Accept() - if err != nil { - log.Fatalf("Listen: error %v\n", err) - } - // Get a nonce for the inbound peer. - nonce, err := wire.RandomUint64() - if err != nil { - log.Fatalf("wire.RandomUint64 err: %v", err) - } - // Start the inbound peer. - p1 := peer.NewInboundPeer(peerCfg, nonce, c1) - // Add a listener for version message. - // Listeners are identified by the provided string so they can be - // removed later, if required. - p1.AddVersionMsgListener("handleVersionMsg", func(p *peer.Peer, msg *wire.MsgVersion) { - fmt.Println("inbound: received version") - }) - err = p1.Start() - if err != nil { + // Create and start the inbound peer. + p := peer.NewInboundPeer(peerCfg, conn) + if err := p.Start(); err != nil { fmt.Printf("Start: error %v\n", err) return } }() - // Get a nonce for the outbound peer. - nonce, err := wire.RandomUint64() - if err != nil { - fmt.Printf("wire.RandomUint64 err: %v", err) + + return nil +} + +// This example demonstrates the basic process for initializing and creating an +// outbound peer. Peers negotiate by exchanging version and verack messages. +// For demonstration, a simple handler for version message is attached to the +// peer. +func Example_newOutboundPeer() { + // Ordinarily this will not be needed since the outbound peer will be + // connecting to a remote peer, however, since this example is executed + // and tested, a mock remote peer is needed to listen for the outbound + // peer. + if err := mockRemotePeer(); err != nil { + fmt.Printf("mockRemotePeer: unexpected error %v\n", err) return } - // Get a network address for use with the outbound peer. - na, err := addrMgr.HostToNetAddress("127.0.0.1", uint16(18555), peerCfg.Services) + + // Create an outbound peer that is configured to act as a simnet node + // that offers no services and has listeners for the version and verack + // messages. The verack listener is used here to signal the code below + // when the handshake has been finished by signalling a channel. + verack := make(chan struct{}) + peerCfg := &peer.Config{ + UserAgentName: "peer", // User agent name to advertise. + UserAgentVersion: "1.0.0", // User agent version to advertise. + ChainParams: &chaincfg.SimNetParams, + Services: 0, + Listeners: peer.MessageListeners{ + OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) { + fmt.Println("outbound: received version") + }, + OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) { + verack <- struct{}{} + }, + }, + } + na := wire.NewNetAddressIPPort(net.IP{127, 0, 0, 1}, uint16(18555), + peerCfg.Services) + p := peer.NewOutboundPeer(peerCfg, na) + + // Establish the connection to the peer address and mark it connected. + conn, err := net.Dial("tcp", p.Addr()) if err != nil { - fmt.Printf("HostToNetAddress: error %v\n", err) + fmt.Printf("net.Dial: error %v\n", err) return } - // Wait until the inbound peer is listening for connections. - err = <-listening - if err != nil { - fmt.Printf("Listen: error %v\n", err) + if err := p.Connect(conn); err != nil { + fmt.Printf("Connect: error %v\n", err) return } - // Start the outbound peer. - p2 := peer.NewOutboundPeer(peerCfg, nonce, na) - go func() { - conn, err := net.Dial("tcp", p2.Addr()) - if err != nil { - fmt.Printf("btcDial: error %v\n", err) - return - } - if err := p2.Connect(conn); err != nil { - fmt.Printf("Connect: error %v\n", err) - return - } - }() - // Add a listener for version message. - p2.AddVersionMsgListener("handleVersionMsg", func(p *peer.Peer, msg *wire.MsgVersion) { - fmt.Println("outbound: received version") - }) - // Wait until verack is received to finish the handshake. To do this, we - // add a verack listener on the outbound peer and use a chan to sync. - verack := make(chan struct{}) - p2.AddVerAckMsgListener("handleVerAckMsg", func(p *peer.Peer, msg *wire.MsgVerAck) { - verack <- struct{}{} - }) - // In case something goes wrong, timeout. + + // Wait for the verack message or timeout in case of failure. select { case <-verack: case <-time.After(time.Second * 1): fmt.Printf("Example_peerConnection: verack timeout") } + + // Shutdown the peer. + p.Shutdown() + // Output: - // inbound: received version // outbound: received version } diff --git a/peer/export_test.go b/peer/export_test.go new file mode 100644 index 00000000000..06ec78a1a57 --- /dev/null +++ b/peer/export_test.go @@ -0,0 +1,18 @@ +// Copyright (c) 2015 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +/* +This test file is part of the peer package rather than than the peer_test +package so it can bridge access to the internals to properly test cases which +are either not possible or can't reliably be tested via the public interface. +The functions are only exported while the tests are being run. +*/ + +package peer + +// TstAllowSelfConns allows the test package to allow self connections by +// disabling the detection logic. +func TstAllowSelfConns() { + allowSelfConns = true +} diff --git a/peer/log_test.go b/peer/log_test.go index 42295d7573c..39b49388bd9 100644 --- a/peer/log_test.go +++ b/peer/log_test.go @@ -4,9 +4,9 @@ package peer_test import ( + "bytes" "errors" "io" - "os" "testing" "github.com/btcsuite/btcd/peer" @@ -27,19 +27,19 @@ func TestSetLogWriter(t *testing.T) { }, { name: "invalid log level", - w: os.Stdout, + w: bytes.NewBuffer(nil), level: "wrong", expected: errors.New("invalid log level"), }, { name: "use off level", - w: os.Stdout, + w: bytes.NewBuffer(nil), level: "off", expected: errors.New("min level can't be greater than max. Got min: 6, max: 5"), }, { name: "pass", - w: os.Stdout, + w: bytes.NewBuffer(nil), level: "debug", expected: nil, }, diff --git a/peer/mrunoncemap.go b/peer/mrunoncemap.go new file mode 100644 index 00000000000..c0676cdae16 --- /dev/null +++ b/peer/mrunoncemap.go @@ -0,0 +1,120 @@ +// Copyright (c) 2013-2015 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package peer + +import ( + "bytes" + "container/list" + "fmt" + "sync" +) + +// mruNonceMap provides a map that is limited to a maximum number of items +// with eviction for the oldest entry when the limit is exceeded. +type mruNonceMap struct { + mtx sync.Mutex + nonceMap map[uint64]*list.Element // nearly O(1) lookups + nonceList *list.List // O(1) insert, update, delete + limit uint +} + +// String returns the map as a human-readable string. +func (m *mruNonceMap) String() string { + m.mtx.Lock() + defer m.mtx.Unlock() + + lastEntryNum := len(m.nonceMap) - 1 + curEntry := 0 + buf := bytes.NewBufferString("[") + for nonce := range m.nonceMap { + buf.WriteString(fmt.Sprintf("%d", nonce)) + if curEntry < lastEntryNum { + buf.WriteString(", ") + } + curEntry++ + } + buf.WriteString("]") + + return fmt.Sprintf("<%d>%s", m.limit, buf.String()) +} + +// Exists returns whether or not the passed nonce is in the map. +func (m *mruNonceMap) Exists(nonce uint64) bool { + m.mtx.Lock() + defer m.mtx.Unlock() + + if _, exists := m.nonceMap[nonce]; exists { + return true + } + return false +} + +// Add adds the passed nonce to the map and handles eviction of the oldest item +// if adding the new item would exceed the max limit. Adding an existing item +// makes it the most recently used item. +func (m *mruNonceMap) Add(nonce uint64) { + m.mtx.Lock() + defer m.mtx.Unlock() + + // When the limit is zero, nothing can be added to the map, so just + // return. + if m.limit == 0 { + return + } + + // When the entry already exists move it to the front of the list + // thereby marking it most recently used. + if node, exists := m.nonceMap[nonce]; exists { + m.nonceList.MoveToFront(node) + return + } + + // Evict the least recently used entry (back of the list) if the the new + // entry would exceed the size limit for the map. Also reuse the list + // node so a new one doesn't have to be allocated. + if uint(len(m.nonceMap))+1 > m.limit { + node := m.nonceList.Back() + lru := node.Value.(uint64) + + // Evict least recently used item. + delete(m.nonceMap, lru) + + // Reuse the list node of the item that was just evicted for the + // new item. + node.Value = nonce + m.nonceList.MoveToFront(node) + m.nonceMap[nonce] = node + return + } + + // The limit hasn't been reached yet, so just add the new item. + node := m.nonceList.PushFront(nonce) + m.nonceMap[nonce] = node + return +} + +// Delete deletes the passed nonce from the map (if it exists). +func (m *mruNonceMap) Delete(nonce uint64) { + m.mtx.Lock() + defer m.mtx.Unlock() + + if node, exists := m.nonceMap[nonce]; exists { + m.nonceList.Remove(node) + delete(m.nonceMap, nonce) + } +} + +// newMruNonceMap returns a new nonce map that is limited to the number of +// entries specified by limit. When the number of entries exceeds the limit, +// the oldest (least recently used) entry will be removed to make room for the +// new entry. +func newMruNonceMap(limit uint) *mruNonceMap { + m := mruNonceMap{ + nonceMap: make(map[uint64]*list.Element), + nonceList: list.New(), + limit: limit, + } + return &m +} diff --git a/peer/mrunoncemap_test.go b/peer/mrunoncemap_test.go new file mode 100644 index 00000000000..659dd43f605 --- /dev/null +++ b/peer/mrunoncemap_test.go @@ -0,0 +1,152 @@ +// Copyright (c) 2013-2015 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package peer + +import ( + "fmt" + "testing" +) + +// TestMruNonceMap ensures the mruNonceMap behaves as expected including +// limiting, eviction of least-recently used entries, specific entry removal, +// and existence tests. +func TestMruNonceMap(t *testing.T) { + // Create a bunch of fake nonces to use in testing the mru nonce code. + numNonces := 10 + nonces := make([]uint64, 0, numNonces) + for i := 0; i < numNonces; i++ { + nonces = append(nonces, uint64(i)) + } + + tests := []struct { + name string + limit int + }{ + {name: "limit 0", limit: 0}, + {name: "limit 1", limit: 1}, + {name: "limit 5", limit: 5}, + {name: "limit 7", limit: 7}, + {name: "limit one less than available", limit: numNonces - 1}, + {name: "limit all available", limit: numNonces}, + } + +testLoop: + for i, test := range tests { + // Create a new mru nonce map limited by the specified test + // limit and add all of the test nonces. This will cause + // evicition since there are more test nonces than the limits. + mruNonceMap := newMruNonceMap(uint(test.limit)) + for j := 0; j < numNonces; j++ { + mruNonceMap.Add(nonces[j]) + } + + // Ensure the limited number of most recent entries in the list + // exist. + for j := numNonces - 1; j >= numNonces-test.limit; j-- { + if !mruNonceMap.Exists(nonces[j]) { + t.Errorf("Exists #%d (%s) entry %d does not "+ + "exist", i, test.name, nonces[j]) + continue testLoop + } + } + + // Ensure the entries before the limited number of most recent + // entries in the list do not exist. + for j := numNonces - test.limit - 1; j >= 0; j-- { + if mruNonceMap.Exists(nonces[j]) { + t.Errorf("Exists #%d (%s) entry %d exists", i, + test.name, nonces[j]) + continue testLoop + } + } + + // Readd the entry that should currently be the least-recently + // used entry so it becomes the most-recently used entry, then + // force an eviction by adding an entry that doesn't exist and + // ensure the evicted entry is the new least-recently used + // entry. + // + // This check needs at least 2 entries. + if test.limit > 1 { + origLruIndex := numNonces - test.limit + mruNonceMap.Add(nonces[origLruIndex]) + + mruNonceMap.Add(uint64(numNonces) + 1) + + // Ensure the original lru entry still exists since it + // was updated and should've have become the mru entry. + if !mruNonceMap.Exists(nonces[origLruIndex]) { + t.Errorf("MRU #%d (%s) entry %d does not exist", + i, test.name, nonces[origLruIndex]) + continue testLoop + } + + // Ensure the entry that should've become the new lru + // entry was evicted. + newLruIndex := origLruIndex + 1 + if mruNonceMap.Exists(nonces[newLruIndex]) { + t.Errorf("MRU #%d (%s) entry %d exists", i, + test.name, nonces[newLruIndex]) + continue testLoop + } + } + + // Delete all of the entries in the list, including those that + // don't exist in the map, and ensure they no longer exist. + for j := 0; j < numNonces; j++ { + mruNonceMap.Delete(nonces[j]) + if mruNonceMap.Exists(nonces[j]) { + t.Errorf("Delete #%d (%s) entry %d exists", i, + test.name, nonces[j]) + continue testLoop + } + } + } +} + +// TestMruNonceMapStringer tests the stringized output for the mruNonceMap type. +func TestMruNonceMapStringer(t *testing.T) { + // Create a couple of fake nonces to use in testing the mru nonce + // stringer code. + nonce1 := uint64(10) + nonce2 := uint64(20) + + // Create new mru nonce map and add the nonces. + mruNonceMap := newMruNonceMap(uint(2)) + mruNonceMap.Add(nonce1) + mruNonceMap.Add(nonce2) + + // Ensure the stringer gives the expected result. Since map iteration + // is not ordered, either entry could be first, so account for both + // cases. + wantStr1 := fmt.Sprintf("<%d>[%d, %d]", 2, nonce1, nonce2) + wantStr2 := fmt.Sprintf("<%d>[%d, %d]", 2, nonce2, nonce1) + gotStr := mruNonceMap.String() + if gotStr != wantStr1 && gotStr != wantStr2 { + t.Fatalf("unexpected string representation - got %q, want %q "+ + "or %q", gotStr, wantStr1, wantStr2) + } +} + +// BenchmarkMruNonceList performs basic benchmarks on the most recently used +// nonce handling. +func BenchmarkMruNonceList(b *testing.B) { + // Create a bunch of fake nonces to use in benchmarking the mru nonce + // code. + b.StopTimer() + numNonces := 100000 + nonces := make([]uint64, 0, numNonces) + for i := 0; i < numNonces; i++ { + nonces = append(nonces, uint64(i)) + } + b.StartTimer() + + // Benchmark the add plus evicition code. + limit := 20000 + mruNonceMap := newMruNonceMap(uint(limit)) + for i := 0; i < b.N; i++ { + mruNonceMap.Add(nonces[i%numNonces]) + } +} diff --git a/peer/peer.go b/peer/peer.go index 7bc1500cd9a..ea58094157e 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -17,6 +17,7 @@ import ( "time" "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/go-socks/socks" "github.com/davecgh/go-spew/spew" @@ -24,7 +25,7 @@ import ( const ( // MaxProtocolVersion is the max protocol version the peer supports. - MaxProtocolVersion = 70002 + MaxProtocolVersion = 70011 // BlockStallTimeout is the number of seconds we will wait for a // "block" response after we send out a "getdata" for an announced @@ -46,8 +47,8 @@ const ( // reply before we will ping a host. pingTimeout = 2 * time.Minute - // negotiateTimeout is the duration of inactivity before we timeout a peer - // that hasn't completed the initial version negotiation. + // negotiateTimeout is the duration of inactivity before we timeout a + // peer that hasn't completed the initial version negotiation. negotiateTimeout = 30 * time.Second // idleTimeout is the duration of inactivity before we time out a peer. @@ -66,43 +67,154 @@ var ( // zeroHash is the zero value hash (all zeros). It is defined as a // convenience. zeroHash wire.ShaHash + + // sentNonces houses the unique nonces that are generated when pushing + // version messages that are used to detect self connections. + sentNonces = newMruNonceMap(50) + + // allowSelfConns is only used to allow the tests to bypass the self + // connection detecting and disconnect logic since they intentionally + // do so for testing purposes. + allowSelfConns bool ) +// MessageListeners defines callback function pointers to invoke with message +// listeners. Since all of the functions are nil by default, all listeners are +// effectively ignored until their handlers are set to a concrete callback. +// +// NOTE: Unless otherwise documented, these listeners must NOT directly call any +// blocking calls on the peer instance since the inHandler goroutine blocks +// until the callback has completed. Doing so will result in a deadlock +// situation. +type MessageListeners struct { + // OnGetAddr is invoked when a peer receives a getaddr bitcoin message. + OnGetAddr func(p *Peer, msg *wire.MsgGetAddr) + + // OnAddr is invoked when a peer receives an addr bitcoin message. + OnAddr func(p *Peer, msg *wire.MsgAddr) + + // OnPing is invoked when a peer receives a ping bitcoin message. + OnPing func(p *Peer, msg *wire.MsgPing) + + // OnPong is invoked when a peer receives a pong bitcoin message. + OnPong func(p *Peer, msg *wire.MsgPong) + + // OnAlert is invoked when a peer receives an alert bitcoin message. + OnAlert func(p *Peer, msg *wire.MsgAlert) + + // OnMemPool is invoked when a peer receives a mempool bitcoin message. + OnMemPool func(p *Peer, msg *wire.MsgMemPool) + + // OnTx is invoked when a peer receives a tx bitcoin message. + OnTx func(p *Peer, msg *wire.MsgTx) + + // OnBlock is invoked when a peer receives a block bitcoin message. + OnBlock func(p *Peer, msg *wire.MsgBlock, buf []byte) + + // OnInv is invoked when a peer receives an inv bitcoin message. + OnInv func(p *Peer, msg *wire.MsgInv) + + // OnHeaders is invoked when a peer receives a headers bitcoin message. + OnHeaders func(p *Peer, msg *wire.MsgHeaders) + + // OnNotFound is invoked when a peer receives a notfound bitcoin + // message. + OnNotFound func(p *Peer, msg *wire.MsgNotFound) + + // OnGetData is invoked when a peer receives a getdata bitcoin message. + OnGetData func(p *Peer, msg *wire.MsgGetData) + + // OnGetBlocks is invoked when a peer receives a getblocks bitcoin + // message. + OnGetBlocks func(p *Peer, msg *wire.MsgGetBlocks) + + // OnGetHeaders is invoked when a peer receives a getheaders bitcoin + // message. + OnGetHeaders func(p *Peer, msg *wire.MsgGetHeaders) + + // OnFilterAdd is invoked when a peer receives a filteradd bitcoin + // message. + OnFilterAdd func(p *Peer, msg *wire.MsgFilterAdd) + + // OnFilterClear is invoked when a peer receives a filterclear bitcoin + // message. + OnFilterClear func(p *Peer, msg *wire.MsgFilterClear) + + // OnFilterLoad is invoked when a peer receives a filterload bitcoin + // message. + OnFilterLoad func(p *Peer, msg *wire.MsgFilterLoad) + + // OnMerkleBlock is invoked when a peer receives a merkleblock bitcoin + // message. + OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock) + + // OnVersion is invoked when a peer receives a version bitcoin message. + OnVersion func(p *Peer, msg *wire.MsgVersion) + + // OnVerAck is invoked when a peer receives a verack bitcoin message. + OnVerAck func(p *Peer, msg *wire.MsgVerAck) + + // OnReject is invoked when a peer receives a reject bitcoin message. + OnReject func(p *Peer, msg *wire.MsgReject) + + // OnRead is invoked when a peer receives a bitcoin message. It + // consists of the number of bytes read, the message, and whether or not + // an error in the read occurred. Typically, callers will opt to use + // the callbacks for the specific message types, however this can be + // useful for circumstances such as keeping track of server-wide byte + // counts or working with custom message types for which the peer does + // not directly provide a callback. + OnRead func(p *Peer, bytesRead int, msg wire.Message, err error) + + // OnWrite is invoked when a peer receives a bitcoin message. It + // consists of the number of bytes written, the message, and whether or + // not an error in the write occurred. This can be useful for + // circumstances such as keeping track of server-wide byte counts. + OnWrite func(p *Peer, bytesWritten int, msg wire.Message, err error) +} + // Config is the struct to hold configuration options useful to Peer. type Config struct { - - // Callback which returns the newest block details. + // NewestBlock specifies a callback which provides the newest block + // details to the peer as needed. This can be nil in which case the + // peer will report a block height of 0. Typically, only full nodes + // will need to specify this. NewestBlock ShaFunc // BestLocalAddress returns the best local address for a given address. BestLocalAddress AddrFunc - // SOCKS5 proxy (eg. 127.0.0.1:9050) to use for connections. + // Proxy specifies a SOCKS5 proxy (eg. 127.0.0.1:9050) to use for + // connections. Proxy string - // Whether to use the regression test network. - RegressionTest bool - - // If non-nil, the callback to be invoked when reading a peer message. - OnRead func(int, *wire.Message, error) - - // If non-nil, the callback to be invoked when writing a peer message. - OnWrite func(int, *wire.Message, error) - - // User agent string to be used in peer messages. + // UserAgentName specifies the user agent name to advertise. It is + // highly recommended to specify this value. UserAgentName string - // User agent version to be used in peer messages. + // UserAgentVersion specifies the user agent version to advertise. It + // is highly recommended to specify this value and that it follows the + // form "major.minor.revision" e.g. "2.6.41". UserAgentVersion string - // Network flag to be used. - Net wire.BitcoinNet + // ChainParams identifies which chain parameters the peer is associated + // with. It is highly recommended to specify this field, however it can + // be omitted in which case the test network will be used. + ChainParams *chaincfg.Params - // Services flag to be advertised in peer messages. + // Services specifies which services to advertise as supported by the + // local peer. This field can be omitted in which case it will be 0 + // and therefore advertise no supported services. Services wire.ServiceFlag - // Protocol version to use. + // ProtocolVersion specifies the maximum protocol version to use and + // advertise. This field can be omitted in which case + // peer.MaxProtocolVersion will be used. ProtocolVersion uint32 + + // Listeners houses callback functions to be invoked on receiving peer + // messages. + Listeners MessageListeners } // minUint32 is a helper function to return the minimum of two uint32s. @@ -164,10 +276,6 @@ type outMsg struct { // stats is the collection of stats related to a peer. type stats struct { statsMtx sync.RWMutex // protects all statistics below here. - versionKnown bool - protocolVersion uint32 - versionSent bool - verAckReceived bool timeOffset int64 timeConnected time.Time lastSend time.Time @@ -182,16 +290,16 @@ type stats struct { lastPingMicros int64 // Time for last ping to return. } -// StatsSnap is a snapshot of peer stats at at point in time. +// StatsSnap is a snapshot of peer stats at a point in time. type StatsSnap struct { ID int32 Addr string - Services string - LastSend int64 - LastRecv int64 + Services wire.ServiceFlag + LastSend time.Time + LastRecv time.Time BytesSent uint64 BytesRecv uint64 - ConnTime int64 + ConnTime time.Time TimeOffset int64 Version uint32 UserAgent string @@ -236,26 +344,28 @@ type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress // the inventory together. However, some helper functions for pushing messages // of specific types that typically require common special handling are // provided as a convenience. -// -// The AddListener and RemoveListener family of functions provide the -// ability for the caller to add and remove handlers, respectively, for each of -// the protocol messages the caller is interested in. type Peer struct { - btcnet wire.BitcoinNet started int32 connected int32 disconnect int32 // only to be used atomically conn net.Conn - addr string - cfg *Config - inbound bool - - flagsMtx sync.Mutex // protects the peer flags below - na *wire.NetAddress - id int32 - userAgent string - services wire.ServiceFlag + // These fields are set at creation time and never modified, so they are + // safe to read from concurrently without a mutex. + addr string + cfg Config + chainParams *chaincfg.Params + inbound bool + + flagsMtx sync.Mutex // protects the peer flags below + na *wire.NetAddress + id int32 + userAgent string + services wire.ServiceFlag + versionKnown bool + protocolVersion uint32 + versionSent bool + verAckReceived bool knownInventory *MruInventoryMap prevGetBlocksBegin *wire.ShaHash @@ -265,43 +375,20 @@ type Peer struct { outputQueue chan outMsg sendQueue chan outMsg sendDoneQueue chan struct{} - queueWg sync.WaitGroup // TODO(oga) wg -> single use channel? outputInvChan chan *wire.InvVect blockStallActivate chan time.Duration blockStallTimer <-chan time.Time blockStallCancel chan struct{} + queueQuit chan struct{} quit chan struct{} stats - - newestSha ShaFunc - nonce uint64 - - listenerMtx sync.Mutex - getAddrMsgListeners map[string]func(*Peer, *wire.MsgGetAddr) - addrMsgListeners map[string]func(*Peer, *wire.MsgAddr) - pingMsgListeners map[string]func(*Peer, *wire.MsgPing) - pongMsgListeners map[string]func(*Peer, *wire.MsgPong) - alertMsgListeners map[string]func(*Peer, *wire.MsgAlert) - memPoolMsgListeners map[string]func(*Peer, *wire.MsgMemPool) - txMsgListeners map[string]func(*Peer, *wire.MsgTx) - blockMsgListeners map[string]func(*Peer, *wire.MsgBlock, []byte) - invMsgListeners map[string]func(*Peer, *wire.MsgInv) - headersMsgListeners map[string]func(*Peer, *wire.MsgHeaders) - notFoundMsgListeners map[string]func(*Peer, *wire.MsgNotFound) - getDataMsgListeners map[string]func(*Peer, *wire.MsgGetData) - getBlocksMsgListeners map[string]func(*Peer, *wire.MsgGetBlocks) - getHeadersMsgListeners map[string]func(*Peer, *wire.MsgGetHeaders) - filterAddMsgListeners map[string]func(*Peer, *wire.MsgFilterAdd) - filterClearMsgListeners map[string]func(*Peer, *wire.MsgFilterClear) - filterLoadMsgListeners map[string]func(*Peer, *wire.MsgFilterLoad) - versionMsgListeners map[string]func(*Peer, *wire.MsgVersion) - verackMsgListeners map[string]func(*Peer, *wire.MsgVerAck) - rejectMsgListeners map[string]func(*Peer, *wire.MsgReject) } // String returns the peer's address and directionality as a human-readable // string. +// +// This function is safe for concurrent access. func (p *Peer) String() string { return fmt.Sprintf("%s (%s)", p.addr, directionString(p.inbound)) } @@ -320,38 +407,46 @@ func (p *Peer) isKnownInventory(invVect *wire.InvVect) bool { // message which requests a block will start the timer. If 'timeout' seconds // passes before the peer receives a "block" response, then the peer will // disconnect itself. +// +// This function is safe for concurrent access. func (p *Peer) SetBlockStallTimer(timeout time.Duration) { p.blockStallActivate <- timeout } -// UpdateLastBlockHeight updates the last known block for the peer. It is safe -// for concurrent access. +// UpdateLastBlockHeight updates the last known block for the peer. +// +// This function is safe for concurrent access. func (p *Peer) UpdateLastBlockHeight(newHeight int32) { p.statsMtx.Lock() - defer p.statsMtx.Unlock() - log.Tracef("Updating last block height of peer %v from %v to %v", p.addr, p.lastBlock, newHeight) p.lastBlock = int32(newHeight) + p.statsMtx.Unlock() } // UpdateLastAnnouncedBlock updates meta-data about the last block sha this -// peer is known to have announced. It is safe for concurrent access. +// peer is known to have announced. +// +// This function is safe for concurrent access. func (p *Peer) UpdateLastAnnouncedBlock(blkSha *wire.ShaHash) { - p.statsMtx.Lock() - defer p.statsMtx.Unlock() - log.Tracef("Updating last blk for peer %v, %v", p.addr, blkSha) + + p.statsMtx.Lock() p.lastAnnouncedBlock = blkSha + p.statsMtx.Unlock() } // AddKnownInventory adds the passed inventory to the cache of known inventory -// for the peer. It is safe for concurrent access. +// for the peer. +// +// This function is safe for concurrent access. func (p *Peer) AddKnownInventory(invVect *wire.InvVect) { p.knownInventory.Add(invVect) } -// StatsSnapshot returns a snapshot of the current peer statistics. +// StatsSnapshot returns a snapshot of the current peer flags and statistics. +// +// This function is safe for concurrent access. func (p *Peer) StatsSnapshot() *StatsSnap { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -361,6 +456,7 @@ func (p *Peer) StatsSnapshot() *StatsSnap { addr := p.addr userAgent := p.userAgent services := p.services + protocolVersion := p.protocolVersion p.flagsMtx.Unlock() // Get a copy of all relevant flags and stats. @@ -368,14 +464,14 @@ func (p *Peer) StatsSnapshot() *StatsSnap { ID: id, Addr: addr, UserAgent: userAgent, - Services: fmt.Sprintf("%08d", services), - LastSend: p.lastSend.Unix(), - LastRecv: p.lastRecv.Unix(), + Services: services, + LastSend: p.lastSend, + LastRecv: p.lastRecv, BytesSent: p.bytesSent, BytesRecv: p.bytesReceived, - ConnTime: p.timeConnected.Unix(), + ConnTime: p.timeConnected, TimeOffset: p.timeOffset, - Version: p.protocolVersion, + Version: protocolVersion, Inbound: p.inbound, StartingHeight: p.startingHeight, LastBlock: p.lastBlock, @@ -386,6 +482,8 @@ func (p *Peer) StatsSnapshot() *StatsSnap { } // ID returns the peer id. +// +// This function is safe for concurrent access. func (p *Peer) ID() int32 { p.flagsMtx.Lock() defer p.flagsMtx.Unlock() @@ -394,6 +492,8 @@ func (p *Peer) ID() int32 { } // NA returns the peer network address. +// +// This function is safe for concurrent access. func (p *Peer) NA() *wire.NetAddress { p.flagsMtx.Lock() defer p.flagsMtx.Unlock() @@ -402,18 +502,24 @@ func (p *Peer) NA() *wire.NetAddress { } // Addr returns the peer address. -// The address doesn't change after initialization, therefore it is not -// protected by a mutex. +// +// This function is safe for concurrent access. func (p *Peer) Addr() string { + // The address doesn't change after initialization, therefore it is not + // protected by a mutex. return p.addr } // Inbound returns whether the peer is inbound. +// +// This function is safe for concurrent access. func (p *Peer) Inbound() bool { return p.inbound } -// Services returns the services flag of the peer. +// Services returns the services flag of the remote peer. +// +// This function is safe for concurrent access. func (p *Peer) Services() wire.ServiceFlag { p.flagsMtx.Lock() defer p.flagsMtx.Unlock() @@ -422,7 +528,9 @@ func (p *Peer) Services() wire.ServiceFlag { } -// UserAgent returns the user agent of the peer. +// UserAgent returns the user agent of the remote peer. +// +// This function is safe for concurrent access. func (p *Peer) UserAgent() string { p.flagsMtx.Lock() defer p.flagsMtx.Unlock() @@ -430,7 +538,9 @@ func (p *Peer) UserAgent() string { return p.userAgent } -// LastAnnouncedBlock returns the last announced block of the peer. +// LastAnnouncedBlock returns the last announced block of the remote peer. +// +// This function is safe for concurrent access. func (p *Peer) LastAnnouncedBlock() *wire.ShaHash { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -438,7 +548,9 @@ func (p *Peer) LastAnnouncedBlock() *wire.ShaHash { return p.lastAnnouncedBlock } -// LastPingNonce returns the last ping nonce of the peer. +// LastPingNonce returns the last ping nonce of the remote peer. +// +// This function is safe for concurrent access. func (p *Peer) LastPingNonce() uint64 { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -446,7 +558,9 @@ func (p *Peer) LastPingNonce() uint64 { return p.lastPingNonce } -// LastPingTime returns the last ping time of the peer. +// LastPingTime returns the last ping time of the remote peer. +// +// This function is safe for concurrent access. func (p *Peer) LastPingTime() time.Time { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -454,7 +568,9 @@ func (p *Peer) LastPingTime() time.Time { return p.lastPingTime } -// LastPingMicros returns the last ping micros of the peer. +// LastPingMicros returns the last ping micros of the remote peer. +// +// This function is safe for concurrent access. func (p *Peer) LastPingMicros() int64 { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -463,33 +579,40 @@ func (p *Peer) LastPingMicros() int64 { } // VersionKnown returns the whether or not the version of a peer is known -// locally. It is safe for concurrent access. +// locally. +// +// This function is safe for concurrent access. func (p *Peer) VersionKnown() bool { - p.statsMtx.RLock() - defer p.statsMtx.RUnlock() + p.flagsMtx.Lock() + defer p.flagsMtx.Unlock() return p.versionKnown } // VerAckReceived returns whether or not a verack message was received by the -// peer. It is safe for concurrent accecss. +// peer. +// +// This function is safe for concurrent access. func (p *Peer) VerAckReceived() bool { - p.statsMtx.RLock() - defer p.statsMtx.RUnlock() + p.flagsMtx.Lock() + defer p.flagsMtx.Unlock() return p.verAckReceived } -// ProtocolVersion returns the peer protocol version in a manner that is safe -// for concurrent access. +// ProtocolVersion returns the peer protocol version. +// +// This function is safe for concurrent access. func (p *Peer) ProtocolVersion() uint32 { - p.statsMtx.RLock() - defer p.statsMtx.RUnlock() + p.flagsMtx.Lock() + defer p.flagsMtx.Unlock() return p.protocolVersion } // LastBlock returns the last block of the peer. +// +// This function is safe for concurrent access. func (p *Peer) LastBlock() int32 { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -498,6 +621,8 @@ func (p *Peer) LastBlock() int32 { } // LastSend returns the last send time of the peer. +// +// This function is safe for concurrent access. func (p *Peer) LastSend() time.Time { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -506,6 +631,8 @@ func (p *Peer) LastSend() time.Time { } // LastRecv returns the last recv time of the peer. +// +// This function is safe for concurrent access. func (p *Peer) LastRecv() time.Time { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -513,7 +640,9 @@ func (p *Peer) LastRecv() time.Time { return p.lastRecv } -// BytesSent returns the bytes sent by the peer. +// BytesSent returns the total number of bytes sent by the peer. +// +// This function is safe for concurrent access. func (p *Peer) BytesSent() uint64 { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -521,7 +650,9 @@ func (p *Peer) BytesSent() uint64 { return p.bytesSent } -// BytesReceived returns the bytes received by the peer. +// BytesReceived returns the total number of bytes received by the peer. +// +// This function is safe for concurrent access. func (p *Peer) BytesReceived() uint64 { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -530,6 +661,8 @@ func (p *Peer) BytesReceived() uint64 { } // TimeConnected returns the time at which the peer connected. +// +// This function is safe for concurrent access. func (p *Peer) TimeConnected() time.Time { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -537,7 +670,11 @@ func (p *Peer) TimeConnected() time.Time { return p.timeConnected } -// TimeOffset returns the time offset from the peer. +// TimeOffset returns the number of seconds the local time was offset from the +// time the peer reported during the initial negotiation phase. Negative values +// indicate the remote peer's time is before the local time. +// +// This function is safe for concurrent access. func (p *Peer) TimeOffset() int64 { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -545,7 +682,10 @@ func (p *Peer) TimeOffset() int64 { return p.timeOffset } -// StartingHeight returns the starting height of the peer. +// StartingHeight returns the last known height the peer reported during the +// initial negotiation phase. +// +// This function is safe for concurrent access. func (p *Peer) StartingHeight() int32 { p.statsMtx.RLock() defer p.statsMtx.RUnlock() @@ -556,9 +696,13 @@ func (p *Peer) StartingHeight() int32 { // pushVersionMsg sends a version message to the connected peer using the // current state. func (p *Peer) pushVersionMsg() error { - _, blockNum, err := p.newestSha() - if err != nil { - return err + var blockNum int32 + if p.cfg.NewestBlock != nil { + var err error + _, blockNum, err = p.cfg.NewestBlock() + if err != nil { + return err + } } theirNa := p.na @@ -581,8 +725,19 @@ func (p *Peer) pushVersionMsg() error { if p.cfg.BestLocalAddress != nil { ourNA = p.cfg.BestLocalAddress(p.na) } + + // Generate a unique nonce for this peer so self connections can be + // detected. This is accomplished by adding it to a size-limited map of + // recently seen nonces. + nonce, err := wire.RandomUint64() + if err != nil { + fmt.Println(err) + return err + } + sentNonces.Add(nonce) + // Version message. - msg := wire.NewMsgVersion(ourNA, theirNa, p.nonce, int32(blockNum)) + msg := wire.NewMsgVersion(ourNA, theirNa, nonce, int32(blockNum)) msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion) // XXX: bitcoind appears to always enable the full node services flag @@ -727,7 +882,7 @@ func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string // the communications. func (p *Peer) handleVersionMsg(msg *wire.MsgVersion) { // Detect self connections. - if msg.Nonce == p.nonce { + if !allowSelfConns && sentNonces.Exists(msg.Nonce) { log.Debugf("Disconnecting peer connected to self %s", p) p.Disconnect() return @@ -766,19 +921,18 @@ func (p *Peer) handleVersionMsg(msg *wire.MsgVersion) { // Updating a bunch of stats. p.statsMtx.Lock() - // Negotiate the protocol version. - p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion)) - p.versionKnown = true - log.Debugf("Negotiated protocol version %d for peer %s", - p.protocolVersion, p) p.lastBlock = msg.LastBlock p.startingHeight = msg.LastBlock // Set the peer's time offset. p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix() p.statsMtx.Unlock() - // Update peer flags + // Negotiate the protocol version. p.flagsMtx.Lock() + p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion)) + p.versionKnown = true + log.Debugf("Negotiated protocol version %d for peer %s", + p.protocolVersion, p) // Set the peer's ID. p.id = atomic.AddInt32(&nodeCount, 1) // Set the supported services for the peer to what the remote peer @@ -813,369 +967,16 @@ func (p *Peer) handleVersionMsg(msg *wire.MsgVersion) { // Send verack. p.QueueMessage(wire.NewMsgVerAck(), nil) - - // TODO: Relay alerts. -} - -// AddVersionMsgListener adds a listener which is invoked when a peer receives -// a version bitcoin message. -func (p *Peer) AddVersionMsgListener(key string, listener func(p *Peer, msg *wire.MsgVersion)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.versionMsgListeners[key] = listener -} - -// RemoveVersionMsgListener removes the version message listener with the given -// key. -func (p *Peer) RemoveVersionMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.versionMsgListeners, key) -} - -// AddVerAckMsgListener adds a listener which is invoked when a peer receives -// a verack bitcoin message. -func (p *Peer) AddVerAckMsgListener(key string, listener func(p *Peer, msg *wire.MsgVerAck)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.verackMsgListeners[key] = listener -} - -// RemoveVerAckMsgListener removes the verack message listener with the given -// key. -func (p *Peer) RemoveVerAckMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.verackMsgListeners, key) -} - -// AddGetAddrMsgListener adds a listener which is invoked when a peer receives -// a getaddr bitcoin message. -func (p *Peer) AddGetAddrMsgListener(key string, listener func(p *Peer, msg *wire.MsgGetAddr)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.getAddrMsgListeners[key] = listener -} - -// RemoveGetAddrMsgListener removes the getaddr message listener with the given -// key. -func (p *Peer) RemoveGetAddrMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.getAddrMsgListeners, key) -} - -// AddAddrMsgListener adds a listener which is invoked when a peer receives -// a addr bitcoin message. -func (p *Peer) AddAddrMsgListener(key string, listener func(p *Peer, msg *wire.MsgAddr)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.addrMsgListeners[key] = listener -} - -// RemoveAddrMsgListener removes the addr message listener with the given -// key. -func (p *Peer) RemoveAddrMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.addrMsgListeners, key) -} - -// AddPingMsgListener adds a listener which is invoked when a peer receives -// a ping bitcoin message. -func (p *Peer) AddPingMsgListener(key string, listener func(p *Peer, msg *wire.MsgPing)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.pingMsgListeners[key] = listener -} - -// RemovePingMsgListener removes the ping message listener with the given -// key. -func (p *Peer) RemovePingMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.pingMsgListeners, key) -} - -// AddPongMsgListener adds a listener which is invoked when a peer receives -// a pong bitcoin message. -func (p *Peer) AddPongMsgListener(key string, listener func(p *Peer, msg *wire.MsgPong)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.pongMsgListeners[key] = listener -} - -// RemovePongMsgListener removes the pong message listener with the given -// key. -func (p *Peer) RemovePongMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.pongMsgListeners, key) -} - -// AddAlertMsgListener adds a listener which is invoked when a peer receives -// a alert bitcoin message. -func (p *Peer) AddAlertMsgListener(key string, listener func(p *Peer, msg *wire.MsgAlert)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.alertMsgListeners[key] = listener -} - -// RemoveAlertMsgListener removes the alert message listener with the given -// key. -func (p *Peer) RemoveAlertMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.alertMsgListeners, key) -} - -// AddMemPoolMsgListener adds a listener which is invoked when a peer receives -// a mempool bitcoin message. -func (p *Peer) AddMemPoolMsgListener(key string, listener func(p *Peer, msg *wire.MsgMemPool)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.memPoolMsgListeners[key] = listener -} - -// RemoveMemPoolMsgListener removes the mempool message listener with the given -// key. -func (p *Peer) RemoveMemPoolMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.memPoolMsgListeners, key) -} - -// AddTxMsgListener adds a listener which is invoked when a peer receives a tx -// bitcoin message . -func (p *Peer) AddTxMsgListener(key string, listener func(p *Peer, msg *wire.MsgTx)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.txMsgListeners[key] = listener -} - -// RemoveTxMsgListener removes the tx message listener with the given key. -func (p *Peer) RemoveTxMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.txMsgListeners, key) -} - -// AddBlockMsgListener adds a listener which is invoked when a peer receives a -// block bitcoin message . -func (p *Peer) AddBlockMsgListener(key string, listener func(p *Peer, msg *wire.MsgBlock, buf []byte)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.blockMsgListeners[key] = listener -} - -// RemoveBlockMsgListener removes the block message listener with the given key. -func (p *Peer) RemoveBlockMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.blockMsgListeners, key) -} - -// AddInvMsgListener adds a listener which is invoked when a peer receives a -// inv bitcoin message . -func (p *Peer) AddInvMsgListener(key string, listener func(p *Peer, msg *wire.MsgInv)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.invMsgListeners[key] = listener -} - -// RemoveInvMsgListener removes the inv message listener with the given key. -func (p *Peer) RemoveInvMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.invMsgListeners, key) -} - -// AddHeadersMsgListener adds a listener which is invoked when a peer receives -// a headers bitcoin message . -func (p *Peer) AddHeadersMsgListener(key string, listener func(p *Peer, msg *wire.MsgHeaders)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.headersMsgListeners[key] = listener -} - -// RemoveHeadersMsgListener removes the headers message listener with the given -// key. -func (p *Peer) RemoveHeadersMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.headersMsgListeners, key) -} - -// AddNotFoundMsgListener adds a listener which is invoked when a peer receives -// a not found bitcoin message . -func (p *Peer) AddNotFoundMsgListener(key string, listener func(p *Peer, msg *wire.MsgNotFound)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.notFoundMsgListeners[key] = listener -} - -// RemoveNotFoundMsgListener removes the not found message listener with the given -// key. -func (p *Peer) RemoveNotFoundMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.notFoundMsgListeners, key) -} - -// AddGetDataMsgListener adds a listener which is invoked when a peer receives -// a getdata bitcoin message . -func (p *Peer) AddGetDataMsgListener(key string, listener func(p *Peer, msg *wire.MsgGetData)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.getDataMsgListeners[key] = listener -} - -// RemoveGetDataMsgListener removes the getdata message listener with the given -// key. -func (p *Peer) RemoveGetDataMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.getDataMsgListeners, key) -} - -// AddGetBlocksMsgListener adds a listener which is invoked when a peer receives -// a getblocks bitcoin message . -func (p *Peer) AddGetBlocksMsgListener(key string, listener func(p *Peer, msg *wire.MsgGetBlocks)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.getBlocksMsgListeners[key] = listener -} - -// RemoveGetBlocksMsgListener removes the getblocks message listener with the given -// key. -func (p *Peer) RemoveGetBlocksMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.getBlocksMsgListeners, key) -} - -// AddGetHeadersMsgListener adds a listener which is invoked when a peer receives -// a getheaders bitcoin message . -func (p *Peer) AddGetHeadersMsgListener(key string, listener func(p *Peer, msg *wire.MsgGetHeaders)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.getHeadersMsgListeners[key] = listener -} - -// RemoveGetHeadersMsgListener removes the getheaders message listener with the given -// key. -func (p *Peer) RemoveGetHeadersMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.getHeadersMsgListeners, key) -} - -// AddFilterAddMsgListener adds a listener which is invoked when a peer -// receives a filteradd bitcoin message . -func (p *Peer) AddFilterAddMsgListener(key string, listener func(p *Peer, msg *wire.MsgFilterAdd)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.filterAddMsgListeners[key] = listener -} - -// RemoveFilterAddMsgListener removes the filteradd message listener with the -// given key. -func (p *Peer) RemoveFilterAddMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.filterAddMsgListeners, key) -} - -// AddFilterClearMsgListener adds a listener which is invoked when a peer -// receives a filterclear bitcoin message . -func (p *Peer) AddFilterClearMsgListener(key string, listener func(p *Peer, msg *wire.MsgFilterClear)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.filterClearMsgListeners[key] = listener -} - -// RemoveFilterClearMsgListener removes the filterclear message listener with the -// given key. -func (p *Peer) RemoveFilterClearMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.filterClearMsgListeners, key) -} - -// AddFilterLoadMsgListener adds a listener which is invoked when a peer -// receives a filterload bitcoin message . -func (p *Peer) AddFilterLoadMsgListener(key string, listener func(p *Peer, msg *wire.MsgFilterLoad)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.filterLoadMsgListeners[key] = listener } -// RemoveFilterLoadMsgListener removes the filterload message listener with the -// given key. -func (p *Peer) RemoveFilterLoadMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.filterLoadMsgListeners, key) -} - -// AddRejectMsgListener adds a listener which is invoked when a peer receives -// a reject bitcoin message. -func (p *Peer) AddRejectMsgListener(key string, listener func(p *Peer, msg *wire.MsgReject)) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - p.rejectMsgListeners[key] = listener -} - -// RemoveRejectMsgListener removes the reject message listener with the given -// key. -func (p *Peer) RemoveRejectMsgListener(key string) { - p.listenerMtx.Lock() - defer p.listenerMtx.Unlock() - - delete(p.rejectMsgListeners, key) -} - -// PushAddrMsg sends one, or more, addr message(s) to the connected peer using -// the provided addresses. +// PushAddrMsg sends an addr message to the connected peer using the provided +// addresses. This function is useful over manually sending the message via +// QueueMessage since it automatically limits the addresses to the maximum +// number allowed by the message and randomizes the chosen addresses when there +// are too many. No message will be sent if there are no entries in the +// provided addresses slice. +// +// This function is safe for concurrent access. func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) error { // Nothing to send. if len(addresses) == 0 { @@ -1186,8 +987,8 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) error { numAdded := 0 msg := wire.NewMsgAddr() for _, na := range addresses { - // If the maxAddrs limit has been reached, randomize the list - // with the remaining addresses. + // Randomize the list with the remaining addresses when the + // max addresses limit has been reached. if numAdded == wire.MaxAddrPerMsg { msg.AddrList[r.Intn(wire.MaxAddrPerMsg)] = na continue @@ -1211,17 +1012,17 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) error { // message. For older clients, it does nothing and anything other than failure // is considered a successful ping. func (p *Peer) handlePingMsg(msg *wire.MsgPing) { - // Only Reply with pong is message comes from a new enough client. + // Only reply with pong if the message is from a new enough client. if p.ProtocolVersion() > wire.BIP0031Version { // Include nonce from ping so pong can be identified. p.QueueMessage(wire.NewMsgPong(msg.Nonce), nil) } } -// handlePongMsg is invoked when a peer received a pong bitcoin message. -// recent clients (protocol version > BIP0031Version), and if we had send a ping -// previosuly we update our ping time statistics. If the client is too old or -// we had not send a ping we ignore it. +// handlePongMsg is invoked when a peer receives a pong bitcoin message. It +// updates the ping statistics as required for recent clients (protocol +// version > BIP0031Version). There is no effect for older clients or when a +// ping was not previously sent. func (p *Peer) handlePongMsg(msg *wire.MsgPong) { p.statsMtx.Lock() defer p.statsMtx.Unlock() @@ -1231,11 +1032,11 @@ func (p *Peer) handlePongMsg(msg *wire.MsgPong) { // the times of each ping. For now we just make a best effort and // only record stats if it was for the last ping sent. Any preceding // and overlapping pings will be ignored. It is unlikely to occur - // without large usage of the ping rpc call since we ping - // infrequently enough that if they overlap we would have timed out - // the peer. - if p.protocolVersion > wire.BIP0031Version && - p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce { + // without large usage of the ping rpc call since we ping infrequently + // enough that if they overlap we would have timed out the peer. + if p.ProtocolVersion() > wire.BIP0031Version && p.lastPingNonce != 0 && + msg.Nonce == p.lastPingNonce { + p.lastPingMicros = time.Now().Sub(p.lastPingTime).Nanoseconds() p.lastPingMicros /= 1000 // convert to usec. p.lastPingNonce = 0 @@ -1245,12 +1046,12 @@ func (p *Peer) handlePongMsg(msg *wire.MsgPong) { // readMessage reads the next bitcoin message from the peer with logging. func (p *Peer) readMessage() (wire.Message, []byte, error) { n, msg, buf, err := wire.ReadMessageN(p.conn, p.ProtocolVersion(), - p.btcnet) + p.chainParams.Net) p.statsMtx.Lock() p.bytesReceived += uint64(n) p.statsMtx.Unlock() - if p.cfg.OnRead != nil { - p.cfg.OnRead(n, &msg, err) + if p.cfg.Listeners.OnRead != nil { + p.cfg.Listeners.OnRead(p, n, msg, err) } if err != nil { return nil, nil, err @@ -1277,7 +1078,7 @@ func (p *Peer) readMessage() (wire.Message, []byte, error) { return msg, buf, nil } -// writeMessage sends a bitcoin Message to the peer with logging. +// writeMessage sends a bitcoin message to the peer with logging. func (p *Peer) writeMessage(msg wire.Message) { // Don't do anything if we're disconnecting. if atomic.LoadInt32(&p.disconnect) != 0 { @@ -1313,7 +1114,7 @@ func (p *Peer) writeMessage(msg wire.Message) { log.Tracef("%v", newLogClosure(func() string { var buf bytes.Buffer err := wire.WriteMessage(&buf, msg, p.ProtocolVersion(), - p.btcnet) + p.chainParams.Net) if err != nil { return err.Error() } @@ -1322,12 +1123,12 @@ func (p *Peer) writeMessage(msg wire.Message) { // Write the message to the peer. n, err := wire.WriteMessageN(p.conn, msg, p.ProtocolVersion(), - p.btcnet) + p.chainParams.Net) p.statsMtx.Lock() p.bytesSent += uint64(n) p.statsMtx.Unlock() - if p.cfg.OnWrite != nil { - p.cfg.OnWrite(n, &msg, err) + if p.cfg.Listeners.OnWrite != nil { + p.cfg.Listeners.OnWrite(p, n, msg, err) } if err != nil { p.Disconnect() @@ -1362,6 +1163,34 @@ func (p *Peer) isAllowedByRegression(err error) bool { return true } +// isRegTestNetwork returns whether or not the peer is running on the regression +// test network. +func (p *Peer) isRegTestNetwork() bool { + return p.chainParams.Net == wire.TestNet +} + +// shouldHandleReadError returns whether or not the passed error, which is +// expected to have come from reading from the remote peer in the inHandler, +// should be logged and responded to with a reject message. +func (p *Peer) shouldHandleReadError(err error) bool { + // No logging or reject message when the peer is being forcibly + // disconnected. + if atomic.LoadInt32(&p.disconnect) != 0 { + return false + } + + // No logging or reject message when the remote peer has been + // disconnected. + if err == io.EOF { + return false + } + if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() { + return false + } + + return true +} + // inHandler handles all incoming messages for the peer. It must be run as a // goroutine. func (p *Peer) inHandler() { @@ -1370,54 +1199,52 @@ func (p *Peer) inHandler() { // to idleTimeout for all future messages. idleTimer := time.AfterFunc(negotiateTimeout, func() { if p.VersionKnown() { - log.Warnf("Peer %s no answer for %d minutes, "+ + log.Warnf("Peer %s no answer for %s -- disconnecting", + p, idleTimeout) + } else { + log.Warnf("Peer %s no valid version message for %s -- "+ "disconnecting", p, negotiateTimeout) } p.Disconnect() }) out: for atomic.LoadInt32(&p.disconnect) == 0 { + // Read a message and stop the idle timer as soon as the read + // is done. The timer is reset below for the next iteration if + // needed. rmsg, buf, err := p.readMessage() - // Stop the timer now, if we go around again we will reset it. idleTimer.Stop() if err != nil { // In order to allow regression tests with malformed // messages, don't disconnect the peer when we're in // regression test mode and the error is one of the // allowed errors. - if p.cfg.RegressionTest && p.isAllowedByRegression(err) { - log.Errorf("Allowed regression test "+ - "error from %s: %v", p, err) + if p.isRegTestNetwork() && p.isAllowedByRegression(err) { + log.Errorf("Allowed regression test error "+ + "from %s: %v", p, err) idleTimer.Reset(idleTimeout) continue } - // Only log the error and possibly send reject message - // if we're not forcibly disconnecting. - if atomic.LoadInt32(&p.disconnect) == 0 { + // Only log the error and send reject message if the + // local peer is not forcibly disconnecting and the + // remote peer has not disconnected. + if p.shouldHandleReadError(err) { errMsg := fmt.Sprintf("Can't read message "+ "from %s: %v", p, err) log.Errorf(errMsg) - // Only send the reject message if it's not - // because the remote client disconnected. - if err != io.EOF { - // Push a reject message for the - // malformed message and wait for the - // message to be sent before - // disconnecting. - // - // NOTE: Ideally this would include the - // command in the header if at least - // that much of the message was valid, - // but that is not currently exposed by - // wire, so just used malformed for the - // command. - p.PushRejectMsg("malformed", - wire.RejectMalformed, errMsg, - nil, true) - } - + // Push a reject message for the malformed + // message and wait for the message to be sent + // before disconnecting. + // + // NOTE: Ideally this would include the command + // in the header if at least that much of the + // message was valid, but that is not currently + // exposed by wire, so just used malformed for + // the command. + p.PushRejectMsg("malformed", + wire.RejectMalformed, errMsg, nil, true) } break out } @@ -1441,205 +1268,149 @@ out: switch msg := rmsg.(type) { case *wire.MsgVersion: p.handleVersionMsg(msg) - - p.listenerMtx.Lock() - for key, listener := range p.versionMsgListeners { - log.Tracef("Running %s listener %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnVersion != nil { + p.cfg.Listeners.OnVersion(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgVerAck: - p.statsMtx.RLock() + p.flagsMtx.Lock() versionSent := p.versionSent - p.statsMtx.RUnlock() - + p.flagsMtx.Unlock() if !versionSent { log.Infof("Received 'verack' from peer %v "+ - "before version was sent -- disconnecting", p) + "before version was sent -- "+ + "disconnecting", p) break out } - // No read lock is necessary because verAckReceived is not written - // to in any other goroutine + + // No read lock is necessary because verAckReceived is + // not written to in any other goroutine. if p.verAckReceived { log.Infof("Already received 'verack' from "+ "peer %v -- disconnecting", p) break out } - p.statsMtx.Lock() + p.flagsMtx.Lock() p.verAckReceived = true - p.statsMtx.Unlock() - - p.listenerMtx.Lock() - for key, listener := range p.verackMsgListeners { - log.Tracef("Running %s listener %s", key, p) - listener(p, msg) + p.flagsMtx.Unlock() + if p.cfg.Listeners.OnVerAck != nil { + p.cfg.Listeners.OnVerAck(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgGetAddr: - p.listenerMtx.Lock() - for key, listener := range p.getAddrMsgListeners { - log.Tracef("Running %s listener %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnGetAddr != nil { + p.cfg.Listeners.OnGetAddr(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgAddr: - p.listenerMtx.Lock() - for key, listener := range p.addrMsgListeners { - log.Tracef("Running %s listener %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnAddr != nil { + p.cfg.Listeners.OnAddr(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgPing: p.handlePingMsg(msg) - p.listenerMtx.Lock() - for key, listener := range p.pingMsgListeners { - log.Tracef("Running %s listener %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnPing != nil { + p.cfg.Listeners.OnPing(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgPong: p.handlePongMsg(msg) - p.listenerMtx.Lock() - for key, listener := range p.pongMsgListeners { - log.Tracef("Running %s listener %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnPong != nil { + p.cfg.Listeners.OnPong(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgAlert: // Note: The reference client currently bans peers that send alerts // not signed with its key. We could verify against their key, but // since the reference client is currently unwilling to support // other implementions' alert messages, we will not relay theirs. - p.listenerMtx.Lock() - for key, listener := range p.alertMsgListeners { - log.Tracef("Running %s listener %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnAlert != nil { + p.cfg.Listeners.OnAlert(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgMemPool: - p.listenerMtx.Lock() - for key, listener := range p.memPoolMsgListeners { - log.Tracef("Running %s listener %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnMemPool != nil { + p.cfg.Listeners.OnMemPool(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgTx: - p.listenerMtx.Lock() - for key, listener := range p.txMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnTx != nil { + p.cfg.Listeners.OnTx(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgBlock: if p.blockStallCancel != nil { close(p.blockStallCancel) } - p.listenerMtx.Lock() - for key, listener := range p.blockMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg, buf) + if p.cfg.Listeners.OnBlock != nil { + p.cfg.Listeners.OnBlock(p, msg, buf) } - p.listenerMtx.Unlock() case *wire.MsgInv: - p.listenerMtx.Lock() - for key, listener := range p.invMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnInv != nil { + p.cfg.Listeners.OnInv(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgHeaders: - p.listenerMtx.Lock() - for key, listener := range p.headersMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnHeaders != nil { + p.cfg.Listeners.OnHeaders(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgNotFound: - p.listenerMtx.Lock() - for key, listener := range p.notFoundMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnNotFound != nil { + p.cfg.Listeners.OnNotFound(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgGetData: - p.listenerMtx.Lock() - for key, listener := range p.getDataMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnGetData != nil { + p.cfg.Listeners.OnGetData(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgGetBlocks: - p.listenerMtx.Lock() - for key, listener := range p.getBlocksMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnGetBlocks != nil { + p.cfg.Listeners.OnGetBlocks(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgGetHeaders: - p.listenerMtx.Lock() - for key, listener := range p.getHeadersMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnGetHeaders != nil { + p.cfg.Listeners.OnGetHeaders(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgFilterAdd: - p.listenerMtx.Lock() - for key, listener := range p.filterAddMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnFilterAdd != nil { + p.cfg.Listeners.OnFilterAdd(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgFilterClear: - p.listenerMtx.Lock() - for key, listener := range p.filterClearMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnFilterClear != nil { + p.cfg.Listeners.OnFilterClear(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgFilterLoad: - p.listenerMtx.Lock() - for key, listener := range p.filterLoadMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnFilterLoad != nil { + p.cfg.Listeners.OnFilterLoad(p, msg) + } + + case *wire.MsgMerkleBlock: + if p.cfg.Listeners.OnMerkleBlock != nil { + p.cfg.Listeners.OnMerkleBlock(p, msg) } - p.listenerMtx.Unlock() case *wire.MsgReject: - p.listenerMtx.Lock() - for key, listener := range p.rejectMsgListeners { - log.Tracef("Running %s listener for %s", key, p) - listener(p, msg) + if p.cfg.Listeners.OnReject != nil { + p.cfg.Listeners.OnReject(p, msg) } - p.listenerMtx.Unlock() default: log.Debugf("Received unhandled message of type %v:", rmsg.Command()) } - // ok we got a message, reset the timer. - // timer just calls p.Disconnect() after logging. + // A message was received so reset the idle timer. idleTimer.Reset(idleTimeout) } + // Ensure the idle timer is stopped to avoid leaking the resource. idleTimer.Stop() // Ensure connection is closed. @@ -1670,9 +1441,7 @@ func (p *Peer) queueHandler() { // To avoid duplication below. queuePacket := func(msg outMsg, list *list.List, waiting bool) bool { if !waiting { - log.Tracef("%s: sending to outHandler", p) p.sendQueue <- msg - log.Tracef("%s: sent to outHandler", p) } else { list.PushBack(msg) } @@ -1688,8 +1457,6 @@ out: // This channel is notified when a message has been sent across // the network socket. case <-p.sendDoneQueue: - log.Tracef("%s: acked by outhandler", p) - // No longer waiting if there are no more messages // in the pending messages queue. next := pendingMsgs.Front() @@ -1701,9 +1468,7 @@ out: // Notify the outHandler about the next item to // asynchronously send. val := pendingMsgs.Remove(next) - log.Tracef("%s: sending to outHandler", p) p.sendQueue <- val.(outMsg) - log.Tracef("%s: sent to outHandler", p) case iv := <-p.outputInvChan: // No handshake? They'll find out soon enough. @@ -1777,10 +1542,21 @@ cleanup: break cleanup } } - p.queueWg.Done() + close(p.queueQuit) log.Tracef("Peer queue handler done for %s", p) } +// invContainsBlock returns true if the passed InvList contains an Inv of type +// InvTypeBlock. Otherwise, it returns false. +func invContainsBlock(invList []*wire.InvVect) bool { + for _, inv := range invList { + if inv.Type == wire.InvTypeBlock { + return true + } + } + return false +} + // outHandler handles all outgoing messages for the peer. It must be run as a // goroutine. It uses a buffered channel to serialize output messages while // allowing the sender to continue running asynchronously. @@ -1800,37 +1576,39 @@ out: for { select { case msg := <-p.sendQueue: - // If the message is one we should get a reply for - // then reset the timer, we only want to send pings - // when otherwise we would not receive a reply from - // the peer. We specifically do not count block or inv - // messages here since they are not sure of a reply if - // the inv is of no interest explicitly solicited invs - // should elicit a reply but we don't track them - // specially. - log.Tracef("%s: received from queuehandler", p) + // Reset the ping timer for messages that expect a + // reply since we only want to send pings when we would + // otherwise not receive a reply from the peer. The + // getblocks and inv messages are specifically not + // counted here since there is no guarantee they will + // result in a reply. reset := true switch m := msg.msg.(type) { case *wire.MsgVersion: - // should get a verack - p.statsMtx.Lock() + // Expects a verack message. Also set the flag + // which indicates the version has been sent. + p.flagsMtx.Lock() p.versionSent = true - p.statsMtx.Unlock() + p.flagsMtx.Unlock() + case *wire.MsgGetAddr: - // should get addresses + // Expects an addr message. + case *wire.MsgPing: - // expects pong - // Also set up statistics. - p.statsMtx.Lock() - if p.protocolVersion > wire.BIP0031Version { + // Expects a pong message in later protocol + // versions. Also set up statistics. + if p.ProtocolVersion() > wire.BIP0031Version { + p.statsMtx.Lock() p.lastPingNonce = m.Nonce p.lastPingTime = time.Now() + p.statsMtx.Unlock() } - p.statsMtx.Unlock() + case *wire.MsgMemPool: - // Should return an inv. + // Expects an inv message. + case *wire.MsgGetData: - // Should get us block, tx, or not found. + // Expects a block, tx, or notfound message. // If the blockStallTimer has not already been // started, then initialize the timer to fire @@ -1845,14 +1623,17 @@ out: p.blockStallTimer = time.After(stallTimeout) p.blockStallCancel = make(chan struct{}) } + case *wire.MsgGetHeaders: - // Should get us headers back. + // Expects a headers message. + default: // Not one of the above, no sure reply. // We want to ping if nothing else // interesting happens. reset = false } + if reset { pingTimer.Reset(pingTimeout) } @@ -1863,15 +1644,14 @@ out: if msg.doneChan != nil { msg.doneChan <- struct{}{} } - log.Tracef("%s: acking queuehandler", p) p.sendDoneQueue <- struct{}{} - log.Tracef("%s: acked queuehandler", p) case timeout := <-p.blockStallActivate: log.Debugf("Activating block stall timer (%v "+ "seconds) for: %v", timeout, p) blockStallActive = true stallTimeout = timeout + case <-p.blockStallCancel: // The inHandler received a MsgBlock before // BlockStallTimeout seconds had elapsed. So we set the @@ -1881,6 +1661,7 @@ out: p.blockStallTimer = nil p.blockStallCancel = nil blockStallActive = false + case <-p.blockStallTimer: // The inHandler didn't receive a MsgBlock before // BlockStallTimeout seconds had elapsed. So we @@ -1889,6 +1670,7 @@ out: "block download, no block response for %v "+ "seconds disconnecting", p, BlockStallTimeout) p.Disconnect() + case <-p.quit: break out } @@ -1896,10 +1678,10 @@ out: pingTimer.Stop() - p.queueWg.Wait() + <-p.queueQuit // Drain any wait channels before we go away so we don't leave something - // waiting for us. We have waited on queueWg and thus we can be sure + // waiting for us. We have waited on queueQuit and thus we can be sure // that we will not miss anything sent on sendQueue. cleanup: for { @@ -1917,15 +1699,14 @@ cleanup: log.Tracef("Peer output handler done for %s", p) } -// QueueMessage adds the passed bitcoin message to the peer send queue. It -// uses a buffered channel to communicate with the output handler goroutine so -// it is automatically rate limited and safe for concurrent access. +// QueueMessage adds the passed bitcoin message to the peer send queue. +// +// This function is safe for concurrent access. func (p *Peer) QueueMessage(msg wire.Message, doneChan chan struct{}) { - // Avoid risk of deadlock if goroutine already exited. The goroutine + // Avoid risk of deadlock if goroutine already exited. The goroutine // we will be sending to hangs around until it knows for a fact that - // it is marked as disconnected. *then* it drains the channels. + // it is marked as disconnected and *then* it drains the channels. if !p.Connected() { - // avoid deadlock... if doneChan != nil { go func() { doneChan <- struct{}{} @@ -1938,18 +1719,19 @@ func (p *Peer) QueueMessage(msg wire.Message, doneChan chan struct{}) { // QueueInventory adds the passed inventory to the inventory send queue which // might not be sent right away, rather it is trickled to the peer in batches. -// Inventory that the peer is already known to have is ignored. It is safe for -// concurrent access. +// Inventory that the peer is already known to have is ignored. +// +// This function is safe for concurrent access. func (p *Peer) QueueInventory(invVect *wire.InvVect) { - // Don't add the inventory to the send queue if the peer is - // already known to have it. + // Don't add the inventory to the send queue if the peer is already + // known to have it. if p.isKnownInventory(invVect) { return } - // Avoid risk of deadlock if goroutine already exited. The goroutine + // Avoid risk of deadlock if goroutine already exited. The goroutine // we will be sending to hangs around until it knows for a fact that - // it is marked as disconnected. *then* it drains the channels. + // it is marked as disconnected and *then* it drains the channels. if !p.Connected() { return } @@ -1957,20 +1739,34 @@ func (p *Peer) QueueInventory(invVect *wire.InvVect) { p.outputInvChan <- invVect } +// Connect uses the given conn to connect to the peer. +func (p *Peer) Connect(conn net.Conn) error { + p.conn = conn + p.timeConnected = time.Now() + + // Connection was successful so log it and start peer. + log.Debugf("Connected to %s", p.conn.RemoteAddr()) + atomic.AddInt32(&p.connected, 1) + return p.Start() +} + // Connected returns whether or not the peer is currently connected. +// +// This function is safe for concurrent access. func (p *Peer) Connected() bool { return atomic.LoadInt32(&p.connected) != 0 && atomic.LoadInt32(&p.disconnect) == 0 } -// Disconnect disconnects the peer by closing the connection. It also sets -// a flag so the impending shutdown can be detected. +// Disconnect disconnects the peer by closing the connection. Calling this +// function when the peer is already disconnected or in the process of +// disconnecting will have no effect. func (p *Peer) Disconnect() { if atomic.AddInt32(&p.disconnect, 1) != 1 { return } - log.Tracef("disconnecting %s", p) + log.Tracef("Disconnecting %s", p) if atomic.LoadInt32(&p.connected) != 0 { p.conn.Close() } @@ -1999,9 +1795,6 @@ func (p *Peer) Start() error { // Start processing input and output. go p.inHandler() - // queueWg is kept so that outHandler knows when the queue has exited so - // it can drain correctly. - p.queueWg.Add(1) go p.queueHandler() go p.outHandler() @@ -2014,26 +1807,32 @@ func (p *Peer) Shutdown() { p.Disconnect() } -// WaitForShutdown waits until the peer is shutdown. +// WaitForShutdown waits until the peer has completely shutdown. This will +// happen if either the local or remote side has been disconnected or the peer +// is forcibly shutdown via Shutdown. func (p *Peer) WaitForShutdown() { <-p.quit } -// newPeerBase returns a new base bitcoin peer for the provided server and -// inbound flag. This is used by the NewInboundPeer and NewOutboundPeer -// functions to perform base setup needed by both types of peers. -func newPeerBase(cfg *Config, nonce uint64, inbound bool) *Peer { - // If provided, use the configured version, else default to the max - // supported version. - var protocolVersion uint32 +// newPeerBase returns a new base bitcoin peer based on the inbound flag. This +// is used by the NewInboundPeer and NewOutboundPeer functions to perform base +// setup needed by both types of peers. +func newPeerBase(cfg *Config, inbound bool) *Peer { + // Default to the max supported protocol version. Override to the + // version specified by the caller if configured. + protocolVersion := uint32(MaxProtocolVersion) if cfg.ProtocolVersion != 0 { protocolVersion = cfg.ProtocolVersion - } else { - protocolVersion = MaxProtocolVersion + } + + // Set the chain parameters to testnet if the caller did not specify + // any. + chainParams := &chaincfg.TestNet3Params + if cfg.ChainParams == nil { + chainParams = cfg.ChainParams } p := Peer{ - btcnet: cfg.Net, inbound: inbound, knownInventory: NewMruInventoryMap(maxKnownInventory), outputQueue: make(chan outMsg, outputBufferSize), @@ -2041,43 +1840,21 @@ func newPeerBase(cfg *Config, nonce uint64, inbound bool) *Peer { sendDoneQueue: make(chan struct{}, 1), // nonblocking sync outputInvChan: make(chan *wire.InvVect, outputBufferSize), blockStallActivate: make(chan time.Duration), + queueQuit: make(chan struct{}), quit: make(chan struct{}), - stats: stats{ - protocolVersion: protocolVersion, - }, - newestSha: cfg.NewestBlock, - nonce: nonce, - cfg: cfg, - services: cfg.Services, - - getAddrMsgListeners: make(map[string]func(*Peer, *wire.MsgGetAddr)), - addrMsgListeners: make(map[string]func(*Peer, *wire.MsgAddr)), - pingMsgListeners: make(map[string]func(*Peer, *wire.MsgPing)), - pongMsgListeners: make(map[string]func(*Peer, *wire.MsgPong)), - alertMsgListeners: make(map[string]func(*Peer, *wire.MsgAlert)), - memPoolMsgListeners: make(map[string]func(*Peer, *wire.MsgMemPool)), - txMsgListeners: make(map[string]func(*Peer, *wire.MsgTx)), - blockMsgListeners: make(map[string]func(*Peer, *wire.MsgBlock, []byte)), - invMsgListeners: make(map[string]func(*Peer, *wire.MsgInv)), - headersMsgListeners: make(map[string]func(*Peer, *wire.MsgHeaders)), - notFoundMsgListeners: make(map[string]func(*Peer, *wire.MsgNotFound)), - getDataMsgListeners: make(map[string]func(*Peer, *wire.MsgGetData)), - getBlocksMsgListeners: make(map[string]func(*Peer, *wire.MsgGetBlocks)), - getHeadersMsgListeners: make(map[string]func(*Peer, *wire.MsgGetHeaders)), - filterAddMsgListeners: make(map[string]func(*Peer, *wire.MsgFilterAdd)), - filterClearMsgListeners: make(map[string]func(*Peer, *wire.MsgFilterClear)), - filterLoadMsgListeners: make(map[string]func(*Peer, *wire.MsgFilterLoad)), - versionMsgListeners: make(map[string]func(*Peer, *wire.MsgVersion)), - verackMsgListeners: make(map[string]func(*Peer, *wire.MsgVerAck)), - rejectMsgListeners: make(map[string]func(*Peer, *wire.MsgReject)), + stats: stats{}, + cfg: *cfg, // Copy so caller can't mutate. + chainParams: chainParams, + services: cfg.Services, + protocolVersion: protocolVersion, } return &p } // NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin // processing incoming and outgoing messages. -func NewInboundPeer(cfg *Config, nonce uint64, conn net.Conn) *Peer { - p := newPeerBase(cfg, nonce, true) +func NewInboundPeer(cfg *Config, conn net.Conn) *Peer { + p := newPeerBase(cfg, true) p.conn = conn p.addr = conn.RemoteAddr().String() p.timeConnected = time.Now() @@ -2086,31 +1863,9 @@ func NewInboundPeer(cfg *Config, nonce uint64, conn net.Conn) *Peer { } // NewOutboundPeer returns a new outbound bitcoin peer. -func NewOutboundPeer(cfg *Config, nonce uint64, na *wire.NetAddress) *Peer { - p := newPeerBase(cfg, nonce, false) +func NewOutboundPeer(cfg *Config, na *wire.NetAddress) *Peer { + p := newPeerBase(cfg, false) p.na = na p.addr = fmt.Sprintf("%v:%v", na.IP, na.Port) return p } - -// Connect uses the given conn to connect to the peer. -func (p *Peer) Connect(conn net.Conn) error { - p.conn = conn - p.timeConnected = time.Now() - - // Connection was successful so log it and start peer. - log.Debugf("Connected to %s", p.conn.RemoteAddr()) - atomic.AddInt32(&p.connected, 1) - return p.Start() -} - -// invContainsBlock returns true if the passed InvList contains an Inv of type -// InvTypeBlock. Otherwise, it returns false. -func invContainsBlock(invList []*wire.InvVect) bool { - for _, inv := range invList { - if inv.Type == wire.InvTypeBlock { - return true - } - } - return false -} diff --git a/peer/peer_test.go b/peer/peer_test.go index c92a0c06fcb..8dbe59ec124 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -5,7 +5,6 @@ package peer_test import ( - "bytes" "errors" "io" "net" @@ -13,7 +12,7 @@ import ( "testing" "time" - "github.com/btcsuite/btcd/addrmgr" + "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/go-socks/socks" @@ -87,10 +86,6 @@ func pipe(c1, c2 *conn) (*conn, *conn) { return c1, c2 } -// addrMgr is the test address manager. -// It is global so as to be accessible from mock listeners. -var addrMgr = addrmgr.New("test", lookupFunc) - // peerStats holds the expected peer stats used for testing peer. type peerStats struct { wantID int32 @@ -110,160 +105,8 @@ type peerStats struct { wantTimeOffset int64 } -// TestPeerConnection tests the activity between inbound and outbound peers -// using a mock connection. -func TestPeerConnection(t *testing.T) { - peerCfg := &peer.Config{ - NewestBlock: newestSha, - BestLocalAddress: addrMgr.GetBestLocalAddress, - UserAgentName: "peer", - UserAgentVersion: "1.0", - Net: wire.MainNet, - Services: wire.SFNodeNetwork, - } - na, err := addrMgr.HostToNetAddress("127.0.0.1", uint16(8333), peerCfg.Services) - if err != nil { - t.Errorf("HostToNetAddress: unexpected err %v\n", err) - return - } - - wantStats := peerStats{ - wantID: 0, - wantAddr: "127.0.0.1:8333", - wantUserAgent: "/btcwire:0.2.1/peer:1.0/", - wantInbound: true, - wantServices: wire.SFNodeNetwork, - wantProtocolVersion: uint32(70002), - wantConnected: true, - wantVersionKnown: true, - wantVerAckReceived: true, - wantLastBlock: int32(234439), - wantStartingHeight: int32(234439), - wantLastPingTime: *new(time.Time), - wantLastPingNonce: uint64(0), - wantLastPingMicros: int64(0), - wantTimeOffset: int64(0), - } - - tests := []struct { - name string - getPeers func() (*peer.Peer, *peer.Peer, error) - getInboundStats func(s peerStats) peerStats - getOutboundStats func(s peerStats) peerStats - }{ - { - "basic handshake", - func() (*peer.Peer, *peer.Peer, error) { - c1, c2 := pipe( - &conn{raddr: "127.0.0.1:8333"}, - &conn{raddr: "127.0.0.1:8333"}, - ) - p1 := peer.NewInboundPeer(peerCfg, 0, c1) - err := p1.Start() - if err != nil { - return nil, nil, err - } - p2 := peer.NewOutboundPeer(peerCfg, 1, na) - if err := p2.Connect(c2); err != nil { - return nil, nil, err - } - return p1, p2, nil - }, - func(ps peerStats) peerStats { - ps.wantID = 1 - return ps - }, - func(ps peerStats) peerStats { - ps.wantID = 2 - return ps - }, - }, - { - "proxied inbound connection", - func() (*peer.Peer, *peer.Peer, error) { - // Pass a mock proxied connection to the inbound peer - c1, c2 := pipe( - &conn{raddr: ":8333", proxy: true}, - &conn{raddr: "127.0.0.1:8333"}, - ) - p1 := peer.NewInboundPeer(peerCfg, 0, c1) - err := p1.Start() - if err != nil { - return nil, nil, err - } - p2 := peer.NewOutboundPeer(peerCfg, 1, na) - if err := p2.Connect(c2); err != nil { - return nil, nil, err - } - return p1, p2, nil - }, - func(ps peerStats) peerStats { - ps.wantAddr = ":8333" - ps.wantID = 3 - return ps - }, - func(ps peerStats) peerStats { - ps.wantID = 4 - return ps - }, - }, - } - t.Logf("Running %d tests", len(tests)) - for i, test := range tests { - t.Logf("Running test: %s", test.name) - p1, p2, err := test.getPeers() - if err != nil { - t.Errorf("TestPeerConnection: #%d %s unexpected err - %v", - i, test.name, err) - continue - } - - // Wait until veracks are exchanged - ready := make(chan struct{}, 1) - p1.AddVerAckMsgListener("handleVerAckMsg", func(p *peer.Peer, - msg *wire.MsgVerAck) { - ready <- struct{}{} - }) - p2.AddVerAckMsgListener("handleVerAckMsg", func(p *peer.Peer, - msg *wire.MsgVerAck) { - ready <- struct{}{} - }) - for i := 0; i < 2; i++ { - select { - case <-ready: - case <-time.After(time.Second * 1): - t.Errorf("TestPeerConnection: #%d - verack timeout", i) - continue - } - } - - // Test peer flags and stats - testPeer(t, p1, test.getInboundStats(wantStats)) - testPeer(t, p2, test.getOutboundStats(wantStats)) - - if p1.Inbound() != true { - t.Errorf("testPeer: wrong Inbound - want true") - return - } - if p2.Inbound() != false { - t.Errorf("testPeer: wrong Inbound - want false") - return - } - - // Test listeners - testListeners(t, p1, p2) - p1.Shutdown() - p2.Shutdown() - } -} - // testPeer tests the given peer's flags and stats func testPeer(t *testing.T, p *peer.Peer, s peerStats) { - if p.ID() != s.wantID { - t.Errorf("testPeer: wrong ID - got %v, want %v", p.ID(), s.wantID) - return - } - if p.Addr() != s.wantAddr { t.Errorf("testPeer: wrong Addr - got %v, want %v", p.Addr(), s.wantAddr) return @@ -335,218 +178,311 @@ func testPeer(t *testing.T, p *peer.Peer, s peerStats) { p.TimeConnected() p.BytesSent() p.BytesReceived() + p.StatsSnapshot() +} - stats := p.StatsSnapshot() +// TestPeerConnection tests connection between inbound and outbound peers. +func TestPeerConnection(t *testing.T) { + verack := make(chan struct{}, 1) + peerCfg := &peer.Config{ + Listeners: peer.MessageListeners{ + OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) { + verack <- struct{}{} + }, + }, + UserAgentName: "peer", + UserAgentVersion: "1.0", + ChainParams: &chaincfg.MainNetParams, + Services: 0, + } + na := wire.NewNetAddressIPPort(net.IP{10, 0, 0, 1}, uint16(8333), 0) + wantStats := peerStats{ + wantAddr: "10.0.0.1:8333", + wantUserAgent: wire.DefaultUserAgent + "peer:1.0/", + wantInbound: true, + wantServices: 0, + wantProtocolVersion: peer.MaxProtocolVersion, + wantConnected: true, + wantVersionKnown: true, + wantVerAckReceived: true, + wantLastPingTime: *new(time.Time), + wantLastPingNonce: uint64(0), + wantLastPingMicros: int64(0), + wantTimeOffset: int64(0), + } + tests := []struct { + name string + setup func() (*peer.Peer, *peer.Peer, error) + }{ + { + "basic handshake", + func() (*peer.Peer, *peer.Peer, error) { + inConn, outConn := pipe( + &conn{raddr: "10.0.0.1:8333"}, + &conn{raddr: "10.0.0.2:8333"}, + ) + inPeer := peer.NewInboundPeer(peerCfg, inConn) + err := inPeer.Start() + if err != nil { + return nil, nil, err + } + outPeer := peer.NewOutboundPeer(peerCfg, na) + if err := outPeer.Connect(outConn); err != nil { + return nil, nil, err + } + for i := 0; i < 2; i++ { + select { + case <-verack: + case <-time.After(time.Second * 1): + return nil, nil, errors.New("verack timeout") + } + } + return inPeer, outPeer, nil + }, + }, + { + "socks proxy", + func() (*peer.Peer, *peer.Peer, error) { + inConn, outConn := pipe( + &conn{raddr: "10.0.0.1:8333", proxy: true}, + &conn{raddr: "10.0.0.2:8333"}, + ) + inPeer := peer.NewInboundPeer(peerCfg, inConn) + err := inPeer.Start() + if err != nil { + return nil, nil, err + } + outPeer := peer.NewOutboundPeer(peerCfg, na) + if err := outPeer.Connect(outConn); err != nil { + return nil, nil, err + } + for i := 0; i < 2; i++ { + select { + case <-verack: + case <-time.After(time.Second * 1): + return nil, nil, errors.New("verack timeout") + } + } + return inPeer, outPeer, nil + }, + }, + } + t.Logf("Running %d tests", len(tests)) + for i, test := range tests { + inPeer, outPeer, err := test.setup() + if err != nil { + t.Errorf("TestPeerConnection setup #%d: unexpected err %v\n", i, err) + return + } + testPeer(t, inPeer, wantStats) + testPeer(t, outPeer, wantStats) - if stats.ID != s.wantID { - t.Errorf("testPeer: wrong ID - got %v, want %v", stats.ID, s.wantID) - return + inPeer.Shutdown() + outPeer.Shutdown() } } -// testListeners tests that custom message listeners are working as expected. -func testListeners(t *testing.T, p1 *peer.Peer, p2 *peer.Peer) { +// TestPeerListeners tests that the peer listeners are called as expected. +func TestPeerListeners(t *testing.T) { + verack := make(chan struct{}, 1) + ok := make(chan wire.Message, 20) + peerCfg := &peer.Config{ + Listeners: peer.MessageListeners{ + OnGetAddr: func(p *peer.Peer, msg *wire.MsgGetAddr) { + ok <- msg + }, + OnAddr: func(p *peer.Peer, msg *wire.MsgAddr) { + ok <- msg + }, + OnPing: func(p *peer.Peer, msg *wire.MsgPing) { + ok <- msg + }, + OnPong: func(p *peer.Peer, msg *wire.MsgPong) { + ok <- msg + }, + OnAlert: func(p *peer.Peer, msg *wire.MsgAlert) { + ok <- msg + }, + OnMemPool: func(p *peer.Peer, msg *wire.MsgMemPool) { + ok <- msg + }, + OnTx: func(p *peer.Peer, msg *wire.MsgTx) { + ok <- msg + }, + OnBlock: func(p *peer.Peer, msg *wire.MsgBlock, buf []byte) { + ok <- msg + }, + OnInv: func(p *peer.Peer, msg *wire.MsgInv) { + ok <- msg + }, + OnHeaders: func(p *peer.Peer, msg *wire.MsgHeaders) { + ok <- msg + }, + OnNotFound: func(p *peer.Peer, msg *wire.MsgNotFound) { + ok <- msg + }, + OnGetData: func(p *peer.Peer, msg *wire.MsgGetData) { + ok <- msg + }, + OnGetBlocks: func(p *peer.Peer, msg *wire.MsgGetBlocks) { + ok <- msg + }, + OnGetHeaders: func(p *peer.Peer, msg *wire.MsgGetHeaders) { + ok <- msg + }, + OnFilterAdd: func(p *peer.Peer, msg *wire.MsgFilterAdd) { + ok <- msg + }, + OnFilterClear: func(p *peer.Peer, msg *wire.MsgFilterClear) { + ok <- msg + }, + OnFilterLoad: func(p *peer.Peer, msg *wire.MsgFilterLoad) { + ok <- msg + }, + OnMerkleBlock: func(p *peer.Peer, msg *wire.MsgMerkleBlock) { + ok <- msg + }, + OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) { + ok <- msg + }, + OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) { + verack <- struct{}{} + }, + OnReject: func(p *peer.Peer, msg *wire.MsgReject) { + ok <- msg + }, + }, + UserAgentName: "peer", + UserAgentVersion: "1.0", + ChainParams: &chaincfg.MainNetParams, + Services: 0, + } + inConn, outConn := pipe( + &conn{raddr: "10.0.0.1:8333"}, + &conn{raddr: "10.0.0.2:8333"}, + ) + na := wire.NewNetAddressIPPort(net.IP{10, 0, 0, 1}, uint16(8333), 0) + inPeer := peer.NewInboundPeer(peerCfg, inConn) + err := inPeer.Start() + if err != nil { + t.Errorf("TestPeerListeners: unexpected err %v\n", err) + return + } + peerCfg.Listeners = peer.MessageListeners{ + OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) { + verack <- struct{}{} + }, + } + outPeer := peer.NewOutboundPeer(peerCfg, na) + if err := outPeer.Connect(outConn); err != nil { + t.Errorf("TestPeerListeners: unexpected err %v\n", err) + return + } + for i := 0; i < 2; i++ { + select { + case <-verack: + case <-time.After(time.Second * 1): + t.Errorf("TestPeerListeners: verack timeout\n") + return + } + } + tests := []struct { - handler string - msg wire.Message + listener string + msg wire.Message }{ { - "handleGetAddr", + "OnGetAddr", wire.NewMsgGetAddr(), }, { - "handleAddr", + "OnAddr", wire.NewMsgAddr(), }, { - "handlePing", + "OnPing", wire.NewMsgPing(42), }, { - "handlePong", + "OnPong", wire.NewMsgPong(42), }, { - "handleAlert", + "OnAlert", wire.NewMsgAlert([]byte("payload"), []byte("signature")), }, { - "handleMemPool", + "OnMemPool", wire.NewMsgMemPool(), }, { - "handleTx", + "OnTx", wire.NewMsgTx(), }, { - "handleBlock", + "OnBlock", wire.NewMsgBlock(wire.NewBlockHeader(&wire.ShaHash{}, &wire.ShaHash{}, 1, 1)), }, { - "handleInv", + "OnInv", wire.NewMsgInv(), }, { - "handleHeaders", + "OnHeaders", wire.NewMsgHeaders(), }, { - "handleNotFound", + "OnNotFound", wire.NewMsgNotFound(), }, { - "handleGetData", + "OnGetData", wire.NewMsgGetData(), }, { - "handleGetBlocks", + "OnGetBlocks", wire.NewMsgGetBlocks(&wire.ShaHash{}), }, { - "handleGetHeaders", + "OnGetHeaders", wire.NewMsgGetHeaders(), }, { - "handleFilterAddMsg", + "OnFilterAddMsg", wire.NewMsgFilterAdd([]byte{0x01}), }, { - "handleFilterClearMsg", + "OnFilterClearMsg", wire.NewMsgFilterClear(), }, { - "handleFilterLoadMsg", + "OnFilterLoadMsg", wire.NewMsgFilterLoad([]byte{0x01}, 10, 0, wire.BloomUpdateNone), }, + { + "OnMerkleBlockMsg", + wire.NewMsgMerkleBlock(wire.NewBlockHeader(&wire.ShaHash{}, &wire.ShaHash{}, 1, 1)), + }, // only one version message is allowed // only one verack message is allowed { - "handleMsgReject", + "OnMsgReject", wire.NewMsgReject("block", wire.RejectDuplicate, "dupe block"), }, } - - for i, test := range tests { - // Chan to make sure the listener is fired - ok := make(chan struct{}) - - // Add listener and use chan to signal when it is called - switch test.msg.(type) { - case *wire.MsgGetAddr: - p1.AddGetAddrMsgListener(test.handler, func(*peer.Peer, *wire.MsgGetAddr) { - ok <- struct{}{} - }) - case *wire.MsgAddr: - p1.AddAddrMsgListener(test.handler, func(*peer.Peer, *wire.MsgAddr) { - ok <- struct{}{} - }) - case *wire.MsgPing: - p1.AddPingMsgListener(test.handler, func(*peer.Peer, *wire.MsgPing) { - ok <- struct{}{} - }) - case *wire.MsgPong: - p1.AddPongMsgListener(test.handler, func(*peer.Peer, *wire.MsgPong) { - ok <- struct{}{} - }) - case *wire.MsgAlert: - p1.AddAlertMsgListener(test.handler, func(*peer.Peer, *wire.MsgAlert) { - ok <- struct{}{} - }) - case *wire.MsgMemPool: - p1.AddMemPoolMsgListener(test.handler, func(*peer.Peer, *wire.MsgMemPool) { - ok <- struct{}{} - }) - case *wire.MsgTx: - p1.AddTxMsgListener(test.handler, func(*peer.Peer, *wire.MsgTx) { - ok <- struct{}{} - }) - case *wire.MsgBlock: - p1.AddBlockMsgListener(test.handler, func(*peer.Peer, *wire.MsgBlock, []byte) { - ok <- struct{}{} - }) - case *wire.MsgInv: - p1.AddInvMsgListener(test.handler, func(*peer.Peer, *wire.MsgInv) { - ok <- struct{}{} - }) - case *wire.MsgHeaders: - p1.AddHeadersMsgListener(test.handler, func(*peer.Peer, *wire.MsgHeaders) { - ok <- struct{}{} - }) - case *wire.MsgNotFound: - p1.AddNotFoundMsgListener(test.handler, func(*peer.Peer, *wire.MsgNotFound) { - ok <- struct{}{} - }) - case *wire.MsgGetData: - p1.AddGetDataMsgListener(test.handler, func(*peer.Peer, *wire.MsgGetData) { - ok <- struct{}{} - }) - case *wire.MsgGetBlocks: - p1.AddGetBlocksMsgListener(test.handler, func(*peer.Peer, *wire.MsgGetBlocks) { - ok <- struct{}{} - }) - case *wire.MsgGetHeaders: - p1.AddGetHeadersMsgListener(test.handler, func(*peer.Peer, *wire.MsgGetHeaders) { - ok <- struct{}{} - }) - case *wire.MsgFilterAdd: - p1.AddFilterAddMsgListener(test.handler, func(*peer.Peer, *wire.MsgFilterAdd) { - ok <- struct{}{} - }) - case *wire.MsgFilterClear: - p1.AddFilterClearMsgListener(test.handler, func(*peer.Peer, *wire.MsgFilterClear) { - ok <- struct{}{} - }) - case *wire.MsgFilterLoad: - p1.AddFilterLoadMsgListener(test.handler, func(*peer.Peer, *wire.MsgFilterLoad) { - ok <- struct{}{} - }) - case *wire.MsgVersion: - p1.AddVersionMsgListener(test.handler, func(*peer.Peer, *wire.MsgVersion) { - ok <- struct{}{} - }) - case *wire.MsgVerAck: - p1.AddVerAckMsgListener(test.handler, func(*peer.Peer, *wire.MsgVerAck) { - ok <- struct{}{} - }) - case *wire.MsgReject: - p1.AddRejectMsgListener(test.handler, func(*peer.Peer, *wire.MsgReject) { - ok <- struct{}{} - }) - } - + t.Logf("Running %d tests", len(tests)) + for _, test := range tests { // Queue the test message - p2.QueueMessage(test.msg, nil) - // Timeout in case something goes wrong + outPeer.QueueMessage(test.msg, nil) select { case <-ok: - // Should receive ok from the listener case <-time.After(time.Second * 1): - t.Errorf("testListeners #%d: expected handler %s to be called", i, test.handler) + t.Errorf("TestPeerListeners: %s timeout", test.listener) return } - - // Reset listeners - p1.RemoveGetAddrMsgListener(test.handler) - p1.RemoveAddrMsgListener(test.handler) - p1.RemovePingMsgListener(test.handler) - p1.RemovePongMsgListener(test.handler) - p1.RemoveAlertMsgListener(test.handler) - p1.RemoveMemPoolMsgListener(test.handler) - p1.RemoveTxMsgListener(test.handler) - p1.RemoveBlockMsgListener(test.handler) - p1.RemoveInvMsgListener(test.handler) - p1.RemoveHeadersMsgListener(test.handler) - p1.RemoveNotFoundMsgListener(test.handler) - p1.RemoveGetDataMsgListener(test.handler) - p1.RemoveGetBlocksMsgListener(test.handler) - p1.RemoveGetHeadersMsgListener(test.handler) - p1.RemoveFilterAddMsgListener(test.handler) - p1.RemoveFilterClearMsgListener(test.handler) - p1.RemoveFilterLoadMsgListener(test.handler) - p1.RemoveVersionMsgListener(test.handler) - p1.RemoveVerAckMsgListener(test.handler) - p1.RemoveRejectMsgListener(test.handler) } + inPeer.Shutdown() + outPeer.Shutdown() } // TestOutboundPeer tests that the outbound peer works as expected. @@ -559,23 +495,17 @@ func TestOutboundPeer(t *testing.T) { peerCfg := &peer.Config{ NewestBlock: mockNewestSha, - BestLocalAddress: addrMgr.GetBestLocalAddress, UserAgentName: "peer", UserAgentVersion: "1.0", - Net: wire.MainNet, - Services: wire.SFNodeNetwork, + ChainParams: &chaincfg.MainNetParams, + Services: 0, } - var b bytes.Buffer - c := &conn{raddr: "127.0.0.1:8333", Writer: &b, Reader: &b} - - na, err := addrMgr.HostToNetAddress("127.0.0.1", uint16(8333), peerCfg.Services) - if err != nil { - t.Errorf("HostToNetAddress: error %v\n", err) - return - } - p := peer.NewOutboundPeer(peerCfg, 1, na) + r, w := io.Pipe() + c := &conn{raddr: "10.0.0.1:8333", Writer: w, Reader: r} + na := wire.NewNetAddressIPPort(net.IP{10, 0, 0, 1}, uint16(8333), 0) + p := peer.NewOutboundPeer(peerCfg, na) if p.NA() != na { t.Errorf("TestOutboundPeer: wrong NA - got %v, want %v", p.NA(), na) return @@ -608,9 +538,17 @@ func TestOutboundPeer(t *testing.T) { <-done p.Shutdown() - // Reset NewestBlock to normal Start - peerCfg.NewestBlock = newestSha - p1 := peer.NewOutboundPeer(peerCfg, 1, na) + // Test NewestBlock + var newestBlock = func() (*wire.ShaHash, int32, error) { + hashStr := "14a0810ac680a3eb3f82edc878cea25ec41d6b790744e5daeef" + hash, err := wire.NewShaHashFromStr(hashStr) + if err != nil { + return nil, 0, err + } + return hash, 234439, nil + } + peerCfg.NewestBlock = newestBlock + p1 := peer.NewOutboundPeer(peerCfg, na) if err := p1.Connect(c); err != nil { t.Errorf("Connect: unexpected err %v\n", err) return @@ -635,8 +573,8 @@ func TestOutboundPeer(t *testing.T) { p1.Shutdown() // Test regression - peerCfg.RegressionTest = true - p2 := peer.NewOutboundPeer(peerCfg, 1, na) + peerCfg.ChainParams = &chaincfg.RegressionNetParams + p2 := peer.NewOutboundPeer(peerCfg, na) if err := p2.Connect(c); err != nil { t.Errorf("Connect: unexpected err %v\n", err) return @@ -672,3 +610,8 @@ func TestOutboundPeer(t *testing.T) { p2.Shutdown() } + +func init() { + // Allow self connection when running the tests. + peer.TstAllowSelfConns() +} diff --git a/rpcserver.go b/rpcserver.go index fe0eac02c8b..c109cf181dd 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2289,12 +2289,13 @@ func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) info := &btcjson.GetPeerInfoResult{ ID: statsSnap.ID, Addr: statsSnap.Addr, - Services: statsSnap.Services, - LastSend: statsSnap.LastSend, - LastRecv: statsSnap.LastRecv, + Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)), + LastSend: statsSnap.LastSend.Unix(), + LastRecv: statsSnap.LastRecv.Unix(), BytesSent: statsSnap.BytesSent, BytesRecv: statsSnap.BytesRecv, - ConnTime: statsSnap.ConnTime, + ConnTime: statsSnap.ConnTime.Unix(), + PingTime: float64(statsSnap.LastPingMicros), TimeOffset: statsSnap.TimeOffset, Version: statsSnap.Version, SubVer: statsSnap.UserAgent, @@ -2304,7 +2305,6 @@ func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) BanScore: 0, SyncNode: p == syncPeer, } - info.PingTime = float64(statsSnap.LastPingMicros) if p.LastPingNonce() != 0 { wait := float64(time.Now().Sub(statsSnap.LastPingTime).Nanoseconds()) // We actually want microseconds. diff --git a/server.go b/server.go index 476ad2a8079..b5ddbfb7bec 100644 --- a/server.go +++ b/server.go @@ -102,8 +102,8 @@ type updatePeerHeightsMsg struct { // server provides a bitcoin server for handling communications to and from // bitcoin peers. type server struct { - nonce uint64 listeners []net.Listener + peerConfig *peer.Config chainParams *chaincfg.Params started int32 // atomic shutdown int32 // atomic @@ -760,11 +760,33 @@ func (s *server) handleGetHeadersMsg(p *peer.Peer, msg *wire.MsgGetHeaders) { p.QueueMessage(headersMsg, nil) } +// isValidBIP0111 is a helper function for the bloom filter commands to check +// BIP0111 compliance. +func isValidBIP0111(p *peer.Peer, cmd string) bool { + if p.Services()&wire.SFNodeBloom != wire.SFNodeBloom { + if p.ProtocolVersion() >= wire.BIP0111Version { + peerLog.Debugf("%s sent an unsupported %s "+ + "request -- disconnecting", p, cmd) + p.Disconnect() + } else { + peerLog.Debugf("Ignoring %s request from %s -- bloom "+ + "support is disabled", cmd, p) + } + return false + } + + return true +} + // handleFilterAddMsg is invoked when a peer receives a filteradd bitcoin // message and is used by remote peers to add data to an already loaded bloom // filter. The peer will be disconnected if a filter is not loaded when this // message is received. func (s *server) handleFilterAddMsg(p *peer.Peer, msg *wire.MsgFilterAdd) { + if !isValidBIP0111(p, msg.Command()) { + return + } + pInfo, err := s.blockManager.peerInfo(p) if err != nil { bmgrLog.Errorf("%v", err) @@ -785,6 +807,10 @@ func (s *server) handleFilterAddMsg(p *peer.Peer, msg *wire.MsgFilterAdd) { // The peer will be disconnected if a filter is not loaded when this message is // received. func (s *server) handleFilterClearMsg(p *peer.Peer, msg *wire.MsgFilterClear) { + if !isValidBIP0111(p, msg.Command()) { + return + } + pInfo, err := s.blockManager.peerInfo(p) if err != nil { bmgrLog.Errorf("%v", err) @@ -803,6 +829,10 @@ func (s *server) handleFilterClearMsg(p *peer.Peer, msg *wire.MsgFilterClear) { // message and it used to load a bloom filter that should be used for // delivering merkle blocks and associated transactions that match the filter. func (s *server) handleFilterLoadMsg(p *peer.Peer, msg *wire.MsgFilterLoad) { + if !isValidBIP0111(p, msg.Command()) { + return + } + pInfo, err := s.blockManager.peerInfo(p) if err != nil { bmgrLog.Errorf("%v", err) @@ -918,33 +948,16 @@ func (s *server) handleAddrMsg(p *peer.Peer, msg *wire.MsgAddr) { s.addrManager.AddAddresses(msg.AddrList, p.NA()) } -// registerListeners registers peer message listeners -func (s *server) registerListeners(p *peer.Peer) { - p.AddVersionMsgListener("handleVersionMsg", s.handleVersionMsg) - p.AddMemPoolMsgListener("handleMemPoolMsg", s.handleMemPoolMsg) - p.AddTxMsgListener("handleTxMsg", s.handleTxMsg) - p.AddBlockMsgListener("handleBlockMsg", s.handleBlockMsg) - p.AddInvMsgListener("handleInvMsg", s.handleInvMsg) - p.AddHeadersMsgListener("handleHeadersMsg", s.handleHeadersMsg) - p.AddGetDataMsgListener("handleGetDataMsg", s.handleGetDataMsg) - p.AddGetBlocksMsgListener("handleGetBlocksMsg", s.handleGetBlocksMsg) - p.AddGetHeadersMsgListener("handleGetHeadersMsg", s.handleGetHeadersMsg) - p.AddFilterAddMsgListener("handleFilterAddMsg", s.handleFilterAddMsg) - p.AddFilterClearMsgListener("handleFilterClearMsg", s.handleFilterClearMsg) - p.AddFilterLoadMsgListener("handleFilterLoadMsg", s.handleFilterLoadMsg) - p.AddGetAddrMsgListener("handleGetAddrMsg", s.handleGetAddrMsg) - p.AddAddrMsgListener("handleAddrMsg", s.handleAddrMsg) - - // When peer gets shutdown, notify the server that it is done. - go func() { - p.WaitForShutdown() - s.donePeers <- p +// handleRead is invoked when a peer receives a message and it is used to update +// the bytes received by the server. +func (s *server) handleRead(p *peer.Peer, bytesRead int, msg wire.Message, err error) { + s.AddBytesReceived(uint64(bytesRead)) +} - // Only tell block manager we are gone if we ever told it we existed. - if p.VersionKnown() { - s.blockManager.DonePeer(p) - } - }() +// handleWrite is invoked when a peer sends a message and it is used to update +// the bytes sent by the server. +func (s *server) handleWrite(p *peer.Peer, bytesWritten int, msg wire.Message, err error) { + s.AddBytesSent(uint64(bytesWritten)) } func (p *peerState) Count() int { @@ -1068,6 +1081,16 @@ func (s *server) handleAddPeerMsg(state *peerState, p *peer.Peer, persistent boo } } + // Handle peer shutdown or disconnect + go func() { + p.WaitForShutdown() + s.donePeers <- p + + // Only tell block manager we are gone if we ever told it we existed. + if p.VersionKnown() { + s.blockManager.DonePeer(p) + } + }() return true } @@ -1354,19 +1377,7 @@ func (s *server) listenHandler(listener net.Listener) { } continue } - peerCfg := &peer.Config{ - NewestBlock: s.db.NewestSha, - BestLocalAddress: s.addrManager.GetBestLocalAddress, - Proxy: cfg.Proxy, - RegressionTest: cfg.RegressionTest, - UserAgentName: userAgentName, - UserAgentVersion: userAgentVersion, - Net: s.chainParams.Net, - Services: wire.SFNodeNetwork, - } - p := peer.NewInboundPeer(peerCfg, s.nonce, conn) - s.registerListeners(p) - s.AddPeer(p) + s.AddPeer(peer.NewInboundPeer(s.peerConfig, conn)) } s.wg.Done() srvrLog.Tracef("Listener handler done for %s", listener.Addr()) @@ -1421,16 +1432,6 @@ func (s *server) seedFromDNS() { // addPeer initializes a new outbound peer and setups the message listeners. func (s *server) addPeer(addr string) *peer.Peer { - peerCfg := &peer.Config{ - NewestBlock: s.db.NewestSha, - BestLocalAddress: s.addrManager.GetBestLocalAddress, - Proxy: cfg.Proxy, - RegressionTest: cfg.RegressionTest, - UserAgentName: userAgentName, - UserAgentVersion: userAgentVersion, - Net: s.chainParams.Net, - Services: wire.SFNodeNetwork, - } host, portStr, err := net.SplitHostPort(addr) if err != nil { srvrLog.Errorf("Tried to create a new outbound peer with invalid "+ @@ -1445,16 +1446,14 @@ func (s *server) addPeer(addr string) *peer.Peer { return nil } - na, err := s.addrManager.HostToNetAddress(host, uint16(port), peerCfg.Services) + na, err := s.addrManager.HostToNetAddress(host, uint16(port), s.services) if err != nil { srvrLog.Errorf("Can not turn host %s into netaddress: %v", host, err) return nil } - p := peer.NewOutboundPeer(peerCfg, s.nonce, na) - s.registerListeners(p) - return p + return peer.NewOutboundPeer(s.peerConfig, na) } // establishConn establishes a connection to the peer. @@ -2101,11 +2100,6 @@ out: // bitcoin network type specified by chainParams. Use start to begin accepting // connections from peers. func newServer(listenAddrs []string, db database.Db, chainParams *chaincfg.Params) (*server, error) { - nonce, err := wire.RandomUint64() - if err != nil { - return nil, err - } - services := defaultServices if cfg.NoPeerBloomFilters { services &^= wire.SFNodeBloom @@ -2238,7 +2232,6 @@ func newServer(listenAddrs []string, db database.Db, chainParams *chaincfg.Param } s := server{ - nonce: nonce, listeners: listeners, chainParams: chainParams, addrManager: amgr, @@ -2266,6 +2259,34 @@ func newServer(listenAddrs []string, db database.Db, chainParams *chaincfg.Param s.txMemPool = newTxMemPool(&s) s.cpuMiner = newCPUMiner(&s) + s.peerConfig = &peer.Config{ + NewestBlock: db.NewestSha, + BestLocalAddress: amgr.GetBestLocalAddress, + Proxy: cfg.Proxy, + UserAgentName: userAgentName, + UserAgentVersion: userAgentVersion, + ChainParams: chainParams, + Services: services, + Listeners: peer.MessageListeners{ + OnVersion: s.handleVersionMsg, + OnMemPool: s.handleMemPoolMsg, + OnTx: s.handleTxMsg, + OnBlock: s.handleBlockMsg, + OnInv: s.handleInvMsg, + OnHeaders: s.handleHeadersMsg, + OnGetData: s.handleGetDataMsg, + OnGetBlocks: s.handleGetBlocksMsg, + OnGetHeaders: s.handleGetHeadersMsg, + OnFilterAdd: s.handleFilterAddMsg, + OnFilterClear: s.handleFilterClearMsg, + OnFilterLoad: s.handleFilterLoadMsg, + OnGetAddr: s.handleGetAddrMsg, + OnAddr: s.handleAddrMsg, + OnRead: s.handleRead, + OnWrite: s.handleWrite, + }, + } + if cfg.AddrIndex { ai, err := newAddrIndexer(&s) if err != nil {