Calling txpool.ResetWithHeaders to clear queued transactions during the bulk sync (#299)
brkomir authored Dec 23, 2021
1 parent fb33de0 commit 4765373
Showing 6 changed files with 96 additions and 41 deletions.
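In short: the txpool's ResetWithHeader(h *types.Header) becomes the variadic ResetWithHeaders(headers ...*types.Header), and BulkSyncWithPeer gains a per-batch callback, so the bulk-sync path can clear queued transactions for every batch of blocks it writes instead of only on single-block writes. Below is a minimal sketch of the caller-side difference; Header and TxPool are simplified stand-ins for illustration, not the real types in this repository.

```go
package main

import "fmt"

// Header and TxPool are simplified stand-ins; they only exist to show
// the shape of the API change in this commit.
type Header struct{ Number uint64 }

type TxPool struct{}

// Before this commit the reset hook took exactly one header:
//
//	func (p *TxPool) ResetWithHeader(h *Header)
//
// After it, any number of headers can be handed over in a single call.
func (p *TxPool) ResetWithHeaders(headers ...*Header) {
	for _, h := range headers {
		fmt.Println("dropping txs made stale by block", h.Number)
	}
}

func main() {
	pool := &TxPool{}

	// Watch sync still resets once per received block ...
	pool.ResetWithHeaders(&Header{Number: 1})

	// ... while bulk sync can now reset once per written batch.
	pool.ResetWithHeaders(&Header{Number: 2}, &Header{Number: 3}, &Header{Number: 4})
}
```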
4 changes: 2 additions & 2 deletions consensus/dev/dev.go
@@ -196,7 +196,7 @@ func (d *Dev) writeNewBlock(parent *types.Header) error {

// after the block has been written we reset the txpool so that
// the old transactions are removed
d.txpool.ResetWithHeader(block.Header)
d.txpool.ResetWithHeaders(block.Header)

return nil
}
@@ -229,4 +229,4 @@ func (d *Dev) Seal(block *types.Block, ctx context.Context) (*types.Block, error
func (d *Dev) Close() error {
close(d.closeCh)
return nil
}
}
21 changes: 14 additions & 7 deletions consensus/ibft/ibft.go
@@ -31,7 +31,7 @@ type blockchainInterface interface {
}

type transactionPoolInterface interface {
ResetWithHeader(h *types.Header)
ResetWithHeaders(headers ...*types.Header)
Pop() (*types.Transaction, func())
DecreaseAccountNonce(tx *types.Transaction)
Length() uint64
@@ -40,8 +40,8 @@ type transactionPoolInterface interface {
type syncerInterface interface {
Start()
BestPeer() *protocol.SyncPeer
BulkSyncWithPeer(p *protocol.SyncPeer) error
WatchSyncWithPeer(p *protocol.SyncPeer, handler func(b *types.Block) bool)
BulkSyncWithPeer(p *protocol.SyncPeer, newBlocksHandler func(blocks []*types.Block)) error
WatchSyncWithPeer(p *protocol.SyncPeer, newBlockHandler func(b *types.Block) bool)
GetSyncProgression() *protocol.Progression
Broadcast(b *types.Block)
}
@@ -346,7 +346,14 @@ func (i *Ibft) runSyncState() {
continue
}

if err := i.syncer.BulkSyncWithPeer(p); err != nil {
if err := i.syncer.BulkSyncWithPeer(p, func(newBlocks []*types.Block) {
newHeaders := []*types.Header{}
for _, block := range newBlocks {
newHeaders = append(newHeaders, block.Header)
}

i.txpool.ResetWithHeaders(newHeaders...)
}); err != nil {
i.logger.Error("failed to bulk sync", "err", err)
continue
}
@@ -362,7 +369,7 @@ func (i *Ibft) runSyncState() {
var isValidator bool
i.syncer.WatchSyncWithPeer(p, func(b *types.Block) bool {
i.syncer.Broadcast(b)
i.txpool.ResetWithHeader(b.Header)
i.txpool.ResetWithHeaders(b.Header)
isValidator = i.isValidSnapshot()

return isValidator
@@ -764,7 +771,7 @@ func (i *Ibft) insertBlock(block *types.Block) error {

// after the block has been written we reset the txpool so that
// the old transactions are removed
i.txpool.ResetWithHeader(block.Header)
i.txpool.ResetWithHeaders(block.Header)

return nil
}
@@ -1069,4 +1076,4 @@ func (i *Ibft) pushMessage(msg *proto.MessageReq) {
case i.updateCh <- struct{}{}:
default:
}
}
}
47 changes: 40 additions & 7 deletions consensus/ibft/ibft_test.go
@@ -568,7 +568,7 @@ func TestWriteTransactions(t *testing.T) {
}
}

func TestRunSyncState_NewHeadReceivedFromPeer_CallsTxPoolResetWithHeader(t *testing.T) {
func TestRunSyncState_NewHeadReceivedFromPeer_CallsTxPoolResetWithHeaders(t *testing.T) {
m := newMockIbft(t, []string{"A", "B", "C"}, "A")
m.setState(SyncState)
expectedNewBlockToSync := &types.Block{Header: &types.Header{Number: 1}}
@@ -588,12 +588,42 @@ func TestRunSyncState_NewHeadReceivedFromPeer_CallsTxPoolResetWithHeader(t *test
m.runSyncState()

assert.True(t, mockTxPool.resetWithHeaderCalled)
assert.Equal(t, expectedNewBlockToSync.Header, mockTxPool.resetWithHeaderParam)
assert.Equal(t, expectedNewBlockToSync.Header, mockTxPool.resetWithHeadersParam[0])
assert.True(t, mockSyncer.broadcastCalled)
assert.Equal(t, expectedNewBlockToSync, mockSyncer.broadcastedBlock)
}

func TestRunSyncState_BulkSyncWithPeer_CallsTxPoolResetWithHeaders(t *testing.T) {
m := newMockIbft(t, []string{"A", "B", "C"}, "A")
m.setState(SyncState)
expectedNewBlocksToSync := []*types.Block{
{Header: &types.Header{Number: 1}},
{Header: &types.Header{Number: 2}},
{Header: &types.Header{Number: 3}},
}
mockSyncer := &mockSyncer{}
mockSyncer.bulkSyncBocksFromPeer = expectedNewBlocksToSync
m.syncer = mockSyncer
mockTxPool := &mockTxPool{}
m.txpool = mockTxPool

// we need to change state from Sync in order to break from the loop inside runSyncState
stateChangeDelay := time.After(100 * time.Millisecond)
go func() {
<-stateChangeDelay
m.setState(AcceptState)
}()

m.runSyncState()

assert.True(t, mockTxPool.resetWithHeaderCalled)
for i, expected := range expectedNewBlocksToSync {
assert.Equal(t, expected.Header, mockTxPool.resetWithHeadersParam[i])
}
}

type mockSyncer struct {
bulkSyncBocksFromPeer []*types.Block
receivedNewHeadFromPeer *types.Block
broadcastedBlock *types.Block
broadcastCalled bool
@@ -605,12 +635,15 @@ func (s *mockSyncer) BestPeer() *protocol.SyncPeer {
return &protocol.SyncPeer{}
}

func (s *mockSyncer) BulkSyncWithPeer(p *protocol.SyncPeer) error {
func (s *mockSyncer) BulkSyncWithPeer(p *protocol.SyncPeer, handler func(blocks []*types.Block)) error {
handler(s.bulkSyncBocksFromPeer)
return nil
}

func (s *mockSyncer) WatchSyncWithPeer(p *protocol.SyncPeer, handler func(b *types.Block) bool) {
handler(s.receivedNewHeadFromPeer)
if s.receivedNewHeadFromPeer != nil {
handler(s.receivedNewHeadFromPeer)
}
}

func (s *mockSyncer) GetSyncProgression() *protocol.Progression {
@@ -626,12 +659,12 @@ type mockTxPool struct {
transactions []*types.Transaction
nonceDecreased map[*types.Transaction]bool
resetWithHeaderCalled bool
resetWithHeaderParam *types.Header
resetWithHeadersParam []*types.Header
}

func (p *mockTxPool) ResetWithHeader(h *types.Header) {
func (p *mockTxPool) ResetWithHeaders(headers ...*types.Header) {
p.resetWithHeaderCalled = true
p.resetWithHeaderParam = h
p.resetWithHeadersParam = headers
}

func (p *mockTxPool) Pop() (*types.Transaction, func()) {
8 changes: 5 additions & 3 deletions protocol/syncer.go
@@ -666,7 +666,7 @@ func (s *Syncer) logSyncPeerPopBlockError(err error, peer *SyncPeer) {
}

// BulkSyncWithPeer finds common ancestor with a peer and syncs block until latest block
func (s *Syncer) BulkSyncWithPeer(p *SyncPeer) error {
func (s *Syncer) BulkSyncWithPeer(p *SyncPeer, newBlocksHandler func(blocks []*types.Block)) error {
// find the common ancestor
ancestor, fork, err := s.findCommonAncestor(p.client, p.status)
if err != nil {
@@ -719,12 +719,14 @@ func (s *Syncer) BulkSyncWithPeer(p *SyncPeer) error {
if err := s.blockchain.WriteBlocks(slot.blocks); err != nil {
return fmt.Errorf("failed to write bulk sync blocks: %v", err)
}

newBlocksHandler(slot.blocks)
}

// try to get the next block
startBlock = sk.LastHeader()

if startBlock.Number >= uint64(target) {
if startBlock.Number >= target {
break
}
}
@@ -759,4 +761,4 @@ func getHeader(clt proto.V1Client, num *uint64, hash *types.Hash) (*types.Header
return nil, err
}
return header, nil
}
}
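The new newBlocksHandler argument is invoked once per slot of blocks, and only after blockchain.WriteBlocks has succeeded for that slot. The following is a simplified model of that control flow (a stand-in Block type and a fake writeBlocks, not the real Syncer), in case the ordering is easier to see in isolation.

```go
package main

import "fmt"

// Block and writeBlocks are stand-ins for types.Block and
// blockchain.WriteBlocks; the real syncer is not modelled here.
type Block struct{ Number uint64 }

func writeBlocks(blocks []*Block) error {
	// pretend the batch was persisted successfully
	return nil
}

// bulkSync mirrors the loop in BulkSyncWithPeer after this commit: each
// slot of blocks is written first and only then reported through the
// handler, so the handler never sees blocks that failed to persist.
func bulkSync(slots [][]*Block, newBlocksHandler func(blocks []*Block)) error {
	for _, slot := range slots {
		if err := writeBlocks(slot); err != nil {
			return fmt.Errorf("failed to write bulk sync blocks: %w", err)
		}
		newBlocksHandler(slot)
	}
	return nil
}

func main() {
	slots := [][]*Block{
		{{Number: 1}, {Number: 2}},
		{{Number: 3}},
	}
	_ = bulkSync(slots, func(blocks []*Block) {
		fmt.Println("handled a written batch of", len(blocks), "blocks")
	})
}
```

Because the handler only runs for batches that were actually persisted, a txpool reset hooked into it never drops transactions on behalf of blocks that failed to be written.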
45 changes: 29 additions & 16 deletions protocol/syncer_test.go
@@ -318,22 +318,25 @@ func TestBulkSyncWithPeer(t *testing.T) {
headers []*types.Header
peerHeaders []*types.Header
// result
synced bool
err error
shouldSync bool
syncFromBlock int
err error
}{
{
name: "should sync until peer's latest block",
headers: blockchain.NewTestHeaderChainWithSeed(nil, 10, 0),
peerHeaders: blockchain.NewTestHeaderChainWithSeed(nil, 20, 0),
synced: true,
err: nil,
name: "should sync until peer's latest block",
headers: blockchain.NewTestHeaderChainWithSeed(nil, 10, 0),
peerHeaders: blockchain.NewTestHeaderChainWithSeed(nil, 30, 0),
shouldSync: true,
syncFromBlock: 10,
err: nil,
},
{
name: "should sync until peer's latest block",
headers: blockchain.NewTestHeaderChainWithSeed(nil, 20, 0),
peerHeaders: blockchain.NewTestHeaderChainWithSeed(nil, 10, 0),
synced: false,
err: errors.New("fork not found"),
name: "shouldn't sync if peer's latest block is behind",
headers: blockchain.NewTestHeaderChainWithSeed(nil, 20, 0),
peerHeaders: blockchain.NewTestHeaderChainWithSeed(nil, 10, 0),
shouldSync: false,
syncFromBlock: 0,
err: errors.New("fork not found"),
},
}

@@ -342,17 +345,27 @@
chain, peerChain := NewMockBlockchain(tt.headers), NewMockBlockchain(tt.peerHeaders)
syncer, peerSyncers := SetupSyncerNetwork(t, chain, []blockchainShim{peerChain})
peerSyncer := peerSyncers[0]
var handledNewBlocks []*types.Block
newBlocksHandler := func(blocks []*types.Block) {
handledNewBlocks = append(handledNewBlocks, blocks...)
}

peer := getPeer(syncer, peerSyncer.server.AddrInfo().ID)
assert.NotNil(t, peer)

err := syncer.BulkSyncWithPeer(peer)
err := syncer.BulkSyncWithPeer(peer, newBlocksHandler)
assert.Equal(t, tt.err, err)
WaitUntilProcessedAllEvents(t, syncer, 10*time.Second)

expectedStatus := HeaderToStatus(tt.headers[len(tt.headers)-1])
if tt.synced {
var expectedStatus *Status
if tt.shouldSync {
expectedStatus = HeaderToStatus(tt.peerHeaders[len(tt.peerHeaders)-1])
assert.Equal(t, handledNewBlocks, peerChain.blocks[tt.syncFromBlock:], "not all blocks are handled")
assert.Equal(t, peerChain.blocks, chain.blocks, "chain is not synced")
} else {
expectedStatus = HeaderToStatus(tt.headers[len(tt.headers)-1])
assert.NotEqual(t, handledNewBlocks, peerChain.blocks[tt.syncFromBlock:])
assert.NotEqual(t, peerChain.blocks, chain.blocks)
}
assert.Equal(t, expectedStatus, syncer.status)
})
@@ -603,4 +616,4 @@ func TestSyncer_PeerDisconnected(t *testing.T) {
// Make sure that the disconnected peer is not in the
// reference node's sync peer map
assert.False(t, found)
}
}
12 changes: 6 additions & 6 deletions txpool/txpool.go
@@ -553,9 +553,9 @@ func (t *TxPool) batchDeleteTxFromLookup(txns []*types.Transaction) {
}

// ResetWithHeader does basic txpool housekeeping after a block write
func (t *TxPool) ResetWithHeader(h *types.Header) {
func (t *TxPool) ResetWithHeaders(headers ...*types.Header) {
evnt := &blockchain.Event{
NewChain: []*types.Header{h},
NewChain: headers,
}

// Process the txns in the event to make sure the TxPool is up-to-date
@@ -577,8 +577,8 @@ func (p *processEventWrapper) addTxn(txn *types.Transaction) {
// promotedTxnCleanup looks through the promoted queue for any invalid transactions
// made by a specific account, and removes them
func (t *TxPool) promotedTxnCleanup(
address types.Address, // The address to filter by
stateNonce uint64, // The valid nonce (reference for pruning)
address types.Address, // The address to filter by
stateNonce uint64, // The valid nonce (reference for pruning)
cleanupCallback func(txn *types.Transaction), // Additional cleanup logic
) {
// Prune out all the now possibly low-nonce transactions in the promoted queue
@@ -757,7 +757,7 @@ func (t *TxPool) ProcessEvent(evnt *blockchain.Event) {
// validateTx validates that the transaction conforms to specific constraints to be added to the txpool
func (t *TxPool) validateTx(
tx *types.Transaction, // The transaction that should be validated
isLocal bool, // Flag indicating if the transaction is from a local account
isLocal bool, // Flag indicating if the transaction is from a local account
) error {
// Check the transaction size to overcome DOS Attacks
if uint64(len(tx.MarshalRLP())) > txMaxSize {
@@ -1277,4 +1277,4 @@ func (a *localAccounts) addAddr(addr types.Address) {
// slotsRequired() calculates the number of slotsRequired for given transaction
func slotsRequired(tx *types.Transaction) uint64 {
return (tx.Size() + txSlotSize - 1) / txSlotSize
}
}
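On the txpool side, all headers passed to ResetWithHeaders are folded into a single blockchain.Event, so one bulk-synced batch triggers one pass of pool housekeeping rather than one per header. A hedged sketch of that shape follows, built entirely from stand-in types (none of these are the real blockchain.Event or TxPool).

```go
package main

import "fmt"

// Header, Event and TxPool are stand-ins; Event mirrors the shape of
// blockchain.Event in the diff, where NewChain carries the headers of
// newly written blocks.
type Header struct{ Number uint64 }

type Event struct{ NewChain []*Header }

type TxPool struct{}

// ProcessEvent stands in for the pool's event-driven housekeeping.
func (t *TxPool) ProcessEvent(e *Event) {
	fmt.Println("pruning stale txs against", len(e.NewChain), "new headers")
}

// ResetWithHeaders folds the whole batch into one event, matching the
// shape of the method introduced by this commit.
func (t *TxPool) ResetWithHeaders(headers ...*Header) {
	t.ProcessEvent(&Event{NewChain: headers})
}

func main() {
	pool := &TxPool{}

	// One event covers the whole bulk-synced batch, where the old
	// per-header API would have produced three separate events.
	pool.ResetWithHeaders(&Header{Number: 1}, &Header{Number: 2}, &Header{Number: 3})
}
```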
