Skip to content

Commit

Permalink
Fixes bug where an Ethereum node giving bad answers could break things.
Browse files Browse the repository at this point in the history
This changes the way we announce block/log additions/removals so that we wait until we are sure we can fully synch with the node before doing any announcements.  This allows us to rollback when the Ethereum node gives us something weird in response such as a claim that a parent block doesn't exist, or logs that don't line up with the expected block.

No longer does console.log when an error occurs, instead calls the optional user provided `errorCallback`.  This cleans up the tests, and also allows the user to do something more useful with errors like track them.

Removes deprecated ES5 callback stuff.
  • Loading branch information
MicahZoltu committed Jun 14, 2018
1 parent 964e6cf commit d793329
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 239 deletions.
92 changes: 39 additions & 53 deletions source/block-and-log-streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ import { List as ImmutableList } from "immutable";
import * as createUuid from "uuid";

export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
private blockHistory: Promise<BlockHistory<TBlock>> = Promise.resolve(ImmutableList<TBlock>());
private logHistory: Promise<LogHistory<TLog>> = Promise.resolve(ImmutableList<TLog>());
private latestBlock: TBlock | null = null;
private lastKnownGoodBlockHistory: BlockHistory<TBlock> = ImmutableList<TBlock>();
private blockHistory: Promise<BlockHistory<TBlock>> = Promise.resolve(this.lastKnownGoodBlockHistory);
private lastKnownGoodLogHistory: LogHistory<TLog> = ImmutableList<TLog>();
private logHistory: Promise<LogHistory<TLog>> = Promise.resolve(this.lastKnownGoodLogHistory);
private pendingCallbacks: Array<() => void> = [];

private readonly blockRetention: number;

private readonly getBlockByHash: (hash: string) => Promise<TBlock | null>;
private readonly getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>;
private readonly errorCallback: (error: any) => void = () => {};

private readonly logFilters: { [propName: string]: Filter } = {}
private readonly onBlockAddedSubscribers: { [propName: string]: (block: TBlock) => void } = {};
Expand All @@ -28,88 +31,71 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
constructor(
getBlockByHash: (hash: string) => Promise<TBlock | null>,
getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>,
configuration?: { blockRetention?: number },
configuration?: { blockRetention?: number, errorCallback?: (error: any) => void },
) {
this.getBlockByHash = getBlockByHash;
this.getLogs = getLogs;
this.errorCallback = (configuration && configuration.errorCallback) ? configuration.errorCallback : () => {};
this.blockRetention = (configuration && configuration.blockRetention) ? configuration.blockRetention : 100;
}

static createCallbackStyle = <TBlock extends Block, TLog extends Log>(
getBlockByHash: (hash: string, callback: (error?: Error, block?: TBlock | null) => void) => void,
getLogs: (filterOptions: FilterOptions, callback: (error?: Error, logs?: TLog[]) => void) => void,
configuration?: { blockRetention?: number },
): BlockAndLogStreamer<TBlock, TLog> => {
console.warn(`Deprecation Warning: The callback interface for ethereumjs-blockstream is deprecated and will be removed in a future version of this library. Use BlockAndLogStreamer constructor instead.`);
const wrappedGetBlockByHash = (hash: string): Promise<TBlock | null> => {
return new Promise<TBlock | null>((resolve, reject) => {
getBlockByHash(hash, (error, block) => {
if (error) throw error;
else resolve(block);
});
});
};
const wrappedGetLogs = (filterOptions: FilterOptions): Promise<Array<TLog>> => new Promise<Array<TLog>>((resolve, reject) => {
getLogs(filterOptions, (error, logs) => {
if (error) throw error;
if (!logs) throw new Error("Received null/undefined logs and no error.");
resolve(logs);
});
});
return new BlockAndLogStreamer<TBlock, TLog>(wrappedGetBlockByHash, wrappedGetLogs, configuration);
}

public readonly reconcileNewBlock = async (block: TBlock): Promise<void> => {
this.blockHistory = reconcileBlockHistory(this.getBlockByHash, this.blockHistory, block, this.onBlockAdded, this.onBlockRemoved, this.blockRetention);
const blockHistory = await this.blockHistory;
this.latestBlock = blockHistory.last();
};

public readonly reconcileNewBlockCallbackStyle = async (block: TBlock, callback: (error?: Error) => void): Promise<void> => {
console.warn(`Deprecation Warning: The callback interface for ethereumjs-blockstream is deprecated and will be removed in a future version of this library. Use BlockAndLogStreamer.reconcileNewBlock instead.`);
this.reconcileNewBlock(block)
.then(() => callback(undefined))
.catch(error => callback(error));
try {
this.blockHistory = reconcileBlockHistory(this.getBlockByHash, this.blockHistory, block, this.onBlockAdded, this.onBlockRemoved, this.blockRetention);
const blockHistory = await this.blockHistory;
const logHistory = await this.logHistory;
// everything reconciled correctly, checkpoint state
this.lastKnownGoodBlockHistory = blockHistory;
this.lastKnownGoodLogHistory = logHistory;
this.pendingCallbacks.forEach(callback => callback());
this.pendingCallbacks = [];
} catch (error) {
// something went wrong, rollback to last checkpoint
this.blockHistory = Promise.resolve(this.lastKnownGoodBlockHistory);
this.logHistory = Promise.resolve(this.lastKnownGoodLogHistory);
this.pendingCallbacks = [];
this.errorCallback(error);
}
};

