Skip to content

Commit 78d480e

Browse files
committed
dbft: implement WithRequestTx callback. Close #59
1 parent 4d76ca7 commit 78d480e

File tree

3 files changed

+47
-0
lines changed

3 files changed

+47
-0
lines changed

consensus/dbft/dbft.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ type DBFT struct {
201201
// about a new consensus payload to be sent.
202202
broadcast func(m *dbftproto.Message) error
203203

204+
// requestTxs is a callback which is called to request the missing
205+
// transactions from neighbor nodes.
206+
requestTxs func(hashed []common.Hash)
207+
204208
// various chain/mempool events and subscription management:
205209
chainHeadSub event.Subscription
206210
chainHeadEvents chan core.ChainHeadEvent
@@ -465,6 +469,14 @@ func New(config *params.DBFTConfig, db ethdb.Database) (*DBFT, error) {
465469
return res
466470
}),
467471
dbft.WithRequestTx(func(h ...util.Uint256) {
472+
if len(h) == 0 {
473+
return
474+
}
475+
hashes := make([]common.Hash, len(h))
476+
for i := range h {
477+
hashes = append(hashes, common.Hash(h[i]))
478+
}
479+
c.requestTxs(hashes)
468480
}),
469481
dbft.WithGetConsensusAddress(func(keys ...dbftCrypto.PublicKey) util.Uint160 {
470482
// NextConsensus is filled manually in NewBlockFromContext.
@@ -607,6 +619,11 @@ func (c *DBFT) WithBroadcast(f func(m *dbftproto.Message) error) {
607619
c.broadcast = f
608620
}
609621

622+
// WithRequestTxs sets callback to request the missing transactions from neighbor nodese.
623+
func (c *DBFT) WithRequestTxs(f func(hashed []common.Hash)) {
624+
c.requestTxs = f
625+
}
626+
610627
// WithTxPool initializes transaction pool API for DBFT interactions with memory pool
611628
// (fetching unknown transactions).
612629
func (c *DBFT) WithTxPool(pool txPool) {

eth/backend.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
279279
bft.WithEthAPI(ethAPI)
280280
bft.WithBroadcast(eth.dbftSrv.BroadcastMessage)
281281
bft.WithTxPool(eth.txPool)
282+
bft.WithRequestTxs(eth.handler.BroadcastRequestTxs)
282283
}
283284

284285
// Setup DNS discovery iterators.

eth/handler.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ const (
5353
// All transactions with a higher size will be announced and need to be fetched
5454
// by the peer.
5555
txMaxBroadcastSize = 4096
56+
57+
// maxHashCount is the max count of hashes in one batch to RequestTxs
58+
maxHashesCount = 500
5659
)
5760

5861
var (
@@ -685,3 +688,29 @@ func (h *handler) enableSyncedFeatures() {
685688
h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize)
686689
}
687690
}
691+
692+
// BroadcastRequestTxs will send GetPooledTransactionsMsg to neighbor peers
693+
func (h *handler) BroadcastRequestTxs(txHashes []common.Hash) {
694+
if len(txHashes) == 0 {
695+
return
696+
}
697+
for i := 0; i <= len(txHashes)/maxHashesCount; i++ {
698+
start := i * maxHashesCount
699+
stop := (i + 1) * maxHashesCount
700+
if stop > len(txHashes) {
701+
stop = len(txHashes)
702+
}
703+
if start == stop {
704+
break
705+
}
706+
// Broadcast RequestTxs
707+
for key, peer := range h.peers.peers {
708+
err := peer.RequestTxs(txHashes)
709+
if err != nil {
710+
log.Error("BroadcastRequestTxs", "txHashes", txHashes,
711+
"peer", key,
712+
"error", err)
713+
}
714+
}
715+
}
716+
}

0 commit comments

Comments
 (0)