Skip to content

Commit

Permalink
Merge 67c7eb5 into b79e9ba
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain authored Sep 8, 2023
2 parents b79e9ba + 67c7eb5 commit 11781f8
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 32 deletions.
26 changes: 13 additions & 13 deletions packages/beacon-node/src/network/core/networkCoreWorker.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import worker from "node:worker_threads";
import fs from "node:fs";
import path from "node:path";
import {createFromProtobuf} from "@libp2p/peer-id-factory";
import worker from "node:worker_threads";
import type {ModuleThread} from "@chainsafe/threads";
import {expose} from "@chainsafe/threads/worker";
import type {WorkerModule} from "@chainsafe/threads/dist/types/worker.js";
import {createFromProtobuf} from "@libp2p/peer-id-factory";
import {chainConfigFromJson, createBeaconConfig} from "@lodestar/config";
import {getNodeLogger} from "@lodestar/logger/node";
import {collectNodeJSMetrics, RegistryMetricCreator} from "../../metrics/index.js";
import {RegistryMetricCreator, collectNodeJSMetrics} from "../../metrics/index.js";
import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js";
import {Clock} from "../../util/clock.js";
import {wireEventsOnWorkerThread} from "../../util/workerEvents.js";
import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js";
import {peerIdToString} from "../../util/peerId.js";
import {profileNodeJS} from "../../util/profile.js";
import {getNetworkCoreWorkerMetrics} from "./metrics.js";
import {NetworkWorkerApi, NetworkWorkerData} from "./types.js";
import {NetworkCore} from "./networkCore.js";
import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js";
import {wireEventsOnWorkerThread} from "../../util/workerEvents.js";
import {
NetworkWorkerThreadEventType,
ReqRespBridgeEventBus,
Expand All @@ -24,8 +21,11 @@ import {
getReqRespBridgeRespEvents,
reqRespBridgeEventDirection,
} from "./events.js";
import {getNetworkCoreWorkerMetrics} from "./metrics.js";
import {NetworkCore} from "./networkCore.js";
import {NetworkWorkerApi, NetworkWorkerData} from "./types.js";

// Cloned data from instatiation
// Cloned data from instantiation
const workerData = worker.workerData as NetworkWorkerData;
const parentPort = worker.parentPort;
// eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
Expand Down Expand Up @@ -120,9 +120,9 @@ wireEventsOnWorkerThread<ReqRespBridgeEventData>(
);

const libp2pWorkerApi: NetworkWorkerApi = {
close: () => {
close: async () => {
abortController.abort();
return core.close();
await core.close();
},
scrapeMetrics: () => core.scrapeMetrics(),

Expand Down Expand Up @@ -162,4 +162,4 @@ const libp2pWorkerApi: NetworkWorkerApi = {
},
};

expose(libp2pWorkerApi as WorkerModule<keyof NetworkWorkerApi>);
expose(libp2pWorkerApi as ModuleThread<NetworkWorkerApi>);
50 changes: 31 additions & 19 deletions packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
import worker_threads from "node:worker_threads";
import {exportToProtobuf} from "@libp2p/peer-id-factory";
import {PeerId} from "@libp2p/interface/peer-id";
import {PeerScoreStatsDump} from "@chainsafe/libp2p-gossipsub/dist/src/score/peer-score.js";
import {PublishOpts} from "@chainsafe/libp2p-gossipsub/types";
import {spawn, Thread, Worker} from "@chainsafe/threads";
import {ModuleThread, Thread, Worker, spawn} from "@chainsafe/threads";
import {PeerId} from "@libp2p/interface/peer-id";
import {exportToProtobuf} from "@libp2p/peer-id-factory";
import {routes} from "@lodestar/api";
import {phase0} from "@lodestar/types";
import {ResponseIncoming, ResponseOutgoing} from "@lodestar/reqresp";
import {BeaconConfig, chainConfigToJson} from "@lodestar/config";
import type {LoggerNode} from "@lodestar/logger/node";
import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js";
import {wireEventsOnMainThread} from "../../util/workerEvents.js";
import {ResponseIncoming, ResponseOutgoing} from "@lodestar/reqresp";
import {phase0} from "@lodestar/types";
import {Metrics} from "../../metrics/index.js";
import {IncomingRequestArgs, OutgoingRequestArgs, GetReqRespHandlerFn} from "../reqresp/types.js";
import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js";
import {peerIdFromString} from "../../util/peerId.js";
import {terminateWorkerThread, wireEventsOnMainThread} from "../../util/workerEvents.js";
import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js";
import {CommitteeSubscription} from "../subnets/interface.js";
import {PeerAction, PeerScoreStats} from "../peers/index.js";
import {NetworkOptions} from "../options.js";
import {peerIdFromString} from "../../util/peerId.js";
import {NetworkWorkerApi, NetworkWorkerData, INetworkCore, MultiaddrStr, PeerIdStr} from "./types.js";
import {PeerAction, PeerScoreStats} from "../peers/index.js";
import {GetReqRespHandlerFn, IncomingRequestArgs, OutgoingRequestArgs} from "../reqresp/types.js";
import {CommitteeSubscription} from "../subnets/interface.js";
import {
NetworkWorkerThreadEventType,
ReqRespBridgeEventBus,
Expand All @@ -27,6 +26,7 @@ import {
getReqRespBridgeRespEvents,
reqRespBridgeEventDirection,
} from "./events.js";
import {INetworkCore, MultiaddrStr, NetworkWorkerApi, NetworkWorkerData, PeerIdStr} from "./types.js";

export type WorkerNetworkCoreOpts = NetworkOptions & {
metricsEnabled: boolean;
Expand All @@ -47,10 +47,13 @@ export type WorkerNetworkCoreInitModules = {
};

type WorkerNetworkCoreModules = WorkerNetworkCoreInitModules & {
workerApi: NetworkWorkerApi;
networkThreadApi: ModuleThread<NetworkWorkerApi>;
worker: Worker;
};

const NETWORK_WORKER_EXIT_TIMEOUT_MS = 1000;
const NETWORK_WORKER_EXIT_RETRY_COUNT = 3;

/**
* NetworkCore implementation using a Worker thread
*/
Expand Down Expand Up @@ -81,6 +84,10 @@ export class WorkerNetworkCore implements INetworkCore {
reqRespBridgeEventDirection
);

Thread.errors(modules.networkThreadApi).subscribe((err) => {
this.modules.logger.error("Network worker thread error", {}, err);
});

const {metrics} = modules;
if (metrics) {
metrics.networkWorkerHandler.reqRespBridgeReqCallerPending.addCollect(() => {
Expand Down Expand Up @@ -124,24 +131,29 @@ export class WorkerNetworkCore implements INetworkCore {
} as ConstructorParameters<typeof Worker>[1]);

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const workerApi = (await spawn<any>(worker, {
const networkThreadApi = (await spawn<any>(worker, {
// A Lodestar Node may do very expensive task at start blocking the event loop and causing
// the initialization to timeout. The number below is big enough to almost disable the timeout
timeout: 5 * 60 * 1000,
// TODO: types are broken on spawn, which claims that `NetworkWorkerApi` does not satifies its contrains
})) as unknown as NetworkWorkerApi;
})) as unknown as ModuleThread<NetworkWorkerApi>;

return new WorkerNetworkCore({
...modules,
workerApi,
networkThreadApi,
worker,
});
}

async close(): Promise<void> {
await this.getApi().close();
this.modules.logger.debug("terminating network worker");
await Thread.terminate(this.modules.workerApi as unknown as Thread);
await terminateWorkerThread({
worker: this.getApi(),
retryCount: NETWORK_WORKER_EXIT_RETRY_COUNT,
retryMs: NETWORK_WORKER_EXIT_TIMEOUT_MS,
logger: this.modules.logger,
});
this.modules.logger.debug("terminated network worker");
}

Expand Down Expand Up @@ -231,7 +243,7 @@ export class WorkerNetworkCore implements INetworkCore {
return this.getApi().writeDiscv5Profile(durationMs, dirpath);
}

private getApi(): NetworkWorkerApi {
return this.modules.workerApi;
private getApi(): ModuleThread<NetworkWorkerApi> {
return this.modules.networkThreadApi;
}
}
3 changes: 3 additions & 0 deletions packages/beacon-node/src/network/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ export type NetworkWorkerData = {
* API exposed by the libp2p worker
*/
export type NetworkWorkerApi = INetworkCorePublic & {
// To satisfy the constraint of `ModuleThread` type
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[string: string]: (...args: any[]) => Promise<any> | any;
// Async method through worker boundary
reportPeer(peer: PeerIdStr, action: PeerAction, actionName: string): Promise<void>;
reStatusPeers(peers: PeerIdStr[]): Promise<void>;
Expand Down
34 changes: 34 additions & 0 deletions packages/beacon-node/src/util/workerEvents.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import {MessagePort, Worker} from "node:worker_threads";
import {Thread} from "@chainsafe/threads";
import {Logger} from "@lodestar/logger";
import {sleep} from "@lodestar/utils";
import {StrictEventEmitterSingleArg} from "./strictEvents.js";

export type WorkerBridgeEvent<EventData> = {
Expand Down Expand Up @@ -85,3 +88,34 @@ export function wireEventsOnMainThread<EventData>(
}
}
}

export async function terminateWorkerThread({
worker,
retryMs,
retryCount,
logger,
}: {
worker: Thread;
retryMs: number;
retryCount: number;
logger?: Logger;
}): Promise<void> {
const terminated = new Promise((resolve) => {
Thread.events(worker).subscribe((event) => {
if (event.type === "termination") {
resolve(true);
}
});
});

for (let i = 0; i < retryCount; i++) {
await Thread.terminate(worker);
const result = await Promise.race([terminated, sleep(retryMs).then(() => false)]);

if (result) return;

logger?.warn("Worker thread failed to terminate, retrying...");
}

throw new Error(`Worker thread failed to terminate in ${retryCount * retryMs}ms.`);
}

0 comments on commit 11781f8

Please sign in to comment.