Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: better tracing/metrics in validator and archiver #9108

Merged
merged 14 commits into from
Oct 10, 2024
Merged
1 change: 1 addition & 0 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ export class Archiver implements ArchiveSource {
// if we are here then we must have a valid proven epoch number
await this.store.setProvenL2EpochNumber(Number(provenEpochNumber));
}
this.instrumentation.updateLastProvenBlock(Number(provenBlockNumber));
};

// This is an edge case that we only hit if there are no proposed blocks.
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ export class AztecNodeService implements AztecNode {

const simulationProvider = await createSimulationProvider(config, log);

const validatorClient = createValidatorClient(config, p2pClient);
const validatorClient = createValidatorClient(config, p2pClient, telemetry);

// now create the sequencer
const sequencer = config.disableSequencer
Expand Down
11 changes: 8 additions & 3 deletions yarn-project/circuit-types/src/p2p/block_attestation.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { type EthAddress } from '@aztec/circuits.js';
import { Buffer32 } from '@aztec/foundation/buffer';
import { recoverAddress } from '@aztec/foundation/crypto';
import { keccak256, recoverAddress } from '@aztec/foundation/crypto';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { Signature } from '@aztec/foundation/eth-signature';
import { type Fr } from '@aztec/foundation/fields';
import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize';

import { ConsensusPayload } from './consensus_payload.js';
Expand Down Expand Up @@ -37,7 +38,11 @@ export class BlockAttestation extends Gossipable {
}

override p2pMessageIdentifier(): Buffer32 {
return BlockAttestationHash.fromField(this.payload.archive);
return new BlockAttestationHash(keccak256(this.signature.toBuffer()));
}

get archive(): Fr {
return this.payload.archive;
}

/**Get sender
Expand Down
7 changes: 6 additions & 1 deletion yarn-project/circuit-types/src/p2p/block_proposal.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { type EthAddress } from '@aztec/circuits.js';
import { Buffer32 } from '@aztec/foundation/buffer';
import { recoverAddress } from '@aztec/foundation/crypto';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { Signature } from '@aztec/foundation/eth-signature';
import { type Fr } from '@aztec/foundation/fields';
import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize';

import { ConsensusPayload } from './consensus_payload.js';
Expand Down Expand Up @@ -40,6 +41,10 @@ export class BlockProposal extends Gossipable {
return BlockProposalHash.fromField(this.payload.archive);
}

get archive(): Fr {
return this.payload.archive;
}

static async createProposalFromSigner(
payload: ConsensusPayload,
payloadSigner: (payload: Buffer32) => Promise<Signature>,
Expand Down
3 changes: 2 additions & 1 deletion yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ export const createP2PClient = async (
proofVerifier,
worldStateSynchronizer,
store,
telemetry,
);
} else {
p2pService = new DummyP2PService();
}
return new P2PClient(store, l2BlockSource, mempools, p2pService, config.keepProvenTxsInPoolFor);
return new P2PClient(store, l2BlockSource, mempools, p2pService, config.keepProvenTxsInPoolFor, telemetry);
};

async function configureP2PClientAddresses(_config: P2PConfig & DataStoreConfig): Promise<P2PConfig & DataStoreConfig> {
Expand Down
13 changes: 8 additions & 5 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { mockEpochProofQuote, mockTx } from '@aztec/circuit-types';
import { retryUntil } from '@aztec/foundation/retry';
import { type AztecKVStore } from '@aztec/kv-store';
import { openTmpStore } from '@aztec/kv-store/utils';
import { type TelemetryClient } from '@aztec/telemetry-client';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { expect, jest } from '@jest/globals';

Expand All @@ -28,6 +30,7 @@ describe('In-Memory P2P Client', () => {
let p2pService: Mockify<P2PService>;
let kvStore: AztecKVStore;
let client: P2PClient;
const telemetryClient: TelemetryClient = new NoopTelemetryClient();

beforeEach(() => {
txPool = {
Expand Down Expand Up @@ -73,7 +76,7 @@ describe('In-Memory P2P Client', () => {
};

kvStore = openTmpStore();
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0, telemetryClient);
});

const advanceToProvenBlock = async (getProvenBlockNumber: number, provenEpochNumber = getProvenBlockNumber) => {
Expand Down Expand Up @@ -143,7 +146,7 @@ describe('In-Memory P2P Client', () => {
await client.start();
await client.stop();

const client2 = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
const client2 = new P2PClient(kvStore, blockSource, mempools, p2pService, 0, telemetryClient);
expect(client2.getSyncedLatestBlockNum()).toEqual(client.getSyncedLatestBlockNum());
});

Expand All @@ -158,7 +161,7 @@ describe('In-Memory P2P Client', () => {
});

it('deletes txs after waiting the set number of blocks', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 10);
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 10, telemetryClient);
blockSource.setProvenBlockNumber(0);
await client.start();
expect(txPool.deleteTxs).not.toHaveBeenCalled();
Expand All @@ -175,7 +178,7 @@ describe('In-Memory P2P Client', () => {
});

it('stores and returns epoch proof quotes', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0, telemetryClient);

blockSource.setProvenEpochNumber(2);
await client.start();
Expand Down Expand Up @@ -206,7 +209,7 @@ describe('In-Memory P2P Client', () => {
});

it('deletes expired proof quotes', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0);
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 0, telemetryClient);

blockSource.setProvenEpochNumber(1);
blockSource.setProvenBlockNumber(1);
Expand Down
12 changes: 11 additions & 1 deletion yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
import { INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js/constants';
import { createDebugLogger } from '@aztec/foundation/log';
import { type AztecKVStore, type AztecSingleton } from '@aztec/kv-store';
import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client';

import { type ENR } from '@chainsafe/enr';

Expand Down Expand Up @@ -169,7 +170,7 @@ export interface P2P {
/**
* The P2P client implementation.
*/
export class P2PClient implements P2P {
export class P2PClient extends WithTracer implements P2P {
/** L2 block download to stay in sync with latest blocks. */
private latestBlockDownloader: L2BlockDownloader;

Expand Down Expand Up @@ -210,8 +211,11 @@ export class P2PClient implements P2P {
mempools: MemPools,
private p2pService: P2PService,
private keepProvenTxsFor: number,
telemetryClient: TelemetryClient,
private log = createDebugLogger('aztec:p2p'),
) {
super(telemetryClient, 'P2PClient');

const { blockCheckIntervalMS: checkInterval, l2QueueSize: p2pL2QueueSize } = getP2PConfigEnvVars();
const l2DownloaderOpts = { maxQueueSize: p2pL2QueueSize, pollIntervalMS: checkInterval };
// TODO(palla/prover-node): This effectively downloads blocks twice from the archiver, which is an issue
Expand Down Expand Up @@ -318,6 +322,12 @@ export class P2PClient implements P2P {
this.log.info('P2P client stopped.');
}

@trackSpan('p2pClient.broadcastProposal', proposal => ({
[Attributes.BLOCK_NUMBER]: proposal.payload.header.globalVariables.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: proposal.payload.header.globalVariables.slotNumber.toNumber(),
[Attributes.BLOCK_ARCHIVE]: proposal.archive.toString(),
[Attributes.P2P_ID]: proposal.p2pMessageIdentifier().toString(),
}))
public broadcastProposal(proposal: BlockProposal): void {
this.log.verbose(`Broadcasting proposal ${proposal.p2pMessageIdentifier()} to peers`);
return this.p2pService.propagate(proposal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@ describe('MemoryAttestationPool', () => {
const archive = Fr.random();
const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive)));

const proposalId = attestations[0].p2pMessageIdentifier().toString();

await ap.addAttestations(attestations);

// Check metrics have been updated.
expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length);

const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId);
const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString());

expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST);
expect(retreivedAttestations).toEqual(attestations);
Expand All @@ -52,7 +50,7 @@ describe('MemoryAttestationPool', () => {

expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length);

const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId);
const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString());
expect(retreivedAttestationsAfterDelete.length).toBe(0);
});

