Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: attestation pool for electra #6744

Merged
merged 4 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions packages/api/src/beacon/routes/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ export type Api = {
*/
getAggregatedAttestation(
attestationDataRoot: Root,
slot: Slot
slot: Slot,
index: CommitteeIndex
): Promise<
ApiClientResponse<
{[HttpStatusCode.OK]: {data: allForks.Attestation; version: ForkName}},
Expand Down Expand Up @@ -498,7 +499,7 @@ export type ReqTypes = {
produceBlindedBlock: {params: {slot: number}; query: {randao_reveal: string; graffiti: string}};
produceAttestationData: {query: {slot: number; committee_index: number}};
produceSyncCommitteeContribution: {query: {slot: number; subcommittee_index: number; beacon_block_root: string}};
getAggregatedAttestation: {query: {attestation_data_root: string; slot: number}};
getAggregatedAttestation: {query: {attestation_data_root: string; slot: number; index: number}};
publishAggregateAndProofs: {body: unknown};
publishContributionAndProofs: {body: unknown};
prepareBeaconCommitteeSubnet: {body: unknown};
Expand Down Expand Up @@ -647,10 +648,10 @@ export function getReqSerializers(): ReqSerializers<Api, ReqTypes> {
},

getAggregatedAttestation: {
writeReq: (root, slot) => ({query: {attestation_data_root: toHexString(root), slot}}),
parseReq: ({query}) => [fromHexString(query.attestation_data_root), query.slot],
writeReq: (root, slot, index) => ({query: {attestation_data_root: toHexString(root), slot, index}}),
parseReq: ({query}) => [fromHexString(query.attestation_data_root), query.slot, query.index],
schema: {
query: {attestation_data_root: Schema.StringRequired, slot: Schema.UintRequired},
query: {attestation_data_root: Schema.StringRequired, slot: Schema.UintRequired, index: Schema.UintRequired},
},
},

Expand Down
98 changes: 52 additions & 46 deletions packages/api/test/unit/beacon/testData/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@ export const eventTestData: EventData = {
block: "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf",
executionOptimistic: false,
},
[EventType.attestation]: ssz.phase0.Attestation.fromJson({
aggregation_bits: "0x01",
signature:
"0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
data: {
slot: "1",
index: "1",
beacon_block_root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
source: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
target: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
},
}),
[EventType.attestation]: {
version: ForkName.altair,
data: ssz.phase0.Attestation.fromJson({
aggregation_bits: "0x01",
signature:
"0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
data: {
slot: "1",
index: "1",
beacon_block_root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
source: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
target: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
},
}),
},
[EventType.voluntaryExit]: ssz.phase0.SignedVoluntaryExit.fromJson({
message: {epoch: "1", validator_index: "1"},
signature:
Expand Down Expand Up @@ -72,44 +75,47 @@ export const eventTestData: EventData = {
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
}),
[EventType.attesterSlashing]: ssz.phase0.AttesterSlashing.fromJson({
attestation_1: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
[EventType.attesterSlashing]: {
version: ForkName.altair,
data: ssz.phase0.AttesterSlashing.fromJson({
attestation_1: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
attestation_2: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
attestation_2: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
}),
}),
},
[EventType.blsToExecutionChange]: ssz.capella.SignedBLSToExecutionChange.fromJson({
message: {
validator_index: "1",
Expand Down
7 changes: 5 additions & 2 deletions packages/api/test/unit/beacon/testData/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ export const testData: GenericServerTestCases<Api> = {
res: {data: ssz.altair.SyncCommitteeContribution.defaultValue()},
},
getAggregatedAttestation: {
args: [ZERO_HASH, 32000],
res: {data: ssz.phase0.Attestation.defaultValue()},
args: [ZERO_HASH, 32000, 2],
res: {
data: ssz.phase0.Attestation.defaultValue(),
version: ForkName.altair,
},
},
publishAggregateAndProofs: {
args: [[ssz.phase0.SignedAggregateAndProof.defaultValue()]],
Expand Down
9 changes: 6 additions & 3 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1063,16 +1063,19 @@ export function getValidatorApi({
};
},

async getAggregatedAttestation(attestationDataRoot, slot) {
async getAggregatedAttestation(attestationDataRoot, slot, committeeIndex) {
notWhileSyncing();

await waitForSlot(slot); // Must never request for a future slot > currentSlot

const dataRootHex = toHexString(attestationDataRoot);
const aggregate = chain.attestationPool.getAggregate(slot, dataRootHex);
const aggregate = chain.attestationPool.getAggregate(slot, committeeIndex, dataRootHex);

if (!aggregate) {
throw new ApiError(404, `No aggregated attestation for slot=${slot}, dataRoot=${dataRootHex}`);
throw new ApiError(
404,
`No aggregated attestation for slot=${slot} committeeIndex=${committeeIndex}, dataRoot=${dataRootHex}`
);
}

metrics?.production.producedAggregateParticipants.observe(aggregate.aggregationBits.getTrueBitIndexes().length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
getBlockRootAtSlot,
} from "@lodestar/state-transition";
import {IForkChoice, EpochDifference} from "@lodestar/fork-choice";
import {toHex, MapDef} from "@lodestar/utils";
import {toHex, MapDef, assert} from "@lodestar/utils";
import {intersectUint8Arrays, IntersectResult} from "../../util/bitArray.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
import {InsertOutcome} from "./types.js";
Expand Down Expand Up @@ -141,10 +141,8 @@ export class AggregatedAttestationPool {
? // this attestation is added to pool after validation
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;
if (committeeIndex === null) {
// this should not happen because attestation should be validated before reaching this
throw Error(`Invalid attestation slot=${slot} committeeIndex=${committeeIndex}`);
}
// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in aggregated attestation pool");
let attestationGroup = attestationGroupByIndex.get(committeeIndex);
if (!attestationGroup) {
attestationGroup = new MatchingDataAttestationGroup(committee, attestation.data);
Expand Down
79 changes: 30 additions & 49 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {PointFormat, Signature} from "@chainsafe/bls/types";
import bls from "@chainsafe/bls";
import {BitArray} from "@chainsafe/ssz";
import {Slot, RootHex, allForks} from "@lodestar/types";
import {MapDef} from "@lodestar/utils";
import {Slot, RootHex, allForks, isElectraAttestation} from "@lodestar/types";
import {MapDef, assert} from "@lodestar/utils";
import {IClock} from "../../util/clock.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
Expand Down Expand Up @@ -36,6 +36,8 @@ type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;
/** Hex string of DataRoot `TODO` */
type DataRootHex = string;

type CommitteeIndex = number;

/**
* A pool of `Attestation` that is specially designed to store "unaggregated" attestations from
* the native aggregation scheme.
Expand All @@ -60,8 +62,8 @@ type DataRootHex = string;
* receives and it can be triggered manually.
*/
export class AttestationPool {
private readonly attestationByRootBySlot = new MapDef<Slot, Map<DataRootHex, AggregateFast>>(
() => new Map<DataRootHex, AggregateFast>()
private readonly attestationByRootBySlot = new MapDef<Slot, Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>>(
() => new Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>()
);
private lowestPermissibleSlot = 0;

Expand Down Expand Up @@ -117,23 +119,35 @@ export class AttestationPool {
throw new OpPoolError({code: OpPoolErrorCode.REACHED_MAX_PER_SLOT});
}

const committeeIndex = isElectraAttestation(attestation)
? // this attestation is added to pool after validation
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;
// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool");

// Pre-aggregate the contribution with existing items
const aggregate = aggregateByRoot.get(attDataRootHex);
let aggregateByIndex = aggregateByRoot.get(attDataRootHex);
if (aggregateByIndex === undefined) {
aggregateByIndex = new Map<CommitteeIndex, AggregateFast>();
aggregateByRoot.set(attDataRootHex, aggregateByIndex);
}
const aggregate = aggregateByIndex.get(committeeIndex);
if (aggregate) {
// Aggregate mutating
return aggregateAttestationInto(aggregate, attestation);
} else {
// Create new aggregate
aggregateByRoot.set(attDataRootHex, attestationToAggregate(attestation));
aggregateByIndex.set(committeeIndex, attestationToAggregate(attestation));
return InsertOutcome.NewData;
}
}

/**
* For validator API to get an aggregate
*/
getAggregate(slot: Slot, dataRootHex: RootHex): allForks.Attestation | null {
const aggregate = this.attestationByRootBySlot.get(slot)?.get(dataRootHex);
getAggregate(slot: Slot, committeeIndex: CommitteeIndex, dataRootHex: RootHex): allForks.Attestation | null {
const aggregate = this.attestationByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex);
if (!aggregate) {
// TODO: Add metric for missing aggregates
return null;
Expand Down Expand Up @@ -166,8 +180,10 @@ export class AttestationPool {

for (const aggregateByRoot of aggregateByRoots) {
if (aggregateByRoot) {
for (const aggFast of aggregateByRoot.values()) {
attestations.push(fastToAttestation(aggFast));
for (const aggFastByIndex of aggregateByRoot.values()) {
for (const aggFast of aggFastByIndex.values()) {
attestations.push(fastToAttestation(aggFast));
}
}
}
}
Expand All @@ -180,35 +196,13 @@ export class AttestationPool {
// - Insert attestations coming from gossip and API

/**
* Aggregate a new contribution into `aggregate` mutating it
* Aggregate a new attestation into `aggregate` mutating it
*/
function aggregateAttestationInto(aggregate: AggregateFast, attestation: allForks.Attestation): InsertOutcome {
const bitIndex = attestation.aggregationBits.getSingleTrueBit();

// Should never happen, attestations are verified against this exact condition before
if (bitIndex === null) {
throw Error("Invalid attestation not exactly one bit set");
}

if ("committeeBits" in attestation && !("committeeBits" in aggregate)) {
throw Error("Attempt to aggregate electra attestation into phase0 attestation");
}

if (!("committeeBits" in attestation) && "committeeBits" in aggregate) {
throw Error("Attempt to aggregate phase0 attestation into electra attestation");
}

if ("committeeBits" in attestation) {
// We assume attestation.committeeBits should already be validated in api and gossip handler and should be non-null
const attestationCommitteeIndex = attestation.committeeBits.getSingleTrueBit();
const aggregateCommitteeIndex = (aggregate as AggregateFastElectra).committeeBits.getSingleTrueBit();

if (attestationCommitteeIndex !== aggregateCommitteeIndex) {
throw Error(
`Committee index mismatched: attestation ${attestationCommitteeIndex} aggregate ${aggregateCommitteeIndex}`
);
}
}
assert.notNull(bitIndex, "Invalid attestation in pool, not exactly one bit set");

if (aggregate.aggregationBits.get(bitIndex) === true) {
return InsertOutcome.AlreadyKnown;
Expand All @@ -226,7 +220,7 @@ function aggregateAttestationInto(aggregate: AggregateFast, attestation: allFork
* Format `contribution` into an efficient `aggregate` to add more contributions in with aggregateContributionInto()
*/
function attestationToAggregate(attestation: allForks.Attestation): AggregateFast {
if ("committeeBits" in attestation) {
if (isElectraAttestation(attestation)) {
return {
data: attestation.data,
// clone because it will be mutated
Expand All @@ -247,18 +241,5 @@ function attestationToAggregate(attestation: allForks.Attestation): AggregateFas
* Unwrap AggregateFast to phase0.Attestation
*/
function fastToAttestation(aggFast: AggregateFast): allForks.Attestation {
if ("committeeBits" in aggFast) {
return {
data: aggFast.data,
aggregationBits: aggFast.aggregationBits,
committeeBits: aggFast.committeeBits,
signature: aggFast.signature.toBytes(PointFormat.compressed),
};
} else {
return {
data: aggFast.data,
aggregationBits: aggFast.aggregationBits,
signature: aggFast.signature.toBytes(PointFormat.compressed),
};
}
return {...aggFast, signature: aggFast.signature.toBytes(PointFormat.compressed)};
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async function validateAggregateAndProof(
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NOT_EXACTLY_ONE_COMMITTEE_BIT_SET});
}
// [REJECT] aggregate.data.index == 0
if (attData.index === 0) {
if (attData.index !== 0) {
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NON_ZERO_ATTESTATION_DATA_INDEX});
}
} else {
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export type AttestationValidationResult = {
export type AttestationOrBytes = ApiAttestation | GossipAttestation;

/** attestation from api */
export type ApiAttestation = {attestation: phase0.Attestation; serializedData: null}; // TODO Electra: add new attestation type
export type ApiAttestation = {attestation: phase0.Attestation; serializedData: null};

/** attestation from gossip */
export type GossipAttestation = {
Expand Down Expand Up @@ -298,7 +298,7 @@ async function validateGossipAttestationNoSignatureCheck(
}

// [REJECT] aggregate.data.index == 0
if (attData.index === 0) {
if (attData.index !== 0) {
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NON_ZERO_ATTESTATION_DATA_INDEX});
}
} else {
Expand Down
Loading
Loading