Skip to content

Commit

Permalink
feat: Handle reorgs in world state synchronizer
Browse files Browse the repository at this point in the history
  • Loading branch information
spalladino committed Oct 9, 2024
1 parent 9afd190 commit 70d830f
Show file tree
Hide file tree
Showing 18 changed files with 774 additions and 522 deletions.
28 changes: 28 additions & 0 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import {
type InboxLeaf,
type L1ToL2MessageSource,
type L2Block,
L2BlockId,
type L2BlockL2Logs,
type L2BlockSource,
type L2LogsSource,
L2Tips,
type LogFilter,
type LogType,
type TxEffect,
Expand Down Expand Up @@ -653,6 +655,32 @@ export class Archiver implements ArchiveSource {
getContractArtifact(address: AztecAddress): Promise<ContractArtifact | undefined> {
return this.store.getContractArtifact(address);
}

async getL2Tips(): Promise<L2Tips> {
const [latestBlockNumber, provenBlockNumber] = await Promise.all([
this.getBlockNumber(),
this.getProvenBlockNumber(),
] as const);

const [latestBlockHeader, provenBlockHeader] = await Promise.all([
latestBlockNumber > 0 ? this.getBlockHeader(latestBlockNumber) : undefined,
provenBlockNumber > 0 ? this.getBlockHeader(provenBlockNumber) : undefined,
] as const);

if (latestBlockNumber > 0 && !latestBlockHeader) {
throw new Error('Failed to retrieve latest block header');
}

if (provenBlockNumber > 0 && !provenBlockHeader) {
throw new Error('Failed to retrieve proven block header');
}

return {
latest: { number: latestBlockNumber, hash: latestBlockHeader?.hash().toString() } as L2BlockId,
proven: { number: provenBlockNumber, hash: provenBlockHeader?.hash().toString() } as L2BlockId,
finalized: { number: provenBlockNumber, hash: provenBlockHeader?.hash().toString() } as L2BlockId,
};
}
}

enum Operation {
Expand Down
26 changes: 24 additions & 2 deletions yarn-project/archiver/src/test/mock_l2_block_source.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import { L2Block, type L2BlockSource, type TxEffect, type TxHash, TxReceipt, TxStatus } from '@aztec/circuit-types';
import {
L2Block,
type L2BlockSource,
L2Tips,
type TxEffect,
type TxHash,
TxReceipt,
TxStatus,
} from '@aztec/circuit-types';
import { EthAddress, type Header } from '@aztec/circuits.js';

import { getSlotRangeForEpoch } from '../archiver/epoch_helpers.js';

/**
* A mocked implementation of L2BlockSource to be used in p2p tests.
* A mocked implementation of L2BlockSource to be used in tests.
*/
export class MockBlockSource implements L2BlockSource {
private l2Blocks: L2Block[] = [];
Expand Down Expand Up @@ -135,6 +143,20 @@ export class MockBlockSource implements L2BlockSource {
return Promise.resolve(undefined);
}

async getL2Tips(): Promise<L2Tips> {
const [latest, proven, finalized] = [
await this.getBlockNumber(),
await this.getProvenBlockNumber(),
await this.getProvenBlockNumber(),
] as const;

return {
latest: { number: latest, hash: this.l2Blocks[latest]?.hash().toString() },
proven: { number: proven, hash: this.l2Blocks[proven]?.hash().toString() },
finalized: { number: finalized, hash: this.l2Blocks[finalized]?.hash().toString() },
};
}

getL2EpochNumber(): Promise<bigint> {
throw new Error('Method not implemented.');
}
Expand Down
8 changes: 0 additions & 8 deletions yarn-project/circuit-types/src/interfaces/world_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,6 @@ export interface WorldStateSynchronizer {
*/
syncImmediate(minBlockNumber?: number): Promise<number>;

/**
* Pauses the synchronizer, syncs to the target block number, forks world state, and resumes.
* @param targetBlockNumber - The block number to sync to.
* @param forkIncludeUncommitted - Whether to include uncommitted data in the fork.
* @returns The db forked at the requested target block number.
*/
syncImmediateAndFork(targetBlockNumber: number): Promise<MerkleTreeWriteOperations>;

/**
* Forks the current in-memory state based off the current committed state, and returns an instance that cannot modify the underlying data store.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './l2_block_downloader.js';
export * from './l2_block_stream.js';
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import { Fr, type Header } from '@aztec/circuits.js';
import { compactArray } from '@aztec/foundation/collection';

import { type MockProxy, mock } from 'jest-mock-extended';
import times from 'lodash.times';

import { type L2Block } from '../l2_block.js';
import { type L2BlockSource, type L2BlockTag } from '../l2_block_source.js';
import {
L2BlockStream,
type L2BlockStreamEvent,
type L2BlockStreamEventHandler,
type L2BlockStreamLocalDataProvider,
} from './l2_block_stream.js';

describe('L2BlockStream', () => {
let blockStream: TestL2BlockStream;

let blockSource: MockProxy<L2BlockSource>;
let localData: TestL2BlockStreamLocalDataProvider;
let handler: TestL2BlockStreamEventHandler;

let latest: number = 0;

beforeEach(() => {
blockSource = mock<L2BlockSource>();
localData = new TestL2BlockStreamLocalDataProvider();
handler = new TestL2BlockStreamEventHandler();

// Archiver returns headers with hashes equal to the block number for simplicity
blockSource.getBlockHeader.mockImplementation(number =>
Promise.resolve(makeHeader(number === 'latest' ? 1 : number)),
);

// And returns blocks up until what was reported as the latest block
blockSource.getBlocks.mockImplementation((from, limit) =>
Promise.resolve(compactArray(times(limit, i => (from + i > latest ? undefined : makeBlock(from + i))))),
);

blockStream = new TestL2BlockStream(blockSource, localData, handler, { batchSize: 10 });
});

const makeBlock = (number: number) => ({ number } as L2Block);

const makeHeader = (number: number) => mock<Header>({ hash: () => new Fr(number) });

const setRemoteTips = (latest_: number, proven?: number, finalized?: number) => {
proven = proven ?? 0;
finalized = finalized ?? 0;
latest = latest_;

blockSource.getL2Tips.mockResolvedValue({
latest: { number: latest, hash: latest.toString() },
proven: { number: proven, hash: proven.toString() },
finalized: { number: finalized, hash: finalized.toString() },
});
};

describe('work', () => {
it('pulls new blocks from start', async () => {
setRemoteTips(5);

await blockStream.work();
expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 1)) }]);
});

it('pulls new blocks from offset', async () => {
setRemoteTips(15);
localData.latest = 10;

await blockStream.work();
expect(blockSource.getBlocks).toHaveBeenCalledWith(11, 5, undefined);
expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 11)) }]);
});

it('pulls new blocks in multiple batches', async () => {
setRemoteTips(45);

await blockStream.work();
expect(blockSource.getBlocks).toHaveBeenCalledTimes(5);
expect(handler.events).toEqual([
{ type: 'blocks-added', blocks: times(10, i => makeBlock(i + 1)) },
{ type: 'blocks-added', blocks: times(10, i => makeBlock(i + 11)) },
{ type: 'blocks-added', blocks: times(10, i => makeBlock(i + 21)) },
{ type: 'blocks-added', blocks: times(10, i => makeBlock(i + 31)) },
{ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 41)) },
]);
});

it('halts pulling blocks if stopped', async () => {
setRemoteTips(45);
blockStream.running = false;

await blockStream.work();
expect(blockSource.getBlocks).toHaveBeenCalledTimes(1);
expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(10, i => makeBlock(i + 1)) }]);
});

it('handles a reorg and requests blocks from new tip', async () => {
setRemoteTips(45);
localData.latest = 40;

for (const i of [37, 38, 39, 40]) {
// Mess up the block hashes for a bunch of blocks
localData.blockHashes[i] = `0xaa${i.toString()}`;
}

await blockStream.work();
expect(handler.events).toEqual([
{ type: 'chain-pruned', blockNumber: 36 },
{ type: 'blocks-added', blocks: times(9, i => makeBlock(i + 37)) },
]);
});

it('emits events for chain proven and finalized', async () => {
setRemoteTips(45, 40, 35);
localData.latest = 40;
localData.proven = 10;
localData.finalized = 10;

await blockStream.work();
expect(handler.events).toEqual([
{ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 41)) },
{ type: 'chain-proven', blockNumber: 40 },
{ type: 'chain-finalized', blockNumber: 35 },
]);
});

it('does not emit events for chain proven or finalized if local data ignores them', async () => {
setRemoteTips(45, 40, 35);
localData.latest = 40;

await blockStream.work();
expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 41)) }]);
});
});
});

class TestL2BlockStreamEventHandler implements L2BlockStreamEventHandler {
public readonly events: L2BlockStreamEvent[] = [];

handleBlockStreamEvent(event: L2BlockStreamEvent): Promise<void> {
this.events.push(event);
return Promise.resolve();
}
}

class TestL2BlockStreamLocalDataProvider implements L2BlockStreamLocalDataProvider {
public readonly blockHashes: Record<number, string> = {};

public latest = 0;
public proven: number | undefined = undefined;
public finalized: number | undefined = undefined;

public getL2BlockHash(number: number): Promise<string | undefined> {
return Promise.resolve(number > this.latest ? undefined : this.blockHashes[number] ?? new Fr(number).toString());
}

public getL2Tips(): Promise<{ latest: number } & Partial<Record<L2BlockTag, number>>> {
return Promise.resolve(this);
}
}

class TestL2BlockStream extends L2BlockStream {
public running = true;

public override work() {
return super.work();
}

public override isRunning(): boolean {
return this.running;
}
}
Loading

0 comments on commit 70d830f

Please sign in to comment.