Skip to content

Commit

Permalink
Implement a rebroadcast handler.
Browse files Browse the repository at this point in the history
This commit implements a rebroadcast handler which deals with
rebroadcasting inventory at a random time interval between 0 and 30
minutes.  It then uses the new rebroadcast logic to ensure transactions
which were submitted via the sendrawtransaction RPC are rebroadcast until
they make it into a block.

Closes #99.
  • Loading branch information
mydesktop authored and davecgh committed Mar 27, 2014
1 parent 5fcdfb0 commit ab002c9
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 43 deletions.
12 changes: 10 additions & 2 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,8 +1062,16 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
b.server.txMemPool.RemoveDoubleSpends(tx)
}

// Notify registered websocket clients
if r := b.server.rpcServer; r != nil {
// Now that this block is in the blockchain we can mark all the
// transactions (except the coinbase) as no longer needing
// rebroadcasting.
for _, tx := range block.Transactions()[1:] {
iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha())
b.server.RemoveRebroadcastInventory(iv)
}

// Notify registered websocket clients of incoming block.
r.ntfnMgr.NotifyBlockConnected(block)
}

Expand All @@ -1087,7 +1095,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
}
}

// Notify registered websocket clients
// Notify registered websocket clients.
if r := b.server.rpcServer; r != nil {
r.ntfnMgr.NotifyBlockDisconnected(block)
}
Expand Down
1 change: 1 addition & 0 deletions mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type txMemPool struct {
outpoints map[btcwire.OutPoint]*btcutil.Tx
pennyTotal float64 // exponentially decaying total for penny spends.
lastPennyUnix int64 // unix time of last ``penny spend''

}

// isDust returns whether or not the passed transaction output amount is
Expand Down
7 changes: 7 additions & 0 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,13 @@ func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error
return nil, err
}

// We keep track of all the sendrawtransaction request txs because we need to
// rebroadcast them if they fail to get broadcast or entered into a block; for
// instance if the client was offline when they were generated. Refer to
// server.go in /btcd.
iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha())
s.server.AddRebroadcastInventory(iv)

return tx.Sha().String(), nil
}

Expand Down
184 changes: 143 additions & 41 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ package main

