Skip to content

Commit

Permalink
Attempt to cleanly exit beacon node
Browse files Browse the repository at this point in the history
  • Loading branch information
nflaig committed May 23, 2023
1 parent 504a9d3 commit 6b7fd39
Show file tree
Hide file tree
Showing 19 changed files with 175 additions and 24 deletions.
2 changes: 1 addition & 1 deletion lodestar
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
#
# ./lodestar.sh beacon --network prater

node --trace-deprecation --max-old-space-size=4096 ./packages/cli/bin/lodestar.js "$@"
node --trace-deprecation --max-old-space-size=4096 -r why-is-node-running/include ./packages/cli/bin/lodestar.js "$@"
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
"release:tag-rc": "node scripts/release/tag_rc.mjs",
"release:tag-stable": "node scripts/release/tag_stable.mjs",
"release:publish": "lerna publish from-package --yes --no-verify-access",
"check-readme": "lerna run check-readme"
"check-readme": "lerna run check-readme",
"postinstall": "patch-package"
},
"devDependencies": {
"@chainsafe/eslint-plugin-node": "^11.2.3",
Expand Down Expand Up @@ -68,7 +69,9 @@
"node-gyp": "^9.3.1",
"npm-run-all": "^4.1.5",
"nyc": "^15.1.0",
"patch-package": "^6.5.1",
"path-browserify": "^1.0.1",
"postinstall-postinstall": "^2.1.0",
"prettier": "^2.8.7",
"process": "^0.11.10",
"resolve-typescript-plugin": "^2.0.1",
Expand Down
6 changes: 5 additions & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import {
} from "@lodestar/state-transition";
import {routes} from "@lodestar/api";
import {ForkChoiceError, ForkChoiceErrorCode, EpochDifference, AncestorStatus} from "@lodestar/fork-choice";
import {isErrorAborted} from "@lodestar/utils";
import {ZERO_HASH_HEX} from "../../constants/index.js";
import {toCheckpointHex} from "../stateCache/index.js";
import {isOptimisticBlock} from "../../util/forkChoice.js";
import {isQueueErrorAborted} from "../../util/queue/errors.js";
import {ChainEvent, ReorgEventData} from "../emitter.js";
import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import {RegenCaller} from "../regen/interface.js";
Expand Down Expand Up @@ -317,7 +319,9 @@ export async function importBlock(
finalizedBlockHash
)
.catch((e) => {
this.logger.error("Error pushing notifyForkchoiceUpdate()", {headBlockHash, finalizedBlockHash}, e);
if (!isErrorAborted(e) && !isQueueErrorAborted(e)) {
this.logger.error("Error pushing notifyForkchoiceUpdate()", {headBlockHash, finalizedBlockHash}, e);
}
});
}
}
Expand Down
10 changes: 7 additions & 3 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {WithOptionalBytes, allForks} from "@lodestar/types";
import {toHex} from "@lodestar/utils";
import {JobItemQueue} from "../../util/queue/index.js";
import {isErrorAborted, toHex} from "@lodestar/utils";
import {JobItemQueue, isQueueErrorAborted} from "../../util/queue/index.js";
import {Metrics} from "../../metrics/metrics.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockError, BlockErrorCode, isBlockErrorAborted} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import type {BeaconChain} from "../chain.js";
import {verifyBlocksInEpoch} from "./verifyBlock.js";
Expand Down Expand Up @@ -111,6 +111,10 @@ export async function processBlocks(
await importBlock.call(this, fullyVerifiedBlock, opts);
}
} catch (e) {
if (isErrorAborted(e) || isQueueErrorAborted(e) || isBlockErrorAborted(e)) {
return; // Ignore
}

// above functions should only throw BlockError
const err = getBlockError(e, blocks[0].block);

Expand Down
9 changes: 9 additions & 0 deletions packages/beacon-node/src/chain/errors/blockError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {LodestarError} from "@lodestar/utils";
import {toHexString} from "@chainsafe/ssz";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {ExecutePayloadStatus} from "../../execution/engine/interface.js";
import {QueueErrorCode} from "../../util/queue/errors.js";
import {GossipActionError} from "./gossipValidation.js";

export enum BlockErrorCode {
Expand Down Expand Up @@ -116,6 +117,14 @@ export class BlockError extends LodestarError<BlockErrorType> {
}
}

export function isBlockErrorAborted(e: unknown): e is BlockError {
return (
e instanceof BlockError &&
e.type.code === BlockErrorCode.EXECUTION_ENGINE_ERROR &&
e.type.errorMessage === QueueErrorCode.QUEUE_ABORTED
);
}

export function renderBlockErrorType(type: BlockErrorType): Record<string, string | number | null> {
switch (type.code) {
case BlockErrorCode.PRESTATE_MISSING:
Expand Down
8 changes: 5 additions & 3 deletions packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {computeEpochAtSlot, isExecutionStateType, computeTimeAtSlot} from "@lode
import {ChainForkConfig} from "@lodestar/config";
import {ForkSeq, SLOTS_PER_EPOCH, ForkExecution} from "@lodestar/params";
import {Slot} from "@lodestar/types";
import {Logger, sleep, fromHex} from "@lodestar/utils";
import {Logger, sleep, fromHex, isErrorAborted} from "@lodestar/utils";
import {routes} from "@lodestar/api";
import {GENESIS_SLOT, ZERO_HASH_HEX} from "../constants/constants.js";
import {Metrics} from "../metrics/index.js";
Expand Down Expand Up @@ -172,8 +172,10 @@ export class PrepareNextSlotScheduler {
}
}
} catch (e) {
this.metrics?.precomputeNextEpochTransition.count.inc({result: "error"}, 1);
this.logger.error("Failed to run prepareForNextSlot", {nextEpoch, isEpochTransition, prepareSlot}, e as Error);
if (!isErrorAborted(e)) {
this.metrics?.precomputeNextEpochTransition.count.inc({result: "error"}, 1);
this.logger.error("Failed to run prepareForNextSlot", {nextEpoch, isEpochTransition, prepareSlot}, e as Error);
}
}
};
}
3 changes: 1 addition & 2 deletions packages/beacon-node/src/eth1/eth1DepositDataTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ export class Eth1DepositDataTracker {
private async runAutoUpdate(): Promise<void> {
let lastRunMs = 0;

// eslint-disable-next-line no-constant-condition
while (true) {
while (!this.signal.aborted) {
lastRunMs = Date.now();

try {
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {SLOTS_PER_EPOCH, ForkName, ForkSeq} from "@lodestar/params";
import {ErrorJsonRpcResponse, HttpRpcError} from "../../eth1/provider/jsonRpcHttpClient.js";
import {IJsonRpcHttpClient, ReqOpts} from "../../eth1/provider/jsonRpcHttpClient.js";
import {Metrics} from "../../metrics/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {JobItemQueue, isQueueErrorAborted} from "../../util/queue/index.js";
import {EPOCHS_PER_BATCH} from "../../sync/constants.js";
import {numToQuantity} from "../../eth1/provider/utils.js";
import {
Expand Down
14 changes: 12 additions & 2 deletions packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {LoggerNode} from "@lodestar/logger/node";
import {Api, ServerApi} from "@lodestar/api";
import {BeaconStateAllForks} from "@lodestar/state-transition";
import {ProcessShutdownCallback} from "@lodestar/validator";
import {sleep} from "@lodestar/utils";

import {IBeaconDb} from "../db/index.js";
import {Network, getReqRespHandlers} from "../network/index.js";
Expand Down Expand Up @@ -74,6 +75,12 @@ enum LoggerModule {
sync = "sync",
}

/**
* Short delay before closing db to give async operations sufficient time to finish
* and prevent "Database is not open" errors when shutting down beacon node.
*/
const DELAY_BEFORE_CLOSING_DB_MS = 1000;

/**
* The main Beacon Node class. Contains various components for getting and processing data from the
* Ethereum Consensus ecosystem as well as systems for getting beacon node metadata.
Expand Down Expand Up @@ -309,19 +316,22 @@ export class BeaconNode {
*/
async close(): Promise<void> {
if (this.status === BeaconNodeStatus.started) {
console.time("close bn");
this.status = BeaconNodeStatus.closing;
this.sync.close();
this.backfillSync?.close();
await this.network.close();
if (this.metricsServer) await this.metricsServer.stop();
if (this.monitoring) this.monitoring.stop();
if (this.restApi) await this.restApi.close();

await this.chain.persistToDisk();
await this.chain.close();
await this.db.stop();
if (this.controller) this.controller.abort();
await sleep(DELAY_BEFORE_CLOSING_DB_MS);
await this.db.stop();
this.status = BeaconNodeStatus.closed;
console.log("Beacon node closed!");
console.timeEnd("close bn");
}
}
}
7 changes: 5 additions & 2 deletions packages/beacon-node/src/util/clock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,11 @@ export class Clock extends EventEmitter implements IClock {
this.emit(ClockEvent.epoch, currentEpoch);
}
}
//recursively invoke onNextSlot
this.timeoutId = setTimeout(this.onNextSlot, this.msUntilNextSlot());

if (!this.signal.aborted) {
//recursively invoke onNextSlot
this.timeoutId = setTimeout(this.onNextSlot, this.msUntilNextSlot());
}
};

private msUntilNextSlot(): number {
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/util/queue/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ export class QueueError extends LodestarError<QueueErrorCodeType> {
super(type);
}
}

export function isQueueErrorAborted(e: unknown): e is QueueError {
return e instanceof QueueError && e.type.code === QueueErrorCode.QUEUE_ABORTED;
}
2 changes: 1 addition & 1 deletion packages/beacon-node/src/util/queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export * from "./fnQueue.js";
export * from "./itemQueue.js";
export * from "./options.js";
export {QueueError, QueueErrorCode} from "./errors.js";
export {QueueError, QueueErrorCode, isQueueErrorAborted} from "./errors.js";
2 changes: 2 additions & 0 deletions packages/cli/bin/lodestar.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/usr/bin/env node

await import("../lib/index.js");

console.log("Process ID", process.pid);
3 changes: 2 additions & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
"@types/inquirer": "^9.0.3",
"@types/lodash": "^4.14.192",
"@types/yargs": "^17.0.24",
"@types/debug": "^4.1.7"
"@types/debug": "^4.1.7",
"why-is-node-running": "^2.2.2"
}
}
1 change: 1 addition & 0 deletions packages/cli/src/cmds/validator/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr
const onGracefulShutdownCbs: (() => Promise<void> | void)[] = [];
onGracefulShutdown(async () => {
for (const cb of onGracefulShutdownCbs) await cb();
console.log("Validator client closed!");
}, logger.info.bind(logger));

// Callback for validator to request forced exit, in case of doppelganger detection
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/util/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export function onGracefulShutdown(
});

