Skip to content

Commit

Permalink
feat: track time to stable mesh for aggregator duties (#5897)
Browse files Browse the repository at this point in the history
* feat: track time to form stable mesh for aggregator duties

* fix: set time to stable mesh -1 if not able to form stable mesh within 2 slots
  • Loading branch information
twoeths authored Aug 23, 2023
1 parent 2b8a500 commit 945f892
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 15 deletions.
14 changes: 14 additions & 0 deletions packages/beacon-node/src/network/core/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,20 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
name: "lodestar_attnets_service_committee_subscriptions_total",
help: "Count of committee subscriptions",
}),
subscriptionsCommitteeMeshPeers: register.histogram<"subnet">({
name: "lodestar_attnets_service_committee_subscriptions_mesh_peers",
help: "Histogram of mesh peers per committee subscription",
labelNames: ["subnet"],
// Dlow = 6, D = 8, DHi = 12 plus 2 more buckets
buckets: [0, 4, 6, 8, 12],
}),
subscriptionsCommitteeTimeToStableMesh: register.histogram<"subnet">({
name: "lodestar_attnets_service_committee_subscriptions_time_to_stable_mesh_seconds",
help: "Histogram of time until committee subscription is considered healthy (>= 6 mesh peers)",
labelNames: ["subnet"],
// we subscribe 2 slots = 24s before aggregator duty
buckets: [0, 6, 12, 18, 24],
}),
subscriptionsRandom: register.gauge({
name: "lodestar_attnets_service_random_subscriptions_total",
help: "Count of random subscriptions",
Expand Down
105 changes: 90 additions & 15 deletions packages/beacon-node/src/network/subnets/dllAttnetsService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {ChainForkConfig} from "@lodestar/config";
import {
ATTESTATION_SUBNET_COUNT,
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION,
Expand All @@ -8,12 +6,15 @@ import {
} from "@lodestar/params";
import {Epoch, Slot, ssz} from "@lodestar/types";
import {Logger, MapDef} from "@lodestar/utils";
import {BeaconConfig} from "@lodestar/config";
import {ClockEvent, IClock} from "../../util/clock.js";
import {GossipType} from "../gossip/index.js";
import {MetadataController} from "../metadata.js";
import {SubnetMap, RequestedSubnet} from "../peers/utils/index.js";
import {getActiveForks} from "../forks.js";
import {NetworkCoreMetrics} from "../core/metrics.js";
import {stringifyGossipTopic} from "../gossip/topic.js";
import {GOSSIP_D_LOW} from "../gossip/scoringParameters.js";
import {IAttnetsService, CommitteeSubscription, SubnetsServiceOpts, GossipSubscriber, NodeId} from "./interface.js";
import {computeSubscribedSubnet} from "./util.js";

Expand All @@ -24,6 +25,15 @@ enum SubnetSource {
longLived = "long_lived",
}

type Subnet = number;
// map of subnet to time to form stable mesh as seconds, null if not yet formed
type AggregatorDutyInfo = Map<Subnet, number | null>;

/**
* This value means node is not able to form stable mesh.
*/
const NOT_ABLE_TO_FORM_STABLE_MESH_SEC = -1;

/**
* Manage deleterministic long lived (DLL) subnets and short lived subnets.
* - PeerManager uses attnetsService to know which peers are required for duties and long lived subscriptions
Expand All @@ -42,13 +52,13 @@ export class DLLAttnetsService implements IAttnetsService {
/** ${SUBNETS_PER_NODE} long lived subscriptions, may overlap with `shortLivedSubscriptions` */
private longLivedSubscriptions = new Set<number>();
/**
* Map of an aggregator at a slot and subnet
* Map of an aggregator at a slot and AggregatorDutyInfo
* Used to determine if we should process an attestation.
*/
private aggregatorSlotSubnet = new MapDef<Slot, Set<number>>(() => new Set());
private aggregatorSlotSubnet = new MapDef<Slot, AggregatorDutyInfo>(() => new Map());

constructor(
private readonly config: ChainForkConfig,
private readonly config: BeaconConfig,
private readonly clock: IClock,
private readonly gossip: GossipSubscriber,
private readonly metadata: MetadataController,
Expand Down Expand Up @@ -108,7 +118,7 @@ export class DLLAttnetsService implements IAttnetsService {
this.committeeSubnets.request({subnet, toSlot: slot + 1});
if (isAggregator) {
// need exact slot here
this.aggregatorSlotSubnet.getOrDefault(slot).add(subnet);
this.aggregatorSlotSubnet.getOrDefault(slot).set(subnet, null);
}
}
}
Expand Down Expand Up @@ -154,26 +164,82 @@ export class DLLAttnetsService implements IAttnetsService {
* Run per slot.
* - Subscribe to gossip subnets 2 slots in advance
* - Unsubscribe from expired subnets
* - Track time to stable mesh if not yet formed
*/
private onSlot = (clockSlot: Slot): void => {
try {
for (const [dutiedSlot, subnets] of this.aggregatorSlotSubnet.entries()) {
setTimeout(
() => {
this.onHalfSlot(clockSlot);
},
this.config.SECONDS_PER_SLOT * 0.5 * 1000
);

for (const [dutiedSlot, dutiedInfo] of this.aggregatorSlotSubnet.entries()) {
if (dutiedSlot === clockSlot + this.opts.slotsToSubscribeBeforeAggregatorDuty) {
// Trigger gossip subscription first, in batch
if (subnets.size > 0) {
this.subscribeToSubnets(Array.from(subnets), SubnetSource.committee);
if (dutiedInfo.size > 0) {
this.subscribeToSubnets(Array.from(dutiedInfo.keys()), SubnetSource.committee);
}
// Then, register the subscriptions
Array.from(subnets).map((subnet) => this.shortLivedSubscriptions.request({subnet, toSlot: dutiedSlot}));
for (const subnet of dutiedInfo.keys()) {
this.shortLivedSubscriptions.request({subnet, toSlot: dutiedSlot});
}
}
this.trackTimeToStableMesh(clockSlot, dutiedSlot, dutiedInfo);
}

this.unsubscribeExpiredCommitteeSubnets(clockSlot);
this.pruneExpiredAggregator(clockSlot);
} catch (e) {
this.logger.error("Error on AttnetsService.onSlot", {slot: clockSlot}, e as Error);
}
};

private onHalfSlot = (clockSlot: Slot): void => {
for (const [dutiedSlot, dutiedInfo] of this.aggregatorSlotSubnet.entries()) {
this.trackTimeToStableMesh(clockSlot, dutiedSlot, dutiedInfo);
}
};

/**
* Track time to form stable mesh if not yet formed
*/
private trackTimeToStableMesh(clockSlot: Slot, dutiedSlot: Slot, dutiedInfo: AggregatorDutyInfo): void {
if (dutiedSlot < clockSlot) {
// aggregator duty is expired, set timeToStableMesh to some big value so we know this value is not good
for (const [subnet, timeToFormMesh] of dutiedInfo.entries()) {
if (timeToFormMesh === null) {
dutiedInfo.set(subnet, NOT_ABLE_TO_FORM_STABLE_MESH_SEC);
this.metrics?.attnetsService.subscriptionsCommitteeTimeToStableMesh.observe(
{subnet},
NOT_ABLE_TO_FORM_STABLE_MESH_SEC
);
}
}
} else if (dutiedSlot <= clockSlot + this.opts.slotsToSubscribeBeforeAggregatorDuty) {
// aggregator duty is not expired, track time to stable mesh if this is the 1st time we see mesh peers>=Dlo (6)
for (const [subnet, timeToFormMesh] of dutiedInfo.entries()) {
if (timeToFormMesh === null) {
const topicStr = stringifyGossipTopic(this.config, {
type: gossipType,
fork: this.config.getForkName(dutiedSlot),
subnet,
});
const numMeshPeers = this.gossip.mesh.get(topicStr)?.size ?? 0;
if (numMeshPeers >= GOSSIP_D_LOW) {
const timeToStableMeshSec = this.clock.secFromSlot(
dutiedSlot - this.opts.slotsToSubscribeBeforeAggregatorDuty
);
// set to dutiedInfo so we'll not set to metrics again
dutiedInfo.set(subnet, timeToStableMeshSec);
this.metrics?.attnetsService.subscriptionsCommitteeTimeToStableMesh.observe({subnet}, timeToStableMeshSec);
}
}
}
}
}

/**
* Run per epoch, clean-up operations that are not urgent
* Subscribe to new random subnets every EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION epochs
Expand All @@ -183,8 +249,6 @@ export class DLLAttnetsService implements IAttnetsService {
if (epoch % EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION === 0) {
this.recomputeLongLivedSubnets();
}
const slot = computeStartSlotAtEpoch(epoch);
this.pruneExpiredAggregator(slot);
} catch (e) {
this.logger.error("Error on AttnetsService.onEpoch", {epoch}, e as Error);
}
Expand Down Expand Up @@ -245,9 +309,9 @@ export class DLLAttnetsService implements IAttnetsService {
* @param currentSlot
*/
private pruneExpiredAggregator(currentSlot: Slot): void {
for (const slot of this.aggregatorSlotSubnet.keys()) {
if (currentSlot > slot) {
this.aggregatorSlotSubnet.delete(slot);
for (const dutiedSlot of this.aggregatorSlotSubnet.keys()) {
if (currentSlot > dutiedSlot) {
this.aggregatorSlotSubnet.delete(dutiedSlot);
}
}
}
Expand Down Expand Up @@ -303,6 +367,17 @@ export class DLLAttnetsService implements IAttnetsService {
private onScrapeLodestarMetrics(metrics: NetworkCoreMetrics): void {
metrics.attnetsService.committeeSubnets.set(this.committeeSubnets.size);
metrics.attnetsService.subscriptionsCommittee.set(this.shortLivedSubscriptions.size);
// track short lived subnet status, >= 6 (Dlo) means healthy, otherwise unhealthy
const currentSlot = this.clock.currentSlot;
for (const {subnet} of this.shortLivedSubscriptions.getActiveTtl(currentSlot)) {
const topicStr = stringifyGossipTopic(this.config, {
type: gossipType,
fork: this.config.getForkName(currentSlot),
subnet,
});
const numMeshPeers = this.gossip.mesh.get(topicStr)?.size ?? 0;
metrics.attnetsService.subscriptionsCommitteeMeshPeers.observe({subnet}, numMeshPeers);
}
metrics.attnetsService.longLivedSubscriptions.set(this.longLivedSubscriptions.size);
let aggregatorCount = 0;
for (const subnets of this.aggregatorSlotSubnet.values()) {
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/network/subnets/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ export type SubnetsServiceTestOpts = {
shuffleFn?: ShuffleFn;
};

type TopicStr = string;
type PeerIdStr = string;

export type GossipSubscriber = {
subscribeTopic(topic: GossipTopic): void;
unsubscribeTopic(topic: GossipTopic): void;
mesh: Map<TopicStr, Set<PeerIdStr>>;
};

// uint256 in the spec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ describe("DLLAttnetsService", () => {
beforeEach(function () {
sandbox.useFakeTimers(Date.now());
gossipStub = sandbox.createStubInstance(Eth2Gossipsub) as SinonStubbedInstance<Eth2Gossipsub> & Eth2Gossipsub;
Object.defineProperty(gossipStub, "mesh", {value: new Map()});
clock = new Clock({
genesisTime: Math.floor(Date.now() / 1000),
config,
Expand Down

0 comments on commit 945f892

Please sign in to comment.