Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes logs added/removed callbacks to receive an array of all logs in a particular block. #26

Merged
merged 1 commit into from
Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ethereumjs-blockstream",
"version": "6.0.1",
"version": "7.0.0",
"description": "A library to turn an unreliable remote source of Ethereum blocks into a reliable stream of blocks with removals on re-orgs and backfills on skips.",
"main": "output/source/index.js",
"types": "output/source/index.d.ts",
Expand Down
46 changes: 23 additions & 23 deletions source/block-and-log-streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
private readonly logFilters: { [propName: string]: Filter } = {}
private readonly onBlockAddedSubscribers: { [propName: string]: (block: TBlock) => void } = {};
private readonly onBlockRemovedSubscribers: { [propName: string]: (block: TBlock) => void } = {};
private readonly onLogAddedSubscribers: { [propName: string]: (log: TLog) => void } = {};
private readonly onLogRemovedSubscribers: { [propName: string]: (log: TLog) => void } = {};
private readonly onLogsAddedSubscribers: { [propName: string]: (blockHash: string, logs: Array<TLog>) => void } = {};
private readonly onLogsRemovedSubscribers: { [propName: string]: (blockHash: string, logs: Array<TLog>) => void } = {};

/**
* @param getBlockByHash async function that returns a block given a particular hash or null/throws if the block is not found
Expand Down Expand Up @@ -81,12 +81,12 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
.forEach(callback => this.pendingCallbacks.push(() => callback(block)));

const logFilters = Object.keys(this.logFilters).map(key => this.logFilters[key]);
this.logHistory = reconcileLogHistoryWithAddedBlock(this.getLogs, this.logHistory, block, this.onLogAdded, logFilters, this.blockRetention);
this.logHistory = reconcileLogHistoryWithAddedBlock(this.getLogs, this.logHistory, block, this.onLogsAdded, logFilters, this.blockRetention);
await this.logHistory;
};

private readonly onBlockRemoved = async (block: TBlock): Promise<void> => {
this.logHistory = reconcileLogHistoryWithRemovedBlock(this.logHistory, block, this.onLogRemoved);
this.logHistory = reconcileLogHistoryWithRemovedBlock(this.logHistory, block, this.onLogsRemoved);
await this.logHistory;

Object.keys(this.onBlockRemovedSubscribers)
Expand All @@ -95,18 +95,18 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
.forEach(callback => this.pendingCallbacks.push(() => callback(block)));
};

private readonly onLogAdded = async (log: TLog): Promise<void> => {
Object.keys(this.onLogAddedSubscribers)
.map((key: string) => this.onLogAddedSubscribers[key])
private readonly onLogsAdded = async (blockHash: string, logs: Array<TLog>): Promise<void> => {
Object.keys(this.onLogsAddedSubscribers)
.map((key: string) => this.onLogsAddedSubscribers[key])
.map(callback => logAndSwallowWrapper(callback, this.onError))
.forEach(callback => this.pendingCallbacks.push(() => callback(log)));
.forEach(callback => this.pendingCallbacks.push(() => callback(blockHash, logs)));
};

private readonly onLogRemoved = async (log: TLog): Promise<void> => {
Object.keys(this.onLogRemovedSubscribers)
.map((key: string) => this.onLogRemovedSubscribers[key])
private readonly onLogsRemoved = async (blockHash: string, logs: Array<TLog>): Promise<void> => {
Object.keys(this.onLogsRemovedSubscribers)
.map((key: string) => this.onLogsRemovedSubscribers[key])
.map(callback => logAndSwallowWrapper(callback, this.onError))
.forEach(callback => this.pendingCallbacks.push(() => callback(log)));
.forEach(callback => this.pendingCallbacks.push(() => callback(blockHash, logs)));
};


Expand Down Expand Up @@ -151,34 +151,34 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
};


public readonly subscribeToOnLogAdded = (onLogAdded: (log: TLog) => void): string => {
public readonly subscribeToOnLogsAdded = (onLogsAdded: (blockHash: string, logs: Array<TLog>) => void): string => {
const uuid = `on log added token ${createUuid()}`;
this.onLogAddedSubscribers[uuid] = onLogAdded;
this.onLogsAddedSubscribers[uuid] = onLogsAdded;
return uuid;
};

public readonly unsubscribeFromOnLogAdded = (token: string) => {
public readonly unsubscribeFromOnLogsAdded = (token: string) => {
if (!token.startsWith("on log added token ")) throw new Error(`Expected a log added subscription token. Actual: ${token}`);
delete this.onLogAddedSubscribers[token];
delete this.onLogsAddedSubscribers[token];
};


public readonly subscribeToOnLogRemoved = (onLogRemoved: (log: TLog) => void): string => {
public readonly subscribeToOnLogsRemoved = (onLogsRemoved: (blockHash: string, logs: Array<TLog>) => void): string => {
const uuid = `on log removed token ${createUuid()}`;
this.onLogRemovedSubscribers[uuid] = onLogRemoved;
this.onLogsRemovedSubscribers[uuid] = onLogsRemoved;
return uuid;
};

public readonly unsubscribeFromOnLogRemoved = (token: string) => {
public readonly unsubscribeFromOnLogsRemoved = (token: string) => {
if (!token.startsWith("on log removed token ")) throw new Error(`Expected a log added subscription token. Actual: ${token}`);
delete this.onLogRemovedSubscribers[token];
delete this.onLogsRemovedSubscribers[token];
};
}

function logAndSwallowWrapper<T>(callback: (arg: T) => void, onError: (error: Error) => void): (arg: T) => void {
return function (parameter) {
function logAndSwallowWrapper<T, U>(callback: (arg1?: T, arg2?: U) => void, onError: (error: Error) => void): (arg1?: T, arg2?: U) => void {
return function (parameter1, parameter2) {
try {
callback(parameter);
callback(parameter1, parameter2);
} catch (error) {
onError(error);
}
Expand Down
25 changes: 12 additions & 13 deletions source/log-reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ export const reconcileLogHistoryWithAddedBlock = async <TBlock extends Block, TL
getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>,
logHistory: LogHistory<TLog> | Promise<LogHistory<TLog>>,
newBlock: TBlock,
onLogAdded: (log: TLog) => Promise<void>,
onLogsAdded: (blockHash: string, logs: Array<TLog>) => Promise<void>,
filters: Filter[] = [],
historyBlockLength: number = 100,
): Promise<LogHistory<TLog>> => {
logHistory = await logHistory;
const logs = await getFilteredLogs(getLogs, newBlock, filters);
logHistory = await addNewLogsToHead(logHistory, logs, onLogAdded);
logHistory = await addNewLogsToHead(newBlock.hash, logHistory, logs, onLogsAdded);
logHistory = await pruneOldLogs(logHistory, newBlock, historyBlockLength);
return logHistory;
}
Expand All @@ -27,14 +27,18 @@ const getFilteredLogs = async <TBlock extends Block, TLog extends Log>(getLogs:
return nestedLogs.reduce((allLogs, logs) => allLogs.concat(logs), []);
}

const addNewLogsToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, newLogs: Array<TLog>, onLogAdded: (log: TLog) => Promise<void>): Promise<LogHistory<TLog>> => {
const addNewLogsToHead = async <TLog extends Log>(blockHash: string, logHistory: LogHistory<TLog>, newLogs: Array<TLog>, onLogsAdded: (blockHash: string, logs: Array<TLog>) => Promise<void>): Promise<LogHistory<TLog>> => {
const sortedLogs = newLogs.sort((logA, logB) => parseHexInt(logA.logIndex) - parseHexInt(logB.logIndex));
const addedLogs: Array<TLog> = []
for (const logToAdd of sortedLogs) {
// we may already have this log because two filters can return the same log
if (logHistory.some(logInHistory => logInHistory!.blockHash === logToAdd.blockHash && logInHistory!.logIndex === logToAdd.logIndex)) continue;
ensureOrder(logHistory.last(), logToAdd);
logHistory = await addNewLogToHead(logHistory, logToAdd, onLogAdded);
logHistory = logHistory.push(logToAdd)
addedLogs.push(logToAdd)
}
// CONSIDER: the user getting this notification won't have any visibility into the updated log history yet. should we announce new logs in a `setTimeout`? should we provide log history with new logs?
await onLogsAdded(blockHash, addedLogs)
return logHistory;
}

Expand All @@ -43,13 +47,6 @@ const pruneOldLogs = async <TBlock extends Block, TLog extends Log>(logHistory:
return logHistory.skipUntil(log => parseHexInt(newBlock.number) - parseHexInt(log!.blockNumber) < historyBlockLength).toList();
}

const addNewLogToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, newLog: TLog, onLogAdded: (log: TLog) => Promise<void>): Promise<LogHistory<TLog>> => {
logHistory = logHistory.push(newLog);
// CONSIDER: the user getting this notification won't have any visibility into the updated log history yet. should we announce new logs in a `setTimeout`? should we provide log history with new logs?
await onLogAdded(newLog);
return logHistory;
}

const ensureOrder = <TLog extends Log>(headLog: TLog | undefined, newLog: TLog) => {
if (headLog === undefined) return;
const headBlockNumber = parseHexInt(headLog.blockNumber);
Expand All @@ -64,14 +61,16 @@ const ensureOrder = <TLog extends Log>(headLog: TLog | undefined, newLog: TLog)
export const reconcileLogHistoryWithRemovedBlock = async <TBlock extends Block, TLog extends Log>(
logHistory: LogHistory<TLog>|Promise<LogHistory<TLog>>,
removedBlock: TBlock,
onLogRemoved: (log: TLog) => Promise<void>,
onLogsRemoved: (blockHash: string, logs: Array<TLog>) => Promise<void>,
): Promise<LogHistory<TLog>> => {
logHistory = await logHistory;

const removedLogs = []
while (!logHistory.isEmpty() && logHistory.last().blockHash === removedBlock.hash) {
await onLogRemoved(logHistory.last());
removedLogs.push(logHistory.last());
logHistory = logHistory.pop();
}
await onLogsRemoved(removedBlock.hash, removedLogs);

// sanity check, no known way to trigger the error
if (logHistory.some(log => log!.blockHash === removedBlock.hash)) throw new Error("found logs for removed block not at head of log history");
Expand Down
Loading