import (
"container/list"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcjson"
"github.com/conformal/btcwire"
"math"
"net"
"runtime"
"strconv"
Expand All @@ -19,9 +22,9 @@ import (
"time"
)

// These constants are used by the DNS seed code to pick a random last seen
// time.
const (
// These constants are used by the DNS seed code to pick a random last seen
// time.
secondsIn3Days int32 = 24 * 60 * 60 * 3
secondsIn4Days int32 = 24 * 60 * 60 * 4
)
Expand All @@ -46,33 +49,42 @@ type broadcastMsg struct {
excludePeers []*peer
}

// BroadcastInventoryAdd is a type used to declare that the InvVect it contains
// needs to be added to the rebroadcast map
type broadcastInventoryAdd *btcwire.InvVect

// BroadcastInventoryDel is a type used to declare that the InvVect it contains
// needs to be removed from the rebroadcast map
type broadcastInventoryDel *btcwire.InvVect

// server provides a bitcoin server for handling communications to and from
// bitcoin peers.
type server struct {
nonce uint64
listeners []net.Listener
btcnet btcwire.BitcoinNet
started int32 // atomic
shutdown int32 // atomic
shutdownSched int32 // atomic
bytesMutex sync.Mutex // For the following two fields.
bytesReceived uint64 // Total bytes received from all peers since start.
bytesSent uint64 // Total bytes sent by all peers since start.
addrManager *AddrManager
rpcServer *rpcServer
blockManager *blockManager
txMemPool *txMemPool
newPeers chan *peer
donePeers chan *peer
banPeers chan *peer
wakeup chan bool
query chan interface{}
relayInv chan *btcwire.InvVect
broadcast chan broadcastMsg
wg sync.WaitGroup
quit chan bool
nat NAT
db btcdb.Db
nonce uint64
listeners []net.Listener
btcnet btcwire.BitcoinNet
started int32 // atomic
shutdown int32 // atomic
shutdownSched int32 // atomic
bytesMutex sync.Mutex // For the following two fields.
bytesReceived uint64 // Total bytes received from all peers since start.
bytesSent uint64 // Total bytes sent by all peers since start.
addrManager *AddrManager
rpcServer *rpcServer
blockManager *blockManager
txMemPool *txMemPool
modifyRebroadcastInv chan interface{}
newPeers chan *peer
donePeers chan *peer
banPeers chan *peer
wakeup chan bool
query chan interface{}
relayInv chan *btcwire.InvVect
broadcast chan broadcastMsg
wg sync.WaitGroup
quit chan bool
nat NAT
db btcdb.Db
}

type peerState struct {
Expand All @@ -84,6 +96,34 @@ type peerState struct {
maxOutboundPeers int
}

// randomUint16Number returns a random uint16 in a specified input range. Note
// that the range is in zeroth ordering; if you pass it 1800, you will get values
// from 0 to 1800. In order to avoid modulo bias and ensure every possible
// outcome in [0, max) has equal probability, the random number must be sampled
// from a random source that has a range limited to a multiple of the modulus.
func randomUint16Number(max uint16) uint16 {
var randomNumber uint16
var limitRange = (math.MaxUint16 / max) * max
for {
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
if randomNumber < limitRange {
return (randomNumber % max)
}
}
}

// AddRebroadcastInventory dispatches a message to the rebroadcastHandler
// specifying to add an item to the rebroadcast map of InvVects
func (s *server) AddRebroadcastInventory(iv *btcwire.InvVect) {
s.modifyRebroadcastInv <- broadcastInventoryAdd(iv)
}

// RemoveRebroadcastInventory dispatches a message to the rebroadcastHandler
// specifying to remove an item from the rebroadcast map of InvVects
func (s *server) RemoveRebroadcastInventory(iv *btcwire.InvVect) {
s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
}

func (p *peerState) Count() int {
return p.peers.Len() + p.outboundPeers.Len() + p.persistentPeers.Len()
}
Expand Down Expand Up @@ -706,6 +746,61 @@ func (s *server) NetTotals() (uint64, uint64) {
return s.bytesReceived, s.bytesSent
}

// rebroadcastHandler is a listener that uses a couple of channels to maintain
// a list of transactions that need to be rebroadcast. The list of tx is stored
// in their abstracted P2P form (InvVect) in a map (pendingInvs).
// Why we need this:
// We handle user submitted tx, e.g. from a wallet, via the RPC submission
// function sendrawtransactions. Because we need to ensure that user-
// submitted tx eventually enter a block, we need to retransmit them
// periodically until we see them actually enter a block.
func (s *server) rebroadcastHandler() {
timer := time.NewTimer(5 * time.Minute) // Wait 5 min before first tx rebroadcast.
pendingInvs := make(map[btcwire.InvVect]struct{})

out:
for {
select {
case riv := <-s.modifyRebroadcastInv:
switch msg := riv.(type) {
// Incoming InvVects are added to our map of RPC txs.
case broadcastInventoryAdd:
pendingInvs[*msg] = struct{}{}

// When an InvVect has been added to a block, we can now remove it;
// note that we need to check if the iv is actually found in the
// map before we try to delete it, as when handleNotifyMsg finds a
// new block it cycles through the txs and sends them all
// indescriminately to this function. The if loop is cheap, so
// this should not be an issue.
case broadcastInventoryDel:
if _, ok := pendingInvs[*msg]; ok {
delete(pendingInvs, *msg)
}
}

// When the timer triggers, scan through all the InvVects of RPC-submitted
// tx and cause the server to resubmit them to peers, as they have not
// been added to incoming blocks.
case <-timer.C:
for iv := range pendingInvs {
ivCopy := iv
s.RelayInventory(&ivCopy)
}

// Set the timer to go off at a random time between 0 and 1799 seconds
timer.Reset(time.Second * time.Duration(randomUint16Number(1800)))

case <-s.quit:
break out
}
}

timer.Stop()

s.wg.Done()
}

// Start begins accepting connections from peers.
func (s *server) Start() {
// Already started?
Expand All @@ -726,13 +821,19 @@ func (s *server) Start() {
// managers.
s.wg.Add(1)
go s.peerHandler()

if s.nat != nil {
s.wg.Add(1)
go s.upnpUpdateThread()
}

// Start the RPC server if it's not disabled.
if !cfg.DisableRPC {
s.wg.Add(1)

// Start the rebroadcastHandler, which ensures user tx received by
// the RPC server are rebroadcast until being included in a block.
go s.rebroadcastHandler()

s.rpcServer.Start()
}
}
Expand Down Expand Up @@ -1030,20 +1131,21 @@ func newServer(listenAddrs []string, db btcdb.Db, btcnet btcwire.BitcoinNet) (*s
}

s := server{
nonce: nonce,
listeners: listeners,
btcnet: btcnet,
addrManager: amgr,
newPeers: make(chan *peer, cfg.MaxPeers),
donePeers: make(chan *peer, cfg.MaxPeers),
banPeers: make(chan *peer, cfg.MaxPeers),
wakeup: make(chan bool),
query: make(chan interface{}),
relayInv: make(chan *btcwire.InvVect, cfg.MaxPeers),
broadcast: make(chan broadcastMsg, cfg.MaxPeers),
quit: make(chan bool),
nat: nat,
db: db,
nonce: nonce,
listeners: listeners,
btcnet: btcnet,
addrManager: amgr,
newPeers: make(chan *peer, cfg.MaxPeers),
donePeers: make(chan *peer, cfg.MaxPeers),
banPeers: make(chan *peer, cfg.MaxPeers),
wakeup: make(chan bool),
query: make(chan interface{}),
relayInv: make(chan *btcwire.InvVect, cfg.MaxPeers),
broadcast: make(chan broadcastMsg, cfg.MaxPeers),
quit: make(chan bool),
modifyRebroadcastInv: make(chan interface{}),
nat: nat,
db: db,
}
bm, err := newBlockManager(&s)
if err != nil {
Expand Down

0 comments on commit ab002c9

Please sign in to comment.