Skip to content

Commit

Permalink
client -> tx pool: updated getPooledTransactions to use reqId from ET…
Browse files Browse the repository at this point in the history
…H/66, added basic data structures for tx request selection, minor fixes
  • Loading branch information
holgerd77 committed Aug 12, 2021
1 parent aba9497 commit d222e88
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 10 deletions.
14 changes: 13 additions & 1 deletion packages/client/lib/net/protocol/ethprotocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,21 @@ type GetBlockBodiesOpts = {
hashes: Buffer[]
}

type GetPooledTransactionsOpts = {
/* Request id (default: next internal id) */
reqId?: BN
/* The tx hashes */
hashes: Buffer[]
}

/*
* Messages with responses that are added as
* methods in camelCase to BoundProtocol.
*/
export interface EthProtocolMethods {
getBlockHeaders: (opts: GetBlockHeadersOpts) => Promise<[BN, BlockHeader[]]>
getBlockBodies: (opts: GetBlockBodiesOpts) => Promise<[BN, BlockBodyBuffer[]]>
getPooledTransactions: (hashes: Buffer[]) => Promise<any[]>
getPooledTransactions: (opts: GetPooledTransactionsOpts) => Promise<[BN, any[]]>
}

const id = new BN(0)
Expand Down Expand Up @@ -122,10 +129,15 @@ export class EthProtocol extends Protocol {
name: 'GetPooledTransactions',
code: 0x09,
response: 0x0a,
encode: ({ reqId, hashes }: GetPooledTransactionsOpts) => [
(reqId === undefined ? id.iaddn(1) : new BN(reqId)).toArrayLike(Buffer),
hashes,
],
},
{
name: 'PooledTransactions',
code: 0x0a,
decode: ([reqId, txs]: [Buffer, any[]]) => [new BN(reqId), txs],
},
]

Expand Down
2 changes: 1 addition & 1 deletion packages/client/lib/service/fullethereumservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export class FullEthereumService extends EthereumService {
} else if (message.name === 'NewBlockHashes') {
await this.synchronizer.announced(message.data, peer)
} else if (message.name === 'NewPooledTransactionHashes') {
this.txPool.announced(message.data, peer)
await this.txPool.announced(message.data, peer)
}
}

Expand Down
55 changes: 47 additions & 8 deletions packages/client/lib/service/txpool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import EventEmitter from 'events'
import { Config } from '../config'
import { Peer } from '../net/peer'
import { EthProtocolMethods } from '../net/protocol'
// import { TransactionFactory } from '@ethereumjs/tx'
import { TransactionFactory } from '@ethereumjs/tx'

export interface TxPoolOptions {
/* Config */
Expand All @@ -24,6 +24,23 @@ export class TxPool extends EventEmitter {

private opened: boolean

/**
* List of pending tx hashes to avoid double requests
*/
private pending: string[] = []

/**
* List of pooled tx hashes
*/
private pooled: string[] = []

/**
* List of outdated tx hashes
*
* Simple FIFO list to avoid double requests on outdated txs
*/
private outdated: string[] = []

/**
* Create new tx pool
* @param options constructor parameters
Expand Down Expand Up @@ -62,13 +79,35 @@ export class TxPool extends EventEmitter {
async announced(txHashes: Buffer[], peer: Peer) {
if (txHashes.length) {
this.config.logger.info(`Tx pool: received new pooled tx hashes number=${txHashes.length}`)
const txs = await (peer!.eth as EthProtocolMethods).getPooledTransactions(
txHashes.slice(0, TX_RETRIEVAL_LIMIT)
)
this.config.logger.info(`Tx pool: received txs number=${txs.length}`)
/*for (const txData of txs) {
const tx = TransactionFactory.fromBlockBodyData(txData)
}*/

const reqHashes = []
for (const txHash of txHashes) {
const txHashStr = txHash.toString('hex')
if (txHashStr in this.pending || txHashStr in this.pooled || txHashStr in this.outdated) {
continue
}
reqHashes.push(txHash)
}

if (reqHashes.length > 0) {
const reqHashesStr = reqHashes.map((hash) => hash.toString('hex'))
this.pending.concat(reqHashesStr)
//console.log(`txHashes: ${txHashes.length} reqHashes: ${reqHashes.length} pending: ${this.pending.length}`)
const txsResult = await (peer!.eth as EthProtocolMethods).getPooledTransactions({
hashes: reqHashes.slice(0, TX_RETRIEVAL_LIMIT),
})

// Remove from pending list regardless if tx is in result
for (const reqHashStr of reqHashesStr) {
this.pending = this.pending.filter((hash) => hash !== reqHashStr)
}

this.config.logger.info(`Tx pool: received txs number=${txsResult.length}`)
for (const txData of txsResult[1]) {
const tx = TransactionFactory.fromBlockBodyData(txData)
this.config.logger.info(`Nonce: ${tx.nonce}`)
}
}
}
}

Expand Down

0 comments on commit d222e88

Please sign in to comment.