Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle multiple own sync committee aggregator per subnet #4135

Merged
merged 4 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions packages/lodestar/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,33 @@ export function getBeaconPoolApi({
// Worst case if `signature` is not valid, gossip peers will drop it and slightly downscore us.
await validateSyncCommitteeSigOnly(chain, state, signature);

// The same validator can appear multiple times in the sync committee. It can appear multiple times per
// subnet even. First compute on which subnet the signature must be broadcasted to.
const subnets: number[] = [];

for (const indexInCommittee of indexesInCommittee) {
// Sync committee subnet members are just sequential in the order they appear in SyncCommitteeIndexes array
const subnet = Math.floor(indexInCommittee / SYNC_COMMITTEE_SUBNET_SIZE);
const indexInSubcommittee = indexInCommittee % SYNC_COMMITTEE_SUBNET_SIZE;
chain.syncCommitteeMessagePool.add(subnet, signature, indexInSubcommittee);

// Cheap de-duplication code to avoid using a Set. indexesInCommittee is always sorted
if (subnets.length === 0 || subnets[subnets.length - 1] !== subnet) {
subnets.push(subnet);
}
}

// TODO: Broadcast at once to all topics
await Promise.all(
indexesInCommittee.map(async (indexInCommittee) => {
// Sync committee subnet members are just sequential in the order they appear in SyncCommitteeIndexes array
const subnet = Math.floor(indexInCommittee / SYNC_COMMITTEE_SUBNET_SIZE);
const indexInSubcommittee = indexInCommittee % SYNC_COMMITTEE_SUBNET_SIZE;
chain.syncCommitteeMessagePool.add(subnet, signature, indexInSubcommittee);
await network.gossip.publishSyncCommitteeSignature(signature, subnet);
})
subnets.map(async (subnet) => network.gossip.publishSyncCommitteeSignature(signature, subnet))
);
} catch (e) {
// TODO: gossipsub should allow publishing same message to different topics
// https://github.com/ChainSafe/js-libp2p-gossipsub/issues/272
if ((e as Error).message === "PublishError.Duplicate") {
return;
}

errors.push(e as Error);
logger.error(
`Error on submitPoolSyncCommitteeSignatures [${i}]`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,9 @@ export class SyncContributionAndProofPool {
const {contribution} = contributionAndProof;
const {slot, beaconBlockRoot} = contribution;
const rootHex = toHexString(beaconBlockRoot);
const lowestPermissibleSlot = this.lowestPermissibleSlot;

// Reject if too old.
if (slot < lowestPermissibleSlot) {
if (slot < this.lowestPermissibleSlot) {
return InsertOutcome.Old;
}

Expand Down
71 changes: 50 additions & 21 deletions packages/validator/src/services/syncCommitteeDuties.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
isSyncCommitteeAggregator,
} from "@chainsafe/lodestar-beacon-state-transition";
import {IChainForkConfig} from "@chainsafe/lodestar-config";
import {BLSSignature, Epoch, Root, Slot, SyncPeriod, ValidatorIndex} from "@chainsafe/lodestar-types";
import {BLSSignature, Epoch, RootHex, Slot, SyncPeriod, ValidatorIndex} from "@chainsafe/lodestar-types";
import {toHexString} from "@chainsafe/ssz";
import {Api, routes} from "@chainsafe/lodestar-api";
import {extendError} from "@chainsafe/lodestar-utils";
Expand All @@ -14,6 +14,7 @@ import {PubkeyHex} from "../types.js";
import {Metrics} from "../metrics.js";
import {ValidatorStore} from "./validatorStore.js";
import {IndicesService} from "./indices.js";
import {syncCommitteeIndicesToSubnets} from "./utils.js";

/** Only retain `HISTORICAL_DUTIES_PERIODS` duties prior to the current periods. */
const HISTORICAL_DUTIES_PERIODS = 2;
Expand All @@ -32,6 +33,18 @@ const ALTAIR_FORK_LOOKAHEAD_EPOCHS = 0;
/** How many epochs prior from a subscription starting, ask the node to subscribe */
const SUBSCRIPTIONS_LOOKAHEAD_EPOCHS = 2;

export type SyncDutySubnet = {
pubkey: string;
/** Index of validator in validator registry. */
validatorIndex: ValidatorIndex;
/**
* The indices of the validator in the sync committee.
* The same validator can appear multiples in the sync committee. Given how sync messages are constructor, the
* validator client only cares in which subnets the validator is in, not the specific index.
*/
subnets: number[];
};

export type SyncSelectionProof = {
/** This value is only set to not null if the proof indicates that the validator is an aggregator. */
selectionProof: BLSSignature | null;
Expand All @@ -40,12 +53,18 @@ export type SyncSelectionProof = {

/** Neatly joins SyncDuty with the locally-generated `selectionProof`. */
export type SyncDutyAndProofs = {
duty: routes.validator.SyncDuty;
duty: SyncDutySubnet;
/**
* Array because the same validator can appear multiple times in the sync committee.
* `routes.validator.SyncDuty` `.validatorSyncCommitteeIndices` is an array for that reason.
* SelectionProof signs over slot + index in committee, so the length of `.selectionProofs` equals
* `.validatorSyncCommitteeIndices`.
*/
selectionProofs: SyncSelectionProof[];
};

// To assist with readability
type DutyAtPeriod = {dependentRoot: Root; duty: routes.validator.SyncDuty};
type DutyAtPeriod = {dependentRoot: RootHex; duty: SyncDutySubnet};

/**
* Validators are part of a static long (~27h) sync committee, and part of static subnets.
Expand Down Expand Up @@ -109,7 +128,7 @@ export class SyncCommitteeDutiesService {
removeDutiesForKey(pubkey: PubkeyHex): void {
for (const [syncPeriod, validatorDutyAtPeriodMap] of this.dutiesByIndexByPeriod) {
for (const [validatorIndex, dutyAtPeriod] of validatorDutyAtPeriodMap) {
if (toHexString(dutyAtPeriod.duty.pubkey) === pubkey) {
if (dutyAtPeriod.duty.pubkey === pubkey) {
validatorDutyAtPeriodMap.delete(validatorIndex);
if (validatorDutyAtPeriodMap.size === 0) {
this.dutiesByIndexByPeriod.delete(syncPeriod);
Expand Down Expand Up @@ -188,7 +207,9 @@ export class SyncCommitteeDutiesService {
if (currentEpoch >= fromEpoch - SUBSCRIPTIONS_LOOKAHEAD_EPOCHS) {
syncCommitteeSubscriptions.push({
validatorIndex,
syncCommitteeIndices: dutyAtEpoch.duty.validatorSyncCommitteeIndices,
// prepareSyncCommitteeSubnets does not care about which specific index in the sync committee the
// validator is, but at what subnets is it participating.
syncCommitteeIndices: dutyAtEpoch.duty.subnets.map((subnet) => subnet * SYNC_COMMITTEE_SUBNET_SIZE),
untilEpoch,
// No need to send isAggregator here since the beacon node will assume validator always aggregates
});
Expand Down Expand Up @@ -220,7 +241,7 @@ export class SyncCommitteeDutiesService {
throw extendError(e, "Failed to obtain SyncDuties");
});

const dependentRoot = syncDuties.dependentRoot;
const dependentRoot = toHexString(syncDuties.dependentRoot);
const dutiesByIndex = new Map<ValidatorIndex, DutyAtPeriod>();
let count = 0;

Expand All @@ -231,6 +252,19 @@ export class SyncCommitteeDutiesService {
}
count++;

// Note: For networks where `state.validators.length < SYNC_COMMITTEE_SIZE` the same validator can appear
// multiple times in the sync committee. So `routes.validator.SyncDuty` `.validatorSyncCommitteeIndices`
// is an array, with all of those apparences.
//
// Validator signs two messages:
// `SyncCommitteeMessage`:
// - depends on slot, blockRoot, and validatorIndex.
// - Validator signs and publishes only one message regardless of validatorSyncCommitteeIndices length
// `SyncCommitteeContribution`:
// - depends on slot, blockRoot, validatorIndex, and subnet.
// - Validator must sign and publish only one message per subnet MAX. Regarless of validatorSyncCommitteeIndices
const subnets = syncCommitteeIndicesToSubnets(duty.validatorSyncCommitteeIndices);

// TODO: Enable dependentRoot functionality
// Meanwhile just overwrite them, since the latest duty will be older and less likely to re-org
//
Expand All @@ -240,9 +274,12 @@ export class SyncCommitteeDutiesService {
// - The dependent root has changed, signalling a re-org.
//
// if (reorg) this.metrics?.syncCommitteeDutiesReorg.inc()

//
// Using `alreadyWarnedReorg` avoids excessive logs.
dutiesByIndex.set(validatorIndex, {dependentRoot, duty});

// TODO: Use memory-efficient toHexString()
const pubkeyHex = toHexString(duty.pubkey);
dutiesByIndex.set(validatorIndex, {dependentRoot, duty: {pubkey: pubkeyHex, validatorIndex, subnets}});
}

// these could be redundant duties due to the state of next period query reorged
Expand All @@ -251,25 +288,17 @@ export class SyncCommitteeDutiesService {
const period = computeSyncPeriodAtEpoch(epoch);
this.dutiesByIndexByPeriod.set(period, dutiesByIndex);

this.logger.debug("Downloaded SyncDuties", {
epoch,
dependentRoot: toHexString(dependentRoot),
count,
});
this.logger.debug("Downloaded SyncDuties", {epoch, dependentRoot, count});
}

private async getSelectionProofs(slot: Slot, duty: routes.validator.SyncDuty): Promise<SyncSelectionProof[]> {
// Fast indexing with precomputed pubkeyHex. Fallback to toHexString(duty.pubkey)
const pubkey = this.indicesService.index2pubkey.get(duty.validatorIndex) ?? toHexString(duty.pubkey);

private async getSelectionProofs(slot: Slot, duty: SyncDutySubnet): Promise<SyncSelectionProof[]> {
const dutiesAndProofs: SyncSelectionProof[] = [];
for (const index of duty.validatorSyncCommitteeIndices) {
const subcommitteeIndex = Math.floor(index / SYNC_COMMITTEE_SUBNET_SIZE);
const selectionProof = await this.validatorStore.signSyncCommitteeSelectionProof(pubkey, slot, subcommitteeIndex);
for (const subnet of duty.subnets) {
const selectionProof = await this.validatorStore.signSyncCommitteeSelectionProof(duty.pubkey, slot, subnet);
dutiesAndProofs.push({
// selectionProof === null is used to check if is aggregator
selectionProof: isSyncCommitteeAggregator(selectionProof) ? selectionProof : null,
subcommitteeIndex,
subcommitteeIndex: subnet,
});
}
return dutiesAndProofs;
Expand Down
19 changes: 16 additions & 3 deletions packages/validator/src/services/utils.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {routes} from "@chainsafe/lodestar-api";
import {SYNC_COMMITTEE_SUBNET_SIZE} from "@chainsafe/lodestar-params";
import {CommitteeIndex, SubcommitteeIndex} from "@chainsafe/lodestar-types";
import {AttDutyAndProof} from "./attestationDuties.js";
import {SyncDutyAndProofs, SyncSelectionProof} from "./syncCommitteeDuties.js";
import {SyncDutyAndProofs, SyncDutySubnet, SyncSelectionProof} from "./syncCommitteeDuties.js";

/** Sync committee duty associated to a single sub committee subnet */
export type SubcommitteeDuty = {
duty: routes.validator.SyncDuty;
duty: SyncDutySubnet;
selectionProof: SyncSelectionProof["selectionProof"];
};

Expand Down Expand Up @@ -43,3 +43,16 @@ export function groupSyncDutiesBySubcommitteeIndex(

return dutiesBySubcommitteeIndex;
}

/**
* Given a list of indexes of a sync committee returns the list of unique subnet numbers the indexes are part of
*/
export function syncCommitteeIndicesToSubnets(indexesInCommittee: number[]): number[] {
const subnets = new Set<number>();

for (const indexInCommittee of indexesInCommittee) {
subnets.add(Math.floor(indexInCommittee / SYNC_COMMITTEE_SUBNET_SIZE));
}

return Array.from(subnets);
}
14 changes: 8 additions & 6 deletions packages/validator/src/services/validatorStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ export type SignerRemote = {
pubkeyHex: PubkeyHex;
};

type BLSPubkeyMaybeHex = BLSPubkey | string;

/**
* Validator entity capable of producing signatures. Either:
* - local: With BLS secret key
Expand Down Expand Up @@ -213,7 +215,7 @@ export class ValidatorStore {
}

async signSyncCommitteeSignature(
pubkey: BLSPubkey,
pubkey: BLSPubkeyMaybeHex,
validatorIndex: ValidatorIndex,
slot: Slot,
beaconBlockRoot: Root
Expand All @@ -230,7 +232,7 @@ export class ValidatorStore {
}

async signContributionAndProof(
duty: Pick<routes.validator.SyncDuty, "pubkey" | "validatorIndex">,
duty: {pubkey: BLSPubkeyMaybeHex; validatorIndex: number},
selectionProof: BLSSignature,
contribution: altair.SyncCommitteeContribution
): Promise<altair.SignedContributionAndProof> {
Expand All @@ -249,15 +251,15 @@ export class ValidatorStore {
};
}

async signAttestationSelectionProof(pubkey: BLSPubkey, slot: Slot): Promise<BLSSignature> {
async signAttestationSelectionProof(pubkey: BLSPubkeyMaybeHex, slot: Slot): Promise<BLSSignature> {
const domain = this.config.getDomain(DOMAIN_SELECTION_PROOF, slot);
const signingRoot = computeSigningRoot(ssz.Slot, slot, domain);

return await this.getSignature(pubkey, signingRoot);
}

async signSyncCommitteeSelectionProof(
pubkey: BLSPubkey | string,
pubkey: BLSPubkeyMaybeHex,
slot: Slot,
subcommitteeIndex: number
): Promise<BLSSignature> {
Expand All @@ -273,7 +275,7 @@ export class ValidatorStore {
}

async signVoluntaryExit(
pubkey: PubkeyHex,
pubkey: BLSPubkeyMaybeHex,
validatorIndex: number,
exitEpoch: Epoch
): Promise<phase0.SignedVoluntaryExit> {
Expand All @@ -288,7 +290,7 @@ export class ValidatorStore {
};
}

private async getSignature(pubkey: BLSPubkey | string, signingRoot: Uint8Array): Promise<BLSSignature> {
private async getSignature(pubkey: BLSPubkeyMaybeHex, signingRoot: Uint8Array): Promise<BLSSignature> {
// TODO: Refactor indexing to not have to run toHexString() on the pubkey every time
const pubkeyHex = typeof pubkey === "string" ? pubkey : toHexString(pubkey);

Expand Down
Loading