Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client: add basic TxPool #1176

Merged
merged 20 commits into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2a3ff0c
client: added basic TxPool structure, added PooledTransaction ETH mes…
holgerd77 Mar 30, 2021
6e21c95
client -> tx pool: updated getPooledTransactions to use reqId from ET…
holgerd77 Aug 11, 2021
6298440
client -> tx pool: add txs to the pool, log pool statistics
holgerd77 Aug 11, 2021
814077f
client -> tx pool: improved pool closing, added basic unit test setup
holgerd77 Aug 12, 2021
60b2526
client -> tx pool: added structured test setup and pool tests, fixed …
holgerd77 Aug 12, 2021
9a22632
client -> service: fixed inconsistent service closing (fixes hanging …
holgerd77 Aug 23, 2021
685029a
tx -> pool: fixed bug in pending/handled inclusion checks, tx pool te…
holgerd77 Aug 23, 2021
ae5ae3b
client -> tx pool: moved tx pool to sync
holgerd77 Aug 23, 2021
49c0f3e
client -> tx pool: added proper close functionality to synchronizer
holgerd77 Aug 23, 2021
33a407c
client: reordered sync open() functionality
holgerd77 Aug 23, 2021
b758294
client -> tx pool: added more explicit start/stop logic and separatio…
holgerd77 Aug 23, 2021
5918102
client -> tx pool: test fixes
holgerd77 Aug 23, 2021
de2ee53
client -> tx pool: moved tx pool from Synchronizer to FullSynchronizer
holgerd77 Aug 23, 2021
65614e7
client -> tx pool: prepared for new block tx checks for tx pool
holgerd77 Aug 23, 2021
34d59af
re-apply prior changes:
ryanio Aug 23, 2021
d9f2456
simplify checkTxPoolState() logic
ryanio Aug 23, 2021
c1404ea
`await import` not needed unless using testdouble
ryanio Aug 23, 2021
7fe66b3
finish newBlocks() logic, add tests
ryanio Aug 23, 2021
50fa7b5
fix sync() while loop
ryanio Aug 24, 2021
130c3f3
make logInterval opitonally unefined so it doesnt need to be set to null
ryanio Aug 24, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions packages/client/lib/net/protocol/ethprotocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,21 @@ type GetBlockBodiesOpts = {
hashes: Buffer[]
}

type GetPooledTransactionsOpts = {
/* Request id (default: next internal id) */
reqId?: BN
/* The tx hashes */
hashes: Buffer[]
}

/*
* Messages with responses that are added as
* methods in camelCase to BoundProtocol.
*/
export interface EthProtocolMethods {
getBlockHeaders: (opts: GetBlockHeadersOpts) => Promise<[BN, BlockHeader[]]>
getBlockBodies: (opts: GetBlockBodiesOpts) => Promise<[BN, BlockBodyBuffer[]]>
getPooledTransactions: (opts: GetPooledTransactionsOpts) => Promise<[BN, any[]]>
}

const id = new BN(0)
Expand Down Expand Up @@ -113,6 +121,24 @@ export class EthProtocol extends Protocol {
],
decode: ([reqId, bodies]: [Buffer, BlockBodyBuffer[]]) => [new BN(reqId), bodies],
},
{
name: 'NewPooledTransactionHashes',
code: 0x08,
},
{
name: 'GetPooledTransactions',
code: 0x09,
response: 0x0a,
encode: ({ reqId, hashes }: GetPooledTransactionsOpts) => [
(reqId === undefined ? id.iaddn(1) : new BN(reqId)).toArrayLike(Buffer),
hashes,
],
},
{
name: 'PooledTransactions',
code: 0x0a,
decode: ([reqId, txs]: [Buffer, any[]]) => [new BN(reqId), txs],
},
]

