Skip to content

Commit

Permalink
Merge pull request #26 from ethereumjs/micah
Browse files Browse the repository at this point in the history
Changes logs added/removed callbacks to receive an array of all logs in a particular block.
  • Loading branch information
epheph authored Nov 7, 2018
2 parents 2576905 + 7ac6517 commit 4466c9b
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 85 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ethereumjs-blockstream",
"version": "6.0.1",
"version": "7.0.0",
"description": "A library to turn an unreliable remote source of Ethereum blocks into a reliable stream of blocks with removals on re-orgs and backfills on skips.",
"main": "output/source/index.js",
"types": "output/source/index.d.ts",
Expand Down
46 changes: 23 additions & 23 deletions source/block-and-log-streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
private readonly logFilters: { [propName: string]: Filter } = {}
private readonly onBlockAddedSubscribers: { [propName: string]: (block: TBlock) => void } = {};
private readonly onBlockRemovedSubscribers: { [propName: string]: (block: TBlock) => void } = {};
private readonly onLogAddedSubscribers: { [propName: string]: (log: TLog) => void } = {};
private readonly onLogRemovedSubscribers: { [propName: string]: (log: TLog) => void } = {};
private readonly onLogsAddedSubscribers: { [propName: string]: (blockHash: string, logs: Array<TLog>) => void } = {};
private readonly onLogsRemovedSubscribers: { [propName: string]: (blockHash: string, logs: Array<TLog>) => void } = {};