await cleanUpFunction();
console.log("Clean up functions succeeded!");
});
}
}
1 change: 1 addition & 0 deletions packages/reqresp/src/ReqResp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ export class ReqResp {
}

async stop(): Promise<void> {
this.rateLimiter.stop();
this.controller.abort();
}

Expand Down
37 changes: 37 additions & 0 deletions patches/@chainsafe+threads+1.10.0.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
diff --git a/node_modules/@chainsafe/threads/dist/master/implementation.node.js b/node_modules/@chainsafe/threads/dist/master/implementation.node.js
index e8a2cdb..64454e2 100644
--- a/node_modules/@chainsafe/threads/dist/master/implementation.node.js
+++ b/node_modules/@chainsafe/threads/dist/master/implementation.node.js
@@ -138,14 +138,7 @@ function initWorkerThreadsWorker() {
this.off(eventName, listener);
}
}
- const terminateWorkersAndMaster = () => {
- // we should terminate all workers and then gracefully shutdown self process
- Promise.all(allWorkers.map(worker => worker.terminate())).then(() => process.exit(0), () => process.exit(1));
- allWorkers = [];
- };
- // Take care to not leave orphaned processes behind. See #147.
- process.on("SIGINT", () => terminateWorkersAndMaster());
- process.on("SIGTERM", () => terminateWorkersAndMaster());
+
class BlobWorker extends Worker {
constructor(blob, options) {
super(Buffer.from(blob).toString("utf-8"), Object.assign(Object.assign({}, options), { fromSource: true }));
@@ -202,15 +195,7 @@ function initTinyWorker() {
return super.terminate();
}
}
- const terminateWorkersAndMaster = () => {
- // we should terminate all workers and then gracefully shutdown self process
- Promise.all(allWorkers.map(worker => worker.terminate())).then(() => process.exit(0), () => process.exit(1));
- allWorkers = [];
- };
- // Take care to not leave orphaned processes behind
- // See <https://github.com/avoidwork/tiny-worker#faq>
- process.on("SIGINT", () => terminateWorkersAndMaster());
- process.on("SIGTERM", () => terminateWorkersAndMaster());
+
class BlobWorker extends Worker {
constructor(blob, options) {
super(Buffer.from(blob).toString("utf-8"), Object.assign(Object.assign({}, options), { fromSource: true }));
Loading

0 comments on commit 6b7fd39

Please sign in to comment.