/**
Expand Down
11 changes: 11 additions & 0 deletions packages/client/lib/service/ethereumservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,15 @@ export class EthereumService extends Service {
await this.synchronizer.stop()
await super.stop()
}

/**
* Close service.
* @return {Promise}
*/
async close() {
if (this.opened) {
await this.synchronizer.close()
}
await super.close()
}
}
2 changes: 2 additions & 0 deletions packages/client/lib/service/fullethereumservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ export class FullEthereumService extends EthereumService {
peer.eth!.send('BlockBodies', { reqId, bodies })
} else if (message.name === 'NewBlockHashes') {
this.synchronizer.handleNewBlockHashes(message.data)
} else if (message.name === 'NewPooledTransactionHashes') {
await this.synchronizer.txPool.announcedTxHashes(message.data, peer)
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/client/lib/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export class Service {
* @return {Promise}
*/
async close() {
if (this.running) {
if (this.opened) {
await this.pool.close()
}
this.opened = false
Expand Down
78 changes: 58 additions & 20 deletions packages/client/lib/sync/fullsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Synchronizer, SynchronizerOptions } from './sync'
import { BlockFetcher } from './fetcher'
import { Block } from '@ethereumjs/block'
import { VMExecution } from './execution/vmexecution'
import { TxPool } from './txpool'
import { Event } from '../types'

/**
Expand All @@ -14,6 +15,8 @@ import { Event } from '../types'
export class FullSynchronizer extends Synchronizer {
public execution: VMExecution

public txPool: TxPool

constructor(options: SynchronizerOptions) {
super(options)

Expand All @@ -23,13 +26,18 @@ export class FullSynchronizer extends Synchronizer {
chain: options.chain,
})

this.txPool = new TxPool({
config: this.config,
})

this.config.events.on(Event.SYNC_EXECUTION_VM_ERROR, async () => {
await this.stop()
})

this.config.events.on(Event.CHAIN_UPDATED, async () => {
if (this.running) {
await this.execution.run()
this.checkTxPoolState()
}
})
// eslint-disable-next-line @typescript-eslint/no-floating-promises
Expand All @@ -44,6 +52,28 @@ export class FullSynchronizer extends Synchronizer {
return 'full'
}

/**
* Open synchronizer. Must be called before sync() is called
*/
async open(): Promise<void> {
await super.open()
await this.chain.open()
await this.execution.open()
this.txPool.open()
await this.pool.open()
this.execution.syncing = true
const number = this.chain.blocks.height.toNumber()
const td = this.chain.blocks.td.toString(10)
const hash = this.chain.blocks.latest!.hash()
this.startingBlock = number
this.config.chainCommon.setHardforkByBlockNumber(number)
this.config.logger.info(
`Latest local block: number=${number} td=${td} hash=${short(
hash
)} hardfork=${this.config.chainCommon.hardfork()}`
)
}

/**
* Returns true if peer can be used for syncing
* @return {boolean}
Expand Down Expand Up @@ -86,6 +116,21 @@ export class FullSynchronizer extends Synchronizer {
return result ? result[1][0] : undefined
}

/**
* Checks if tx pool should be started
*/
checkTxPoolState() {
if (!this.syncTargetHeight || this.txPool.running) {
return
}
// If height gte target, we are close enough to the
// head of the chain that the tx pool can be started
const target = this.syncTargetHeight.subn(this.txPool.BLOCKS_BEFORE_TARGET_HEIGHT_ACTIVATION)
if (this.chain.headers.height.gte(target)) {
this.txPool.start()
}
}

/**
* Sync all blocks and state from peer starting from current height.
* @param peer remote peer to sync with
Expand Down Expand Up @@ -139,6 +184,7 @@ export class FullSynchronizer extends Synchronizer {
this.pool.size
}`
)
this.txPool.newBlocks(blocks)
})

this.config.events.on(Event.SYNC_SYNCHRONIZED, () => {
Expand All @@ -153,33 +199,14 @@ export class FullSynchronizer extends Synchronizer {
})
}

/**
* Open synchronizer. Must be called before sync() is called
*/
async open(): Promise<void> {
await this.chain.open()
await this.execution.open()
await this.pool.open()
this.execution.syncing = true
const number = this.chain.blocks.height.toNumber()
const td = this.chain.blocks.td.toString(10)
const hash = this.chain.blocks.latest!.hash()
this.startingBlock = number
this.config.chainCommon.setHardforkByBlockNumber(number)
this.config.logger.info(
`Latest local block: number=${number} td=${td} hash=${short(
hash
)} hardfork=${this.config.chainCommon.hardfork()}`
)
}

/**
* Stop synchronization. Returns a promise that resolves once its stopped.
* @return {Promise}
*/
async stop(): Promise<boolean> {
this.execution.syncing = false
await this.execution.stop()
this.txPool.stop()

if (!this.running) {
return false
Expand All @@ -195,4 +222,15 @@ export class FullSynchronizer extends Synchronizer {

return true
}

/**
* Close synchronizer.
* @return {Promise}
*/
async close() {
if (this.opened) {
this.txPool.close()
}
await super.close()
}
}
27 changes: 14 additions & 13 deletions packages/client/lib/sync/lightsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ export class LightSynchronizer extends Synchronizer {
return 'light'
}

/**
* Open synchronizer. Must be called before sync() is called
*/
async open(): Promise<void> {
await super.open()
await this.chain.open()
await this.pool.open()
const number = this.chain.headers.height.toNumber()
const td = this.chain.headers.td.toString(10)
const hash = this.chain.blocks.latest!.hash()
this.startingBlock = number
this.config.logger.info(`Latest local header: number=${number} td=${td} hash=${short(hash)}`)
}

/**
* Returns true if peer can be used for syncing
* @return {boolean}
Expand Down Expand Up @@ -131,19 +145,6 @@ export class LightSynchronizer extends Synchronizer {
})
}

/**
* Open synchronizer. Must be called before sync() is called
*/
async open(): Promise<void> {
await this.chain.open()
await this.pool.open()
const number = this.chain.headers.height.toNumber()
const td = this.chain.headers.td.toString(10)
const hash = this.chain.blocks.latest!.hash()
this.startingBlock = number
this.config.logger.info(`Latest local header: number=${number} td=${td} hash=${short(hash)}`)
}

/**
* Stop synchronization. Returns a promise that resolves once its stopped.
* @return {Promise}
Expand Down
Loading