3737)
3838
3939const (
40- maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
41- maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
40+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
41+ maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
42+
43+ // maxQueuedTxs is the maximum number of transaction lists to queue up before
44+ // dropping broadcasts. This is a sensitive number as a transaction list might
45+ // contain a single transaction, or thousands.
46+ maxQueuedTxs = 128
47+
48+ // maxQueuedProps is the maximum number of block propagations to queue up before
49+ // dropping broadcasts. There's not much point in queueing stale blocks, so a few
50+ // that might cover uncles should be enough.
51+ maxQueuedProps = 4
52+
53+ // maxQueuedAnns is the maximum number of block announcements to queue up before
54+ // dropping broadcasts. Similarly to block propagations, there's no point to queue
55+ // above some healthy uncle limit, so use that.
56+ maxQueuedAnns = 4
57+
4258 handshakeTimeout = 5 * time .Second
4359)
4460
@@ -50,6 +66,12 @@ type PeerInfo struct {
5066 Head string `json:"head"` // SHA3 hash of the peer's best owned block
5167}
5268
69+ // propEvent is a block propagation, waiting for its turn in the broadcast queue.
70+ type propEvent struct {
71+ block * types.Block
72+ td * big.Int
73+ }
74+
5375type peer struct {
5476 id string
5577
@@ -63,23 +85,64 @@ type peer struct {
6385 td * big.Int
6486 lock sync.RWMutex
6587
66- knownTxs * set.Set // Set of transaction hashes known to be known by this peer
67- knownBlocks * set.Set // Set of block hashes known to be known by this peer
88+ knownTxs * set.Set // Set of transaction hashes known to be known by this peer
89+ knownBlocks * set.Set // Set of block hashes known to be known by this peer
90+ queuedTxs chan []* types.Transaction // Queue of transactions to broadcast to the peer
91+ queuedProps chan * propEvent // Queue of blocks to broadcast to the peer
92+ queuedAnns chan * types.Block // Queue of blocks to announce to the peer
93+ term chan struct {} // Termination channel to stop the broadcaster
6894}
6995
7096func newPeer (version int , p * p2p.Peer , rw p2p.MsgReadWriter ) * peer {
71- id := p .ID ()
72-
7397 return & peer {
7498 Peer : p ,
7599 rw : rw ,
76100 version : version ,
77- id : fmt .Sprintf ("%x" , id [:8 ]),
101+ id : fmt .Sprintf ("%x" , p . ID (). Bytes () [:8 ]),
78102 knownTxs : set .New (),
79103 knownBlocks : set .New (),
104+ queuedTxs : make (chan []* types.Transaction , maxQueuedTxs ),
105+ queuedProps : make (chan * propEvent , maxQueuedProps ),
106+ queuedAnns : make (chan * types.Block , maxQueuedAnns ),
107+ term : make (chan struct {}),
108+ }
109+ }
110+
111+ // broadcast is a write loop that multiplexes block propagations, announcements
112+ // and transaction broadcasts into the remote peer. The goal is to have an async
113+ // writer that does not lock up node internals.
114+ func (p * peer ) broadcast () {
115+ for {
116+ select {
117+ case txs := <- p .queuedTxs :
118+ if err := p .SendTransactions (txs ); err != nil {
119+ return
120+ }
121+ p .Log ().Trace ("Broadcast transactions" , "count" , len (txs ))
122+
123+ case prop := <- p .queuedProps :
124+ if err := p .SendNewBlock (prop .block , prop .td ); err != nil {
125+ return
126+ }
127+ p .Log ().Trace ("Propagated block" , "number" , prop .block .Number (), "hash" , prop .block .Hash (), "td" , prop .td )
128+
129+ case block := <- p .queuedAnns :
130+ if err := p .SendNewBlockHashes ([]common.Hash {block .Hash ()}, []uint64 {block .NumberU64 ()}); err != nil {
131+ return
132+ }
133+ p .Log ().Trace ("Announced block" , "number" , block .Number (), "hash" , block .Hash ())
134+
135+ case <- p .term :
136+ return
137+ }
80138 }
81139}
82140
141+ // close signals the broadcast goroutine to terminate.
142+ func (p * peer ) close () {
143+ close (p .term )
144+ }
145+
83146// Info gathers and returns a collection of metadata known about a peer.
84147func (p * peer ) Info () * PeerInfo {
85148 hash , td := p .Head ()
@@ -139,6 +202,19 @@ func (p *peer) SendTransactions(txs types.Transactions) error {
139202 return p2p .Send (p .rw , TxMsg , txs )
140203}
141204
205+ // AsyncSendTransactions queues list of transactions propagation to a remote
206+ // peer. If the peer's broadcast queue is full, the event is silently dropped.
207+ func (p * peer ) AsyncSendTransactions (txs []* types.Transaction ) {
208+ select {
209+ case p .queuedTxs <- txs :
210+ for _ , tx := range txs {
211+ p .knownTxs .Add (tx .Hash ())
212+ }
213+ default :
214+ p .Log ().Debug ("Dropping transaction propagation" , "count" , len (txs ))
215+ }
216+ }
217+
142218// SendNewBlockHashes announces the availability of a number of blocks through
143219// a hash notification.
144220func (p * peer ) SendNewBlockHashes (hashes []common.Hash , numbers []uint64 ) error {
@@ -153,12 +229,35 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error
153229 return p2p .Send (p .rw , NewBlockHashesMsg , request )
154230}
155231
232+ // AsyncSendNewBlockHash queues the availability of a block for propagation to a
233+ // remote peer. If the peer's broadcast queue is full, the event is silently
234+ // dropped.
235+ func (p * peer ) AsyncSendNewBlockHash (block * types.Block ) {
236+ select {
237+ case p .queuedAnns <- block :
238+ p .knownBlocks .Add (block .Hash ())
239+ default :
240+ p .Log ().Debug ("Dropping block announcement" , "number" , block .NumberU64 (), "hash" , block .Hash ())
241+ }
242+ }
243+
156244// SendNewBlock propagates an entire block to a remote peer.
157245func (p * peer ) SendNewBlock (block * types.Block , td * big.Int ) error {
158246 p .knownBlocks .Add (block .Hash ())
159247 return p2p .Send (p .rw , NewBlockMsg , []interface {}{block , td })
160248}
161249
250+ // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
251+ // the peer's broadcast queue is full, the event is silently dropped.
252+ func (p * peer ) AsyncSendNewBlock (block * types.Block , td * big.Int ) {
253+ select {
254+ case p .queuedProps <- & propEvent {block : block , td : td }:
255+ p .knownBlocks .Add (block .Hash ())
256+ default :
257+ p .Log ().Debug ("Dropping block propagation" , "number" , block .NumberU64 (), "hash" , block .Hash ())
258+ }
259+ }
260+
162261// SendBlockHeaders sends a batch of block headers to the remote peer.
163262func (p * peer ) SendBlockHeaders (headers []* types.Header ) error {
164263 return p2p .Send (p .rw , BlockHeadersMsg , headers )
@@ -313,7 +412,8 @@ func newPeerSet() *peerSet {
313412}
314413
315414// Register injects a new peer into the working set, or returns an error if the
316- // peer is already known.
415+ // peer is already known. If a new peer it registered, its broadcast loop is also
416+ // started.
317417func (ps * peerSet ) Register (p * peer ) error {
318418 ps .lock .Lock ()
319419 defer ps .lock .Unlock ()
@@ -325,6 +425,8 @@ func (ps *peerSet) Register(p *peer) error {
325425 return errAlreadyRegistered
326426 }
327427 ps .peers [p .id ] = p
428+ go p .broadcast ()
429+
328430 return nil
329431}
330432
@@ -334,10 +436,13 @@ func (ps *peerSet) Unregister(id string) error {
334436 ps .lock .Lock ()
335437 defer ps .lock .Unlock ()
336438
337- if _ , ok := ps .peers [id ]; ! ok {
439+ p , ok := ps .peers [id ]
440+ if ! ok {
338441 return errNotRegistered
339442 }
340443 delete (ps .peers , id )
444+ p .close ()
445+
341446 return nil
342447}
343448
0 commit comments