Expand All @@ -64,9 +62,9 @@ describe('MemoryAttestationPool', () => {

for (const attestation of attestations) {
const slot = attestation.payload.header.globalVariables.slotNumber;
const proposalId = attestation.p2pMessageIdentifier().toString();
const archive = attestation.archive.toString();

const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), proposalId);
const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), archive);
expect(retreivedAttestations.length).toBe(1);
expect(retreivedAttestations[0]).toEqual(attestation);
expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot);
Expand All @@ -84,7 +82,7 @@ describe('MemoryAttestationPool', () => {

for (const attestation of attestations) {
const slot = attestation.payload.header.globalVariables.slotNumber;
const proposalId = attestation.p2pMessageIdentifier().toString();
const proposalId = attestation.archive.toString();

const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), proposalId);
expect(retreivedAttestations.length).toBe(1);
Expand All @@ -97,7 +95,7 @@ describe('MemoryAttestationPool', () => {
const slotNumber = 420;
const archive = Fr.random();
const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive)));
const proposalId = attestations[0].p2pMessageIdentifier().toString();
const proposalId = attestations[0].archive.toString();

await ap.addAttestations(attestations);

Expand All @@ -119,7 +117,7 @@ describe('MemoryAttestationPool', () => {
const slotNumber = 420;
const archive = Fr.random();
const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive)));
const proposalId = attestations[0].p2pMessageIdentifier().toString();
const proposalId = attestations[0].archive.toString();

