Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Range sync w/ dynamic target #2464

Merged
merged 1 commit into from
May 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/cli/src/cmds/dev/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ const devOwnOptions: ICliCommandOptions<IDevOwnArgs> = {
* Note: use beaconNodeOptions and globalOptions to make sure option key is correct
*/
const externalOptionsOverrides: {[k: string]: Options} = {
"sync.minPeers": {
...beaconNodeOptions["sync.minPeers"],
"sync.isSingleNode": {
...beaconNodeOptions["sync.isSingleNode"],
defaultDescription: undefined,
default: 0,
default: true,
},
"network.maxPeers": {
...beaconNodeOptions["network.maxPeers"],
Expand Down
15 changes: 9 additions & 6 deletions packages/cli/src/options/beaconNodeOptions/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@ import {defaultOptions, IBeaconNodeOptions} from "@chainsafe/lodestar";
import {ICliCommandOptions} from "../../util";

export interface ISyncArgs {
"sync.minPeers": number;
"sync.isSingleNode": boolean;
}

export function parseArgs(args: ISyncArgs): IBeaconNodeOptions["sync"] {
return {
minPeers: args["sync.minPeers"],
isSingleNode: args["sync.isSingleNode"],
};
}

export const options: ICliCommandOptions<ISyncArgs> = {
"sync.minPeers": {
type: "number",
description: "Minimum number of peers before the beacon chain starts syncing",
defaultDescription: String(defaultOptions.sync.minPeers),
"sync.isSingleNode": {
hidden: true,
type: "boolean",
description:
"Allow node to consider itself synced without being connected to a peer. \
Use only for local networks with a single node, can be dangerous in regular networks.",
defaultDescription: String(defaultOptions.sync.isSingleNode),
group: "sync",
},
};
4 changes: 2 additions & 2 deletions packages/cli/test/unit/options/beaconNodeOptions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe("options / beaconNodeOptions", () => {
"network.bootMultiaddrs": [],
"network.localMultiaddrs": [],

"sync.minPeers": 17,
"sync.isSingleNode": true,
} as IBeaconNodeArgs;

const expectedOptions: RecursivePartial<IBeaconNodeOptions> = {
Expand Down Expand Up @@ -76,7 +76,7 @@ describe("options / beaconNodeOptions", () => {
localMultiaddrs: [],
},
sync: {
minPeers: 17,
isSingleNode: true,
},
};

Expand Down
15 changes: 12 additions & 3 deletions packages/lodestar/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {Epoch} from "@chainsafe/lodestar-types";
import {IApiModules} from "..";
import {IApiModules} from "../interface";
import {getLatestWeakSubjectivityCheckpointEpoch} from "../../../../../beacon-state-transition/lib/fast/util/weakSubjectivity";
import {IBeaconChain} from "../../../chain";
import {IBeaconSync} from "../../../sync";
import {SyncChainDebugState} from "../../../sync/range/chain";

/* eslint-disable @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access */
export interface ILodestarApi {
getWtfNode(): string;
getLatestWeakSubjectivityCheckpointEpoch(): Promise<Epoch>;
getSyncChainsDebugState(): SyncChainDebugState[];
}

export class LodestarApi implements ILodestarApi {
private readonly config: IBeaconConfig;
private readonly chain: IBeaconChain;
private readonly sync: IBeaconSync;

constructor(modules: Pick<IApiModules, "config" | "chain">) {
constructor(modules: Pick<IApiModules, "config" | "chain" | "sync">) {
this.config = modules.config;
this.chain = modules.chain;
this.sync = modules.sync;

// Allows to load wtfnode listeners immedeatelly. Usefull when dockerized,
// so after an unexpected restart wtfnode becomes properly loaded again
Expand All @@ -41,6 +45,7 @@ export class LodestarApi implements ILodestarApi {
function logger(...args: string[]): void {
for (const arg of args) logs.push(arg);
}
/* eslint-disable @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access */
wtfnode.setLogger("info", logger);
wtfnode.setLogger("warn", logger);
wtfnode.setLogger("error", logger);
Expand All @@ -52,4 +57,8 @@ export class LodestarApi implements ILodestarApi {
const state = this.chain.getHeadState();
return getLatestWeakSubjectivityCheckpointEpoch(this.config, state);
}

getSyncChainsDebugState(): SyncChainDebugState[] {
return this.sync.getSyncChainsDebugState();
}
}
13 changes: 5 additions & 8 deletions packages/lodestar/src/api/impl/validator/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {ZERO_HASH} from "../../../constants";
import {IBeaconDb} from "../../../db";
import {IEth1ForBlockProduction} from "../../../eth1";
import {INetwork} from "../../../network";
import {IBeaconSync, SyncMode} from "../../../sync";
import {IBeaconSync, SyncState} from "../../../sync";
import {toGraffitiBuffer} from "../../../util/graffiti";
import {IApiOptions} from "../../options";
import {ApiError} from "../errors";
Expand Down Expand Up @@ -317,8 +317,8 @@ export class ValidatorApi implements IValidatorApi {

const syncState = this.sync.state;
switch (syncState) {
case SyncMode.INITIAL_SYNCING:
case SyncMode.REGULAR_SYNCING: {
case SyncState.SyncingFinalized:
case SyncState.SyncingHead: {
const currentSlot = this.chain.clock.currentSlot;
const headSlot = this.chain.forkChoice.getHead().slot;
if (currentSlot - headSlot > SYNC_TOLERANCE_EPOCHS * this.config.params.SLOTS_PER_EPOCH) {
Expand All @@ -328,14 +328,11 @@ export class ValidatorApi implements IValidatorApi {
}
}

case SyncMode.SYNCED:
case SyncState.Synced:
return;

case SyncMode.WAITING_PEERS:
case SyncState.Stalled:
throw new ApiError(503, "Node is waiting for peers");

case SyncMode.STOPPED:
throw new ApiError(503, "Node is stopped");
}
}
}
13 changes: 13 additions & 0 deletions packages/lodestar/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,18 @@ export function createLodestarMetrics(register: RegistryMetricCreator, metadata:
name: "lodestar_bls_thread_pool_job_groups_started_total",
help: "Count of total jobs groups started in bls thread pool, job groups include +1 jobs",
}),

// Sync

syncChainsStarted: register.gauge<"syncType">({
name: "lodestar_sync_chains_started",
help: "Total number of sync chains started events, labeled by syncType",
labelNames: ["syncType"],
}),

syncStatus: register.gauge({
name: "lodestar_sync_status",
help: "Range sync status: [Stalled, SyncingFinalized, SyncingHead, Synced]",
}),
};
}
1 change: 1 addition & 0 deletions packages/lodestar/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export interface INetwork {
getEnr(): ENR | undefined;
getConnectionsByPeer(): Map<string, Connection[]>;
getConnectedPeers(): PeerId[];
hasSomeConnectedPeer(): boolean;
/** Search peers joining subnets */
prepareBeaconCommitteeSubnet(subscriptions: phase0.BeaconCommitteeSubscription[]): void;
reStatusPeers(peers: PeerId[]): void;
Expand Down
5 changes: 5 additions & 0 deletions packages/lodestar/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ export class Network implements INetwork {
this.metadata.stop();
this.gossip.stop();
this.reqResp.stop();
this.gossip.stop();
await this.libp2p.stop();
}

Expand All @@ -134,6 +135,10 @@ export class Network implements INetwork {
return this.peerManager.getConnectedPeerIds();
}

hasSomeConnectedPeer(): boolean {
return this.peerManager.hasSomeConnectedPeer();
}

/**
* Request att subnets up `toSlot`. Network will ensure to mantain some peers for each
*/
Expand Down
3 changes: 0 additions & 3 deletions packages/lodestar/src/network/peers/metastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ export interface IPeerMetadataStore {
metadata: PeerStoreBucket<phase0.Metadata>;
rpcScore: PeerStoreBucket<number>;
rpcScoreLastUpdate: PeerStoreBucket<number>;
status: PeerStoreBucket<phase0.Status>;
}

export type PeerStoreBucket<T> = {
Expand All @@ -31,7 +30,6 @@ export class Libp2pPeerMetadataStore implements IPeerMetadataStore {
metadata: PeerStoreBucket<phase0.Metadata>;
rpcScore: PeerStoreBucket<number>;
rpcScoreLastUpdate: PeerStoreBucket<number>;
status: PeerStoreBucket<phase0.Status>;

private readonly config: IBeaconConfig;
private readonly metabook: MetadataBook;
Expand All @@ -43,7 +41,6 @@ export class Libp2pPeerMetadataStore implements IPeerMetadataStore {
this.metadata = this.typedStore("metadata", this.config.types.phase0.Metadata);
this.rpcScore = this.typedStore("score", this.config.types.Number64);
this.rpcScoreLastUpdate = this.typedStore("score-last-update", this.config.types.Number64);
this.status = this.typedStore("status", this.config.types.phase0.Status);
}

private typedStore<T>(key: string, type: BasicType<T> | ContainerType<T>): PeerStoreBucket<T> {
Expand Down
18 changes: 15 additions & 3 deletions packages/lodestar/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@ import {IReqResp} from "../reqresp";
import {Libp2pPeerMetadataStore} from "./metastore";
import {PeerDiscovery} from "./discover";
import {IPeerRpcScoreStore, ScoreState} from "./score";
import {getConnectedPeerIds, PeerMapDelay, assertPeerRelevance, prioritizePeers, IrrelevantPeerError} from "./utils";
import {
getConnectedPeerIds,
hasSomeConnectedPeer,
PeerMapDelay,
assertPeerRelevance,
prioritizePeers,
IrrelevantPeerError,
} from "./utils";
import {prettyPrintPeerId} from "../util";
import {IAttestationService} from "../attestationService";

Expand Down Expand Up @@ -128,6 +135,13 @@ export class PeerManager {
return getConnectedPeerIds(this.libp2p);
}

/**
* Efficiently check if there is at least one peer connected
*/
hasSomeConnectedPeer(): boolean {
return hasSomeConnectedPeer(this.libp2p);
}

async goodbyeAndDisconnectAllPeers(): Promise<void> {
await Promise.all(
// Filter by peers that support the goodbye protocol: {supportsProtocols: [goodbyeProtocol]}
Expand Down Expand Up @@ -230,8 +244,6 @@ export class PeerManager {
// libp2p.connectionManager.get() returns not null if there's +1 open connections with `peer`
if (this.libp2p.connectionManager.get(peer)) {
this.networkEventBus.emit(NetworkEvent.peerConnected, peer, status);
// TODO - TEMP: RangeSync refactor may delete peerMetadata.status
this.peerMetadata.status.set(peer, status);
}
}

Expand Down
20 changes: 18 additions & 2 deletions packages/lodestar/src/network/peers/utils/getConnectedPeerIds.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import LibP2p from "libp2p";
import LibP2p, {Connection} from "libp2p";
import PeerId from "peer-id";

/**
Expand All @@ -7,10 +7,26 @@ import PeerId from "peer-id";
export function getConnectedPeerIds(libp2p: LibP2p): PeerId[] {
const peerIds: PeerId[] = [];
for (const connections of libp2p.connectionManager.connections.values()) {
const openConnection = connections.find((connection) => connection.stat.status === "open");
const openConnection = connections.find(isConnectionOpen);
if (openConnection) {
peerIds.push(openConnection.remotePeer);
}
}
return peerIds;
}

/**
* Efficiently check if there is at least one peer connected
*/
export function hasSomeConnectedPeer(libp2p: LibP2p): boolean {
for (const connections of libp2p.connectionManager.connections.values()) {
if (connections.some(isConnectionOpen)) {
return true;
}
}
return false;
}

function isConnectionOpen(connection: Connection): boolean {
return connection.stat.status === "open";
}
3 changes: 1 addition & 2 deletions packages/lodestar/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ export class BeaconNode {
});

await network.start();
await sync.start();
chores.start();

void runNodeNotifier({network, chain, sync, config, logger, signal});
Expand Down Expand Up @@ -224,7 +223,7 @@ export class BeaconNode {
if (this.status === BeaconNodeStatus.started) {
this.status = BeaconNodeStatus.closing;
await this.chores.stop();
await this.sync.stop();
this.sync.close();
await this.network.stop();
if (this.metricsServer) await this.metricsServer.stop();
if (this.restApi) await this.restApi.close();
Expand Down
13 changes: 6 additions & 7 deletions packages/lodestar/src/node/notifier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {ErrorAborted, ILogger, sleep, prettyBytes} from "@chainsafe/lodestar-uti
import {AbortSignal} from "abort-controller";
import {IBeaconChain} from "../chain";
import {INetwork} from "../network";
import {IBeaconSync, SyncMode} from "../sync";
import {IBeaconSync, SyncState} from "../sync";
import {prettyTimeDiff} from "../util/time";
import {TimeSeries} from "../util/timeSeries";

Expand Down Expand Up @@ -57,10 +57,10 @@ export async function runNodeNotifier({
const headRow = `head: ${headInfo.slot} ${prettyBytes(headInfo.blockRoot)}`;
const currentSlotRow = `currentSlot: ${currentSlot}`;

let nodeState: string[] = [];
let nodeState: string[];
switch (sync.state) {
case SyncMode.INITIAL_SYNCING:
case SyncMode.REGULAR_SYNCING: {
case SyncState.SyncingFinalized:
case SyncState.SyncingHead: {
const slotsPerSecond = timeSeries.computeLinearSpeed();
const distance = Math.max(currentSlot - headSlot, 0);
const secondsLeft = distance / slotsPerSecond;
Expand All @@ -77,13 +77,12 @@ export async function runNodeNotifier({
break;
}

case SyncMode.SYNCED: {
case SyncState.Synced: {
nodeState = ["Synced", finalizedCheckpointRow, headRow, peersRow];
break;
}

case SyncMode.STOPPED:
case SyncMode.WAITING_PEERS: {
case SyncState.Stalled: {
nodeState = ["Searching for peers", peersRow, finalizedCheckpointRow, headRow, currentSlotRow];
}
}
Expand Down
36 changes: 36 additions & 0 deletions packages/lodestar/src/sync/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/** The number of head syncing chains to sync at a time. */
export const PARALLEL_HEAD_CHAINS = 2;

/** Minimum work we require a finalized chain to do before picking a chain with more peers. */
export const MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS = 10;

/** The number of times to retry a batch before it is considered failed. */
export const MAX_BATCH_DOWNLOAD_ATTEMPTS = 5;

/** Consider batch faulty after downloading and processing this number of times */
export const MAX_BATCH_PROCESSING_ATTEMPTS = 3;

/** Batch range excludes the first block of the epoch. @see Batch */
export const BATCH_SLOT_OFFSET = 1;

/** First epoch to allow to start gossip */
export const MIN_EPOCH_TO_START_GOSSIP = -1;

/**
* Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
* blocks per batch are requested _at most_. A batch may request less blocks to account for
* already requested slots. There is a timeout for each batch request. If this value is too high,
* we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which
* case the responder will fill the response up to the max request size, assuming they have the
* bandwidth to do so.
*/
export const EPOCHS_PER_BATCH = 2;

/**
* The maximum number of batches to queue before requesting more.
* In good network conditions downloading batches is much faster than processing them
* A number > 5 results in wasted progress when the chain completes syncing
*
* TODO: When switching branches usually all batches in AwaitingProcessing are dropped, could it be optimized?
*/
export const BATCH_BUFFER_SIZE = 5;
4 changes: 4 additions & 0 deletions packages/lodestar/src/sync/gossip/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ export class BeaconGossipHandler {
this.addGossipHandlers();
}

get isStarted(): boolean {
return this.state.status === GossipHandlerStatus.Started;
}

close(): void {
this.removeGossipHandlers();
if (this.state.status === GossipHandlerStatus.Started) {
Expand Down
1 change: 0 additions & 1 deletion packages/lodestar/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@
*/
export * from "./interface";
export * from "./sync";
export * from "./regular";
export * from "./gossip";
Loading