Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switches to using new eth_getLogs by blockHash RPC. #25

Merged
merged 3 commits into from
Nov 5, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ethereumjs-blockstream",
"version": "5.0.0",
"version": "6.0.1",
"description": "A library to turn an unreliable remote source of Ethereum blocks into a reliable stream of blocks with removals on re-orgs and backfills on skips.",
"main": "output/source/index.js",
"types": "output/source/index.d.ts",
Expand Down
5 changes: 3 additions & 2 deletions source/block-reconciler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Block } from "./models/block";
import { BlockHistory } from "./models/block-history";
import { parseHexInt } from "./utilities";
import { List as ImmutableList } from "immutable";

type GetBlockByHash<TBlock> = (hash: string) => Promise<TBlock|null>;
Expand Down Expand Up @@ -50,7 +51,7 @@ const backfill = async <TBlock extends Block>(getBlockByHash: GetBlockByHash<TBl
return await rollback(blockHistory, onBlockRemoved);
const parentBlock = await getBlockByHash(newBlock.parentHash);
if (parentBlock === null) throw new Error("Failed to fetch parent block.");
if (parseInt(parentBlock.number, 16) + blockRetention < parseInt(blockHistory.last().number, 16))
if (parseHexInt(parentBlock.number) + blockRetention < parseHexInt(blockHistory.last().number))
return await rollback(blockHistory, onBlockRemoved);
blockHistory = await reconcileBlockHistory(getBlockByHash, blockHistory, parentBlock, onBlockAdded, onBlockRemoved, blockRetention);
return await reconcileBlockHistory(getBlockByHash, blockHistory, newBlock, onBlockAdded, onBlockRemoved, blockRetention);
Expand All @@ -77,7 +78,7 @@ const isFirstBlock = <TBlock extends Block>(blockHistory: BlockHistory<TBlock>):
}

const isOlderThanOldestBlock = <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, newBlock: TBlock): boolean => {
return parseInt(blockHistory.first().number, 16) > parseInt(newBlock.number, 16);
return parseHexInt(blockHistory.first().number) > parseHexInt(newBlock.number);
}

const isAlreadyInHistory = <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, newBlock: TBlock): boolean => {
Expand Down
24 changes: 8 additions & 16 deletions source/log-reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Block } from "./models/block";
import { Log } from "./models/log";
import { Filter, FilterOptions } from "./models/filters";
import { LogHistory } from "./models/log-history";
import { List as ImmutableList } from "immutable";
import { parseHexInt } from "./utilities";

export const reconcileLogHistoryWithAddedBlock = async <TBlock extends Block, TLog extends Log>(
getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>,
Expand All @@ -14,22 +14,21 @@ export const reconcileLogHistoryWithAddedBlock = async <TBlock extends Block, TL
): Promise<LogHistory<TLog>> => {
logHistory = await logHistory;
const logs = await getFilteredLogs(getLogs, newBlock, filters);
ensureBlockhash(newBlock, logs);
logHistory = await addNewLogsToHead(logHistory, logs, onLogAdded);
logHistory = await pruneOldLogs(logHistory, newBlock, historyBlockLength);
return logHistory;
}

const getFilteredLogs = async <TBlock extends Block, TLog extends Log>(getLogs: (filterOptions: FilterOptions) => Promise<Array<TLog>>, newBlock: TBlock, filters: Array<Filter>): Promise<Array<TLog>> => {
const logPromises = filters
.map(filter => ({ fromBlock: newBlock.number, toBlock: newBlock.number, address: filter.address, topics: filter.topics, }))
.map(filter => ({ blockHash: newBlock.hash, address: filter.address, topics: filter.topics, }))
.map(filter => getLogs(filter));
const nestedLogs = await Promise.all(logPromises);
return nestedLogs.reduce((allLogs, logs) => allLogs.concat(logs), []);
}

const addNewLogsToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, newLogs: Array<TLog>, onLogAdded: (log: TLog) => Promise<void>): Promise<LogHistory<TLog>> => {
const sortedLogs = newLogs.sort((logA, logB) => parseInt(logA.logIndex, 16) - parseInt(logB.logIndex, 16));
const sortedLogs = newLogs.sort((logA, logB) => parseHexInt(logA.logIndex) - parseHexInt(logB.logIndex));
for (const logToAdd of sortedLogs) {
// we may already have this log because two filters can return the same log
if (logHistory.some(logInHistory => logInHistory!.blockHash === logToAdd.blockHash && logInHistory!.logIndex === logToAdd.logIndex)) continue;
Expand All @@ -41,7 +40,7 @@ const addNewLogsToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>,

const pruneOldLogs = async <TBlock extends Block, TLog extends Log>(logHistory: LogHistory<TLog>, newBlock: TBlock, historyBlockLength: number): Promise<LogHistory<TLog>> => {
// `log!` is required until the next major version of `immutable` is published to NPM (current version 3.8.2) which improves the type definitions
return logHistory.skipUntil(log => parseInt(newBlock.number, 16) - parseInt(log!.blockNumber, 16) < historyBlockLength).toList();
return logHistory.skipUntil(log => parseHexInt(newBlock.number) - parseHexInt(log!.blockNumber) < historyBlockLength).toList();
}

const addNewLogToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, newLog: TLog, onLogAdded: (log: TLog) => Promise<void>): Promise<LogHistory<TLog>> => {
Expand All @@ -53,22 +52,15 @@ const addNewLogToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, n

const ensureOrder = <TLog extends Log>(headLog: TLog | undefined, newLog: TLog) => {
if (headLog === undefined) return;
const headBlockNumber = parseInt(headLog.blockNumber, 16);
const newLogBlockNumber = parseInt(newLog.blockNumber, 16);
const headBlockNumber = parseHexInt(headLog.blockNumber);
const newLogBlockNumber = parseHexInt(newLog.blockNumber);
if (headBlockNumber > newLogBlockNumber) throw new Error(`received log for a block (${newLogBlockNumber}) older than current head log's block (${headBlockNumber})`);
if (headBlockNumber !== newLogBlockNumber) return;
const headLogIndex = parseInt(headLog.logIndex, 16);
const newLogIndex = parseInt(newLog.logIndex, 16);
const headLogIndex = parseHexInt(headLog.logIndex);
const newLogIndex = parseHexInt(newLog.logIndex);
if (headLogIndex >= newLogIndex) throw new Error(`received log with same block number (${newLogBlockNumber}) but index (${newLogIndex}) is the same or older than previous index (${headLogIndex})`);
}

const ensureBlockhash = <TBlock extends Block, TLog extends Log>(block: TBlock, logs: Array<TLog>) => {
// FIXME: This technique for verifying we got the right logs will not work if there were no logs present in the block! This means it is possible to miss logs. Can be fixed once https://eips.ethereum.org/EIPS/eip-234 is implemented
logs.forEach(log => {
if (log.blockHash !== block.hash) throw new Error(`Received log for block hash ${log.blockHash} when asking for logs of block ${block.hash}.`);
});
}

export const reconcileLogHistoryWithRemovedBlock = async <TBlock extends Block, TLog extends Log>(
logHistory: LogHistory<TLog>|Promise<LogHistory<TLog>>,
removedBlock: TBlock,
Expand Down
3 changes: 1 addition & 2 deletions source/models/filters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ export interface Filter {
}

export interface FilterOptions extends Filter {
readonly fromBlock?: string;
readonly toBlock?: string;
readonly blockHash: string
}
5 changes: 5 additions & 0 deletions source/utilities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const parseHexInt = (value: string) => {
const result = Number.parseInt(value, 16);
if (!Number.isFinite(result)) throw new Error(`${value} is not a hex encoded integer, parsing returned ${result}.`);
return result;
}
12 changes: 6 additions & 6 deletions tests/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ export function getLogsFactory(logsPerFilter: number, fork: string = "AAAA") {
const logs = [];
let logIndex = 0;
for (let i = 0; i < logsPerFilter; ++i) {
const blockNumber = parseInt(filterOptions.toBlock!, 16);
logs.push(new MockLog(blockNumber, logIndex++, fork));
const blockHash = filterOptions.blockHash;
logs.push(new MockLog(blockHash, logIndex++, fork));
}
return logs;
};
Expand Down Expand Up @@ -71,10 +71,10 @@ export class MockLog implements Log {
readonly data: string = "0x0000000000000000000000000000000000000000000000000000000000000000";
readonly topics: string[] = [];

constructor(blockNumber: number, logIndex: number = 0x0, fork: string = "AAAA") {
const blockNumberAsHex = blockNumber.toString(16);
this.blockNumber = "0x" + blockNumberAsHex;
this.blockHash = `0xbl0cbl0cbl0cbl0cbl0cbl0cbl0cbl0cbl0cbl0cbl0cbl0cbl0cbl0c${fork}${("0000" + blockNumberAsHex).substring(blockNumberAsHex.length)}`;
constructor(blockHash: string, logIndex: number = 0x0, fork: string = "AAAA") {
const blockNumber = parseInt(blockHash.substring(62), 16);
this.blockNumber = `0x${blockNumber.toString(16)}`;
this.blockHash = blockHash;
this.logIndex = `0x${logIndex.toString(16)}`;
this.transactionIndex = this.logIndex;
}
Expand Down
Loading