/**
* @param getBlockByHash async function that returns a block given a particular hash or null/throws if the block is not found
Expand Down Expand Up @@ -81,12 +81,12 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
.forEach(callback => this.pendingCallbacks.push(() => callback(block)));

const logFilters = Object.keys(this.logFilters).map(key => this.logFilters[key]);
this.logHistory = reconcileLogHistoryWithAddedBlock(this.getLogs, this.logHistory, block, this.onLogAdded, logFilters, this.blockRetention);
this.logHistory = reconcileLogHistoryWithAddedBlock(this.getLogs, this.logHistory, block, this.onLogsAdded, logFilters, this.blockRetention);
await this.logHistory;
};

private readonly onBlockRemoved = async (block: TBlock): Promise<void> => {
this.logHistory = reconcileLogHistoryWithRemovedBlock(this.logHistory, block, this.onLogRemoved);
this.logHistory = reconcileLogHistoryWithRemovedBlock(this.logHistory, block, this.onLogsRemoved);
await this.logHistory;

Object.keys(this.onBlockRemovedSubscribers)
Expand All @@ -95,18 +95,18 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
.forEach(callback => this.pendingCallbacks.push(() => callback(block)));
};

private readonly onLogAdded = async (log: TLog): Promise<void> => {
Object.keys(this.onLogAddedSubscribers)
.map((key: string) => this.onLogAddedSubscribers[key])
private readonly onLogsAdded = async (blockHash: string, logs: Array<TLog>): Promise<void> => {
Object.keys(this.onLogsAddedSubscribers)
.map((key: string) => this.onLogsAddedSubscribers[key])
.map(callback => logAndSwallowWrapper(callback, this.onError))
.forEach(callback => this.pendingCallbacks.push(() => callback(log)));
.forEach(callback => this.pendingCallbacks.push(() => callback(blockHash, logs)));
};

private readonly onLogRemoved = async (log: TLog): Promise<void> => {
Object.keys(this.onLogRemovedSubscribers)
.map((key: string) => this.onLogRemovedSubscribers[key])
private readonly onLogsRemoved = async (blockHash: string, logs: Array<TLog>): Promise<void> => {
Object.keys(this.onLogsRemovedSubscribers)
.map((key: string) => this.onLogsRemovedSubscribers[key])
.map(callback => logAndSwallowWrapper(callback, this.onError))
.forEach(callback => this.pendingCallbacks.push(() => callback(log)));
.forEach(callback => this.pendingCallbacks.push(() => callback(blockHash, logs)));
};


Expand Down Expand Up @@ -151,34 +151,34 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
};


public readonly subscribeToOnLogAdded = (onLogAdded: (log: TLog) => void): string => {
public readonly subscribeToOnLogsAdded = (onLogsAdded: (blockHash: string, logs: Array<TLog>) => void): string => {
const uuid = `on log added token ${createUuid()}`;
this.onLogAddedSubscribers[uuid] = onLogAdded;
this.onLogsAddedSubscribers[uuid] = onLogsAdded;
return uuid;
};

public readonly unsubscribeFromOnLogAdded = (token: string) => {
public readonly unsubscribeFromOnLogsAdded = (token: string) => {
if (!token.startsWith("on log added token ")) throw new Error(`Expected a log added subscription token. Actual: ${token}`);
delete this.onLogAddedSubscribers[token];
delete this.onLogsAddedSubscribers[token];
};


public readonly subscribeToOnLogRemoved = (onLogRemoved: (log: TLog) => void): string => {
public readonly subscribeToOnLogsRemoved = (onLogsRemoved: (blockHash: string, logs: Array<TLog>) => void): string => {
const uuid = `on log removed token ${createUuid()}`;
this.onLogRemovedSubscribers[uuid] = onLogRemoved;
this.onLogsRemovedSubscribers[uuid] = onLogsRemoved;
return uuid;
};

public readonly unsubscribeFromOnLogRemoved = (token: string) => {
public readonly unsubscribeFromOnLogsRemoved = (token: string) => {
if (!token.startsWith("on log removed token ")) throw new Error(`Expected a log added subscription token. Actual: ${token}`);
delete this.onLogRemovedSubscribers[token];
delete this.onLogsRemovedSubscribers[token];
};
}

function logAndSwallowWrapper<T>(callback: (arg: T) => void, onError: (error: Error) => void): (arg: T) => void {
return function (parameter) {
function logAndSwallowWrapper<T, U>(callback: (arg1?: T, arg2?: U) => void, onError: (error: Error) => void): (arg1?: T, arg2?: U) => void {
return function (parameter1, parameter2) {
try {
callback(parameter);
callback(parameter1, parameter2);
} catch (error) {
onError(error);
}
Expand Down
25 changes: 12 additions & 13 deletions source/log-reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ export const reconcileLogHistoryWithAddedBlock = async <TBlock extends Block, TL
getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>,
logHistory: LogHistory<TLog> | Promise<LogHistory<TLog>>,
newBlock: TBlock,
onLogAdded: (log: TLog) => Promise<void>,
onLogsAdded: (blockHash: string, logs: Array<TLog>) => Promise<void>,
filters: Filter[] = [],
historyBlockLength: number = 100,
): Promise<LogHistory<TLog>> => {
logHistory = await logHistory;
const logs = await getFilteredLogs(getLogs, newBlock, filters);
logHistory = await addNewLogsToHead(logHistory, logs, onLogAdded);
logHistory = await addNewLogsToHead(newBlock.hash, logHistory, logs, onLogsAdded);
logHistory = await pruneOldLogs(logHistory, newBlock, historyBlockLength);
return logHistory;
}
Expand All @@ -27,14 +27,18 @@ const getFilteredLogs = async <TBlock extends Block, TLog extends Log>(getLogs:
return nestedLogs.reduce((allLogs, logs) => allLogs.concat(logs), []);
}

const addNewLogsToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, newLogs: Array<TLog>, onLogAdded: (log: TLog) => Promise<void>): Promise<LogHistory<TLog>> => {
const addNewLogsToHead = async <TLog extends Log>(blockHash: string, logHistory: LogHistory<TLog>, newLogs: Array<TLog>, onLogsAdded: (blockHash: string, logs: Array<TLog>) => Promise<void>): Promise<LogHistory<TLog>> => {
const sortedLogs = newLogs.sort((logA, logB) => parseHexInt(logA.logIndex) - parseHexInt(logB.logIndex));
const addedLogs: Array<TLog> = []
for (const logToAdd of sortedLogs) {
// we may already have this log because two filters can return the same log
if (logHistory.some(logInHistory => logInHistory!.blockHash === logToAdd.blockHash && logInHistory!.logIndex === logToAdd.logIndex)) continue;
ensureOrder(logHistory.last(), logToAdd);
logHistory = await addNewLogToHead(logHistory, logToAdd, onLogAdded);
logHistory = logHistory.push(logToAdd)
addedLogs.push(logToAdd)
}
// CONSIDER: the user getting this notification won't have any visibility into the updated log history yet. should we announce new logs in a `setTimeout`? should we provide log history with new logs?
await onLogsAdded(blockHash, addedLogs)
return logHistory;
}

Expand All @@ -43,13 +47,6 @@ const pruneOldLogs = async <TBlock extends Block, TLog extends Log>(logHistory:
return logHistory.skipUntil(log => parseHexInt(newBlock.number) - parseHexInt(log!.blockNumber) < historyBlockLength).toList();
}

const addNewLogToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, newLog: TLog, onLogAdded: (log: TLog) => Promise<void>): Promise<LogHistory<TLog>> => {
logHistory = logHistory.push(newLog);
// CONSIDER: the user getting this notification won't have any visibility into the updated log history yet. should we announce new logs in a `setTimeout`? should we provide log history with new logs?
await onLogAdded(newLog);
return logHistory;
}

const ensureOrder = <TLog extends Log>(headLog: TLog | undefined, newLog: TLog) => {
if (headLog === undefined) return;
const headBlockNumber = parseHexInt(headLog.blockNumber);
Expand All @@ -64,14 +61,16 @@ const ensureOrder = <TLog extends Log>(headLog: TLog | undefined, newLog: TLog)
export const reconcileLogHistoryWithRemovedBlock = async <TBlock extends Block, TLog extends Log>(
logHistory: LogHistory<TLog>|Promise<LogHistory<TLog>>,
removedBlock: TBlock,
onLogRemoved: (log: TLog) => Promise<void>,
onLogsRemoved: (blockHash: string, logs: Array<TLog>) => Promise<void>,
): Promise<LogHistory<TLog>> => {
logHistory = await logHistory;

const removedLogs = []
while (!logHistory.isEmpty() && logHistory.last().blockHash === removedBlock.hash) {
await onLogRemoved(logHistory.last());
removedLogs.push(logHistory.last());
logHistory = logHistory.pop();
}
await onLogsRemoved(removedBlock.hash, removedLogs);

// sanity check, no known way to trigger the error
if (logHistory.some(log => log!.blockHash === removedBlock.hash)) throw new Error("found logs for removed block not at head of log history");
Expand Down
Loading

0 comments on commit 4466c9b

Please sign in to comment.