Skip to content

Commit

Permalink
Merge pull request #13 from ethereumjs/micah
Browse files Browse the repository at this point in the history
Adds support for deduping logs.
  • Loading branch information
MicahZoltu authored Feb 19, 2018
2 parents 4b94e5a + cd13bca commit 6580081
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 15 deletions.
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ethereumjs-blockstream",
"version": "3.1.0",
"version": "4.0.0",
"description": "A library to turn an unreliable remote source of Ethereum blocks into a reliable stream of blocks with removals on re-orgs and backfills on skips.",
"main": "output/source/index.js",
"types": "output/source/index.d.ts",
Expand Down
16 changes: 9 additions & 7 deletions source/log-reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ const getFilteredLogs = async <TBlock extends Block, TLog extends Log>(getLogs:
const logPromises = filters
.map(filter => ({ fromBlock: newBlock.number, toBlock: newBlock.number, address: filter.address, topics: filter.topics, }))
.map(filter => getLogs(filter));
return await Promise.all(logPromises)
.then(nestedLogs => nestedLogs.reduce((allLogs, logs) => allLogs.concat(logs), []));
const nestedLogs = await Promise.all(logPromises);
return nestedLogs.reduce((allLogs, logs) => allLogs.concat(logs), []);
}

const addNewLogsToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, newLogs: Array<TLog>, onLogAdded: (log: TLog) => Promise<void>): Promise<LogHistory<TLog>> => {
const sortedLogs = newLogs.sort((logA, logB) => parseInt(logA.logIndex, 16) - parseInt(logB.logIndex, 16));
for (const log of sortedLogs) {
ensureOrder(logHistory.last(), log);
logHistory = await addNewLogToHead(logHistory, log, onLogAdded);
for (const logToAdd of sortedLogs) {
// we may already have this log because two filters can return the same log
if (logHistory.some(logInHistory => logInHistory!.blockHash === logToAdd.blockHash && logInHistory!.logIndex === logToAdd.logIndex)) continue;
ensureOrder(logHistory.last(), logToAdd);
logHistory = await addNewLogToHead(logHistory, logToAdd, onLogAdded);
}
return logHistory;
}
Expand All @@ -54,11 +56,11 @@ const ensureOrder = <TLog extends Log>(headLog: TLog | undefined, newLog: TLog)
if (headLog === undefined) return;
const headBlockNumber = parseInt(headLog.blockNumber, 16);
const newLogBlockNumber = parseInt(newLog.blockNumber, 16);
if (headBlockNumber > newLogBlockNumber) throw new Error("received log for a block older than current head log's block");
if (headBlockNumber > newLogBlockNumber) throw new Error(`received log for a block (${newLogBlockNumber}) older than current head log's block (${headBlockNumber})`);
if (headBlockNumber !== newLogBlockNumber) return;
const headLogIndex = parseInt(headLog.logIndex, 16);
const newLogIndex = parseInt(newLog.logIndex, 16);
if (headLogIndex >= newLogIndex) throw new Error("received log with same block number but index newer than previous index");
if (headLogIndex >= newLogIndex) throw new Error(`received log with same block number (${newLogBlockNumber}) but index (${newLogIndex}) is the same or older than previous index (${headLogIndex})`);
}

export const reconcileLogHistoryWithRemovedBlock = async <TBlock extends Block, TLog extends Log>(
Expand Down
9 changes: 3 additions & 6 deletions tests/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,24 +340,21 @@ describe("reconcileLogHistoryWithAddedBlock", async () => {
const firstLogHistory = await reconcileLogHistoryWithAddedBlock(getLogs, oldLogHistory, firstBlock, onLogAdded, [{}]);
const secondLogHistoryPromise = reconcileLogHistoryWithAddedBlock(getLogs, firstLogHistory, secondBlock, onLogAdded, [{}]);

await expect(secondLogHistoryPromise).to.eventually.rejectedWith(Error, "received log for a block older than current head log's block");
await expect(secondLogHistoryPromise).to.eventually.rejectedWith(Error, /received log for a block (.*?) older than current head log's block (.*?)/);
// unfortunate reality
expect(newLogAnnouncements).to.deep.equal([new MockLog(0x7777)]);
})

it("fails if multiple logs are received with the same index", async () => {
it("dedupes logs with same blockhash and index from multiple filters", async () => {
const getLogs = async (filterOptions: FilterOptions) => Promise.resolve([
new MockLog(0x7777, 0x0),
new MockLog(0x7777, 0x1),
new MockLog(0x7777, 0x1),
]);
const newBlock = new MockBlock(0x7777);
const oldLogHistory = Promise.resolve(ImmutableList<Log>());

const newLogHistoryPromise = reconcileLogHistoryWithAddedBlock(getLogs, oldLogHistory, newBlock, onLogAdded, [{}]);
const newLogHistory = await reconcileLogHistoryWithAddedBlock(getLogs, oldLogHistory, newBlock, onLogAdded, [{},{}]);

await expect(newLogHistoryPromise).to.eventually.rejectedWith(Error, "received log with same block number but index newer than previous index");
// unfortunate reality
expect(newLogAnnouncements).to.deep.equal([
new MockLog(0x7777, 0x0),
new MockLog(0x7777, 0x1),
Expand Down

0 comments on commit 6580081

Please sign in to comment.