private readonly onBlockAdded = async (block: TBlock): Promise<void> => {
Object.keys(this.onBlockAddedSubscribers)
.map((key: string) => this.onBlockAddedSubscribers[key])
.map(callback => logAndSwallowWrapper(callback, this.errorCallback))
.forEach(callback => this.pendingCallbacks.push(() => callback(block)));

const logFilters = Object.keys(this.logFilters).map(key => this.logFilters[key]);
this.logHistory = reconcileLogHistoryWithAddedBlock(this.getLogs, this.logHistory, block, this.onLogAdded, logFilters, this.blockRetention);

await this.logHistory;
Object.keys(this.onBlockAddedSubscribers)
.map((key: string) => this.onBlockAddedSubscribers[key])
.map(callback => logAndSwallowWrapper(callback))
.forEach(callback => callback(block));
};

private readonly onBlockRemoved = async (block: TBlock): Promise<void> => {
this.logHistory = reconcileLogHistoryWithRemovedBlock(this.logHistory, block, this.onLogRemoved);

await this.logHistory;

Object.keys(this.onBlockRemovedSubscribers)
.map((key: string) => this.onBlockRemovedSubscribers[key])
.map(callback => logAndSwallowWrapper(callback))
.forEach(callback => callback(block));
.map(callback => logAndSwallowWrapper(callback, this.errorCallback))
.forEach(callback => this.pendingCallbacks.push(() => callback(block)));
};

private readonly onLogAdded = async (log: TLog): Promise<void> => {
Object.keys(this.onLogAddedSubscribers)
.map((key: string) => this.onLogAddedSubscribers[key])
.map(callback => logAndSwallowWrapper(callback))
.forEach(callback => callback(log));
.map(callback => logAndSwallowWrapper(callback, this.errorCallback))
.forEach(callback => this.pendingCallbacks.push(() => callback(log)));
};

private readonly onLogRemoved = async (log: TLog): Promise<void> => {
Object.keys(this.onLogRemovedSubscribers)
.map((key: string) => this.onLogRemovedSubscribers[key])
.map(callback => logAndSwallowWrapper(callback))
.forEach(callback => callback(log));
.map(callback => logAndSwallowWrapper(callback, this.errorCallback))
.forEach(callback => this.pendingCallbacks.push(() => callback(log)));
};


public readonly getLatestReconciledBlock = (): TBlock | null => {
return this.latestBlock;
return this.lastKnownGoodBlockHistory.isEmpty() ? null : this.lastKnownGoodBlockHistory.last();
};


Expand Down Expand Up @@ -173,12 +159,12 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
};
}

function logAndSwallowWrapper<T>(callback: (arg: T) => void): (arg: T) => void {
function logAndSwallowWrapper<T>(callback: (arg: T) => void, onError: (error: any) => void): (arg: T) => void {
return function (parameter) {
try {
callback(parameter);
} catch (error) {
console.log(error);
onError(error);
}
};
}
9 changes: 7 additions & 2 deletions source/log-reconciler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Block } from "./models/block";
import { BlockHistory } from "./models/block-history";
import { Log } from "./models/log";
import { Filter, FilterOptions } from "./models/filters";
import { LogHistory } from "./models/log-history";
Expand All @@ -15,10 +14,10 @@ export const reconcileLogHistoryWithAddedBlock = async <TBlock extends Block, TL
): Promise<LogHistory<TLog>> => {
logHistory = await logHistory;
const logs = await getFilteredLogs(getLogs, newBlock, filters);
ensureBlockhash(newBlock, logs);
logHistory = await addNewLogsToHead(logHistory, logs, onLogAdded);
logHistory = await pruneOldLogs(logHistory, newBlock, historyBlockLength);
return logHistory;
// TODO: validate logs are part of expected block hash
}

const getFilteredLogs = async <TBlock extends Block, TLog extends Log>(getLogs: (filterOptions: FilterOptions) => Promise<Array<TLog>>, newBlock: TBlock, filters: Array<Filter>): Promise<Array<TLog>> => {
Expand Down Expand Up @@ -63,6 +62,12 @@ const ensureOrder = <TLog extends Log>(headLog: TLog | undefined, newLog: TLog)
if (headLogIndex >= newLogIndex) throw new Error(`received log with same block number (${newLogBlockNumber}) but index (${newLogIndex}) is the same or older than previous index (${headLogIndex})`);
}

const ensureBlockhash = <TBlock extends Block, TLog extends Log>(block: TBlock, logs: Array<TLog>) => {
logs.forEach(log => {
if (log.blockHash !== block.hash) throw new Error(`Received log for block hash ${log.blockHash} when asking for logs of block ${block.hash}.`);
});
}

export const reconcileLogHistoryWithRemovedBlock = async <TBlock extends Block, TLog extends Log>(
logHistory: LogHistory<TLog>|Promise<LogHistory<TLog>>,
removedBlock: TBlock,
Expand Down
118 changes: 0 additions & 118 deletions tests/es5.js

This file was deleted.

3 changes: 2 additions & 1 deletion tests/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export function getLogsFactory(logsPerFilter: number, fork: string = "AAAA") {
const logs = [];
let logIndex = 0;
for (let i = 0; i < logsPerFilter; ++i) {
logs.push(new MockLog(parseInt(filterOptions.toBlock!, 16), logIndex++, fork));
const blockNumber = parseInt(filterOptions.toBlock!, 16);
logs.push(new MockLog(blockNumber, logIndex++, fork));
}
return logs;
};
Expand Down
Loading

0 comments on commit d793329

Please sign in to comment.