await ap.addAttestations(attestations);

Expand All @@ -137,7 +135,7 @@ describe('MemoryAttestationPool', () => {
const slotNumber = 420;
const archive = Fr.random();
const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive)));
const proposalId = attestations[0].p2pMessageIdentifier().toString();
const proposalId = attestations[0].archive.toString();

await ap.addAttestations(attestations);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class InMemoryAttestationPool implements AttestationPool {
// Perf: order and group by slot before insertion
const slotNumber = attestation.payload.header.globalVariables.slotNumber;

const proposalId = attestation.p2pMessageIdentifier().toString();
const proposalId = attestation.archive.toString();
const address = attestation.getSender();

const slotAttestationMap = getSlotOrDefault(this.attestations, slotNumber.toBigInt());
Expand Down Expand Up @@ -89,7 +89,7 @@ export class InMemoryAttestationPool implements AttestationPool {
const slotNumber = attestation.payload.header.globalVariables.slotNumber;
const slotAttestationMap = this.attestations.get(slotNumber.toBigInt());
if (slotAttestationMap) {
const proposalId = attestation.p2pMessageIdentifier().toString();
const proposalId = attestation.archive.toString();
const proposalAttestationMap = getProposalOrDefault(slotAttestationMap, proposalId);
if (proposalAttestationMap) {
const address = attestation.getSender();
Expand Down
37 changes: 35 additions & 2 deletions yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { createDebugLogger } from '@aztec/foundation/log';
import { SerialQueue } from '@aztec/foundation/queue';
import { RunningPromise } from '@aztec/foundation/running-promise';
import type { AztecKVStore } from '@aztec/kv-store';
import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client';

import { type ENR } from '@chainsafe/enr';
import { type GossipSub, type GossipSubComponents, gossipsub } from '@chainsafe/libp2p-gossipsub';
Expand Down Expand Up @@ -77,7 +78,7 @@ export async function createLibP2PPeerId(privateKey?: string): Promise<PeerId> {
/**
* Lib P2P implementation of the P2PService interface.
*/
export class LibP2PService implements P2PService {
export class LibP2PService extends WithTracer implements P2PService {
private jobQueue: SerialQueue = new SerialQueue();
private peerManager: PeerManager;
private discoveryRunningPromise?: RunningPromise;
Expand All @@ -100,9 +101,13 @@ export class LibP2PService implements P2PService {
private l2BlockSource: L2BlockSource,
private proofVerifier: ClientProtocolCircuitVerifier,
private worldStateSynchronizer: WorldStateSynchronizer,
telemetry: TelemetryClient,
private requestResponseHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS,
private logger = createDebugLogger('aztec:libp2p_service'),
) {
// Instatntiate tracer
super(telemetry, 'LibP2PService');

this.peerManager = new PeerManager(node, peerDiscoveryService, config, logger);
this.node.services.pubsub.score.params.appSpecificScore = (peerId: string) => {
return this.peerManager.getPeerScore(peerId);
Expand Down Expand Up @@ -204,6 +209,7 @@ export class LibP2PService implements P2PService {
proofVerifier: ClientProtocolCircuitVerifier,
worldStateSynchronizer: WorldStateSynchronizer,
store: AztecKVStore,
telemetry: TelemetryClient,
) {
const { tcpListenAddress, tcpAnnounceAddress, minPeerCount, maxPeerCount } = config;
const bindAddrTcp = convertToMultiaddr(tcpListenAddress, 'tcp');
Expand Down Expand Up @@ -306,6 +312,7 @@ export class LibP2PService implements P2PService {
l2BlockSource,
proofVerifier,
worldStateSynchronizer,
telemetry,
requestResponseHandlers,
);
}
Expand Down Expand Up @@ -397,6 +404,12 @@ export class LibP2PService implements P2PService {
*
* @param attestation - The attestation to process.
*/
@trackSpan('Libp2pService.processAttestationFromPeer', attestation => ({
[Attributes.BLOCK_NUMBER]: attestation.payload.header.globalVariables.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: attestation.payload.header.globalVariables.slotNumber.toNumber(),
[Attributes.BLOCK_ARCHIVE]: attestation.archive.toString(),
[Attributes.P2P_ID]: attestation.p2pMessageIdentifier().toString(),
}))
private async processAttestationFromPeer(attestation: BlockAttestation): Promise<void> {
this.logger.debug(`Received attestation ${attestation.p2pMessageIdentifier()} from external peer.`);
await this.mempools.attestationPool.addAttestations([attestation]);
Expand All @@ -409,17 +422,37 @@ export class LibP2PService implements P2PService {
* @param block - The block to process.
*/
// REVIEW: callback pattern https://github.com/AztecProtocol/aztec-packages/issues/7963
@trackSpan('Libp2pService.processBlockFromPeer', block => ({
[Attributes.BLOCK_NUMBER]: block.payload.header.globalVariables.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: block.payload.header.globalVariables.slotNumber.toNumber(),
[Attributes.BLOCK_ARCHIVE]: block.archive.toString(),
[Attributes.P2P_ID]: block.p2pMessageIdentifier().toString(),
}))
private async processBlockFromPeer(block: BlockProposal): Promise<void> {
this.logger.verbose(`Received block ${block.p2pMessageIdentifier()} from external peer.`);
const attestation = await this.blockReceivedCallback(block);

// TODO: fix up this pattern - the abstraction is not nice
// The attestation can be undefined if no handler is registered / the validator deems the block invalid
if (attestation != undefined) {
this.propagate(attestation);
this.broadcastAttestation(attestation);
}
}

/**
* Broadcast an attestation to all peers.
* @param attestation - The attestation to broadcast.
*/
@trackSpan('Libp2pService.broadcastAttestation', attestation => ({
[Attributes.BLOCK_NUMBER]: attestation.payload.header.globalVariables.blockNumber.toNumber(),
[Attributes.SLOT_NUMBER]: attestation.payload.header.globalVariables.slotNumber.toNumber(),
[Attributes.BLOCK_ARCHIVE]: attestation.archive.toString(),
[Attributes.P2P_ID]: attestation.p2pMessageIdentifier().toString(),
}))
private broadcastAttestation(attestation: BlockAttestation): void {
this.propagate(attestation);
}

private processEpochProofQuoteFromPeer(epochProofQuote: EpochProofQuote): void {
this.logger.verbose(`Received epoch proof quote ${epochProofQuote.p2pMessageIdentifier()} from external peer.`);
this.mempools.epochProofQuotePool.addQuote(epochProofQuote);
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/sequencer-client/src/sequencer/sequencer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,11 @@ export class Sequencer {
this.isFlushing = true;
}

@trackSpan('Sequencer.collectAttestations', (block, txHashes) => ({
[Attributes.BLOCK_NUMBER]: block.number,
[Attributes.BLOCK_ARCHIVE]: block.archive.toString(),
[Attributes.BLOCK_TXS_COUNT]: txHashes.length,
}))
protected async collectAttestations(block: L2Block, txHashes: TxHash[]): Promise<Signature[] | undefined> {
// TODO(https://github.com/AztecProtocol/aztec-packages/issues/7962): inefficient to have a round trip in here - this should be cached
const committee = await this.publisher.getCurrentEpochCommittee();
Expand Down
Loading
Loading