Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: class HttpMetricsServer as close only #5586

Merged
merged 1 commit into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/beacon-node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export {createNodeJsLibp2p, NodeJsLibp2pOpts} from "./network/index.js";
export * from "./node/index.js";

// Export metrics utilities to de-duplicate validator metrics
export {RegistryMetricCreator, collectNodeJSMetrics, HttpMetricsServer} from "./metrics/index.js";
export {RegistryMetricCreator, collectNodeJSMetrics, HttpMetricsServer, getHttpMetricsServer} from "./metrics/index.js";

// Export monitoring service to make it usable by validator
export {MonitoringService} from "./monitoring/index.js";
Expand Down
151 changes: 71 additions & 80 deletions packages/beacon-node/src/metrics/server/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import http from "node:http";
import {Registry} from "prom-client";
import {Logger} from "@lodestar/utils";
import {wrapError} from "../../util/wrapError.js";
import {HistogramExtra} from "../utils/histogram.js";
import {HttpActiveSocketsTracker} from "../../api/rest/activeSockets.js";
import {RegistryMetricCreator} from "../utils/registryMetricCreator.js";

Expand All @@ -11,99 +10,91 @@ export type HttpMetricsServerOpts = {
address?: string;
};

export class HttpMetricsServer {
private readonly server: http.Server;
private readonly register: Registry;
private readonly getOtherMetrics: () => Promise<string>;
private readonly logger: Logger;
private readonly activeSockets: HttpActiveSocketsTracker;

private readonly httpServerRegister: RegistryMetricCreator;
private readonly scrapeTimeMetric: HistogramExtra<"status">;

constructor(
private readonly opts: HttpMetricsServerOpts,
{
register,
getOtherMetrics = async () => "",
logger,
}: {register: Registry; getOtherMetrics?: () => Promise<string>; logger: Logger}
) {
this.logger = logger;
this.register = register;
this.getOtherMetrics = getOtherMetrics;
this.server = http.createServer(this.onRequest.bind(this));

// New registry to metric the metrics. Using the same registry would deadlock the .metrics promise
this.httpServerRegister = new RegistryMetricCreator();

this.scrapeTimeMetric = this.httpServerRegister.histogram<"status">({
name: "lodestar_metrics_scrape_seconds",
help: "Lodestar metrics server async time to scrape metrics",
labelNames: ["status"],
buckets: [0.1, 1, 10],
});

const socketsMetrics = {
activeSockets: this.httpServerRegister.gauge({
name: "lodestar_metrics_server_active_sockets_count",
help: "Metrics server current count of active sockets",
}),
socketsBytesRead: this.httpServerRegister.gauge({
name: "lodestar_metrics_server_sockets_bytes_read_total",
help: "Metrics server total count of bytes read on all sockets",
}),
socketsBytesWritten: this.httpServerRegister.gauge({
name: "lodestar_metrics_server_sockets_bytes_written_total",
help: "Metrics server total count of bytes written on all sockets",
}),
};

this.activeSockets = new HttpActiveSocketsTracker(this.server, socketsMetrics);
}

async start(): Promise<void> {
const {port, address} = this.opts;
this.logger.info("Starting metrics HTTP server", {port, address: address ?? "127.0.0.1"});
return new Promise((resolve, reject) => {
this.server.once("error", reject);
this.server.listen(port, address, resolve);
});
}
export type HttpMetricsServer = {
close(): Promise<void>;
};

async stop(): Promise<void> {
// In NodeJS land calling close() only causes new connections to be rejected.
// Existing connections can prevent .close() from resolving for potentially forever.
// In Lodestar case when the BeaconNode wants to close we will just abruptly terminate
// all existing connections for a fast shutdown.
// Inspired by https://github.com/gajus/http-terminator/
this.activeSockets.destroyAll();
export async function getHttpMetricsServer(
opts: HttpMetricsServerOpts,
{
register,
getOtherMetrics = async () => "",
logger,
}: {register: Registry; getOtherMetrics?: () => Promise<string>; logger: Logger}
): Promise<HttpMetricsServer> {
// New registry to metric the metrics. Using the same registry would deadlock the .metrics promise
const httpServerRegister = new RegistryMetricCreator();

await new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err) reject(err);
else resolve();
});
});
}
const scrapeTimeMetric = httpServerRegister.histogram<"status">({
name: "lodestar_metrics_scrape_seconds",
help: "Lodestar metrics server async time to scrape metrics",
labelNames: ["status"],
buckets: [0.1, 1, 10],
});

private async onRequest(req: http.IncomingMessage, res: http.ServerResponse): Promise<void> {
const server = http.createServer(async function onRequest(
req: http.IncomingMessage,
res: http.ServerResponse
): Promise<void> {
if (req.method === "GET" && req.url && req.url.includes("/metrics")) {
const timer = this.scrapeTimeMetric.startTimer();
const metricsRes = await Promise.all([wrapError(this.register.metrics()), this.getOtherMetrics()]);
const timer = scrapeTimeMetric.startTimer();
const metricsRes = await Promise.all([wrapError(register.metrics()), getOtherMetrics()]);
timer({status: metricsRes[0].err ? "error" : "success"});

// Ensure we only writeHead once
if (metricsRes[0].err) {
res.writeHead(500, {"content-type": "text/plain"}).end(metricsRes[0].err.stack);
} else {
// Get scrape time metrics
const httpServerMetrics = await this.httpServerRegister.metrics();
const httpServerMetrics = await httpServerRegister.metrics();
const metricsStr = `${metricsRes[0].result}\n\n${metricsRes[1]}\n\n${httpServerMetrics}`;
res.writeHead(200, {"content-type": this.register.contentType}).end(metricsStr);
res.writeHead(200, {"content-type": register.contentType}).end(metricsStr);
}
} else {
res.writeHead(404).end();
}
}
});

const socketsMetrics = {
activeSockets: httpServerRegister.gauge({
name: "lodestar_metrics_server_active_sockets_count",
help: "Metrics server current count of active sockets",
}),
socketsBytesRead: httpServerRegister.gauge({
name: "lodestar_metrics_server_sockets_bytes_read_total",
help: "Metrics server total count of bytes read on all sockets",
}),
socketsBytesWritten: httpServerRegister.gauge({
name: "lodestar_metrics_server_sockets_bytes_written_total",
help: "Metrics server total count of bytes written on all sockets",
}),
};

const activeSockets = new HttpActiveSocketsTracker(server, socketsMetrics);

const {port, address} = opts;
logger.info("Starting metrics HTTP server", {port, address: address ?? "127.0.0.1"});

await new Promise<void>((resolve, reject) => {
server.once("error", reject);
server.listen(port, address, resolve);
});

return {
async close(): Promise<void> {
// In NodeJS land calling close() only causes new connections to be rejected.
// Existing connections can prevent .close() from resolving for potentially forever.
// In Lodestar case when the BeaconNode wants to close we will just abruptly terminate
// all existing connections for a fast shutdown.
// Inspired by https://github.com/gajus/http-terminator/
activeSockets.destroyAll();

await new Promise<void>((resolve, reject) => {
server.close((err) => {
if (err) reject(err);
else resolve();
});
});
},
};
}
15 changes: 6 additions & 9 deletions packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {Network, getReqRespHandlers} from "../network/index.js";
import {BeaconSync, IBeaconSync} from "../sync/index.js";
import {BackfillSync} from "../sync/backfill/index.js";
import {BeaconChain, IBeaconChain, initBeaconMetrics} from "../chain/index.js";
import {createMetrics, Metrics, HttpMetricsServer} from "../metrics/index.js";
import {createMetrics, Metrics, HttpMetricsServer, getHttpMetricsServer} from "../metrics/index.js";
import {MonitoringService} from "../monitoring/index.js";
import {getApi, BeaconRestApiServer} from "../api/index.js";
import {initializeExecutionEngine, initializeExecutionBuilder} from "../execution/index.js";
Expand All @@ -36,7 +36,7 @@ export type BeaconNodeModules = {
api: {[K in keyof Api]: ServerApi<Api[K]>};
sync: IBeaconSync;
backfillSync: BackfillSync | null;
metricsServer?: HttpMetricsServer;
metricsServer: HttpMetricsServer | null;
monitoring: MonitoringService | null;
restApi?: BeaconRestApiServer;
controller?: AbortController;
Expand Down Expand Up @@ -90,7 +90,7 @@ export class BeaconNode {
config: BeaconConfig;
db: IBeaconDb;
metrics: Metrics | null;
metricsServer?: HttpMetricsServer;
metricsServer: HttpMetricsServer | null;
monitoring: MonitoringService | null;
network: Network;
chain: IBeaconChain;
Expand Down Expand Up @@ -272,15 +272,12 @@ export class BeaconNode {

// only start server if metrics are explicitly enabled
const metricsServer = opts.metrics.enabled
? new HttpMetricsServer(opts.metrics, {
? await getHttpMetricsServer(opts.metrics, {
register: (metrics as Metrics).register,
getOtherMetrics: () => network.scrapeMetrics(),
logger: logger.child({module: LoggerModule.metrics}),
})
: undefined;
if (metricsServer) {
await metricsServer.start();
}
: null;

const restApi = new BeaconRestApiServer(opts.api.rest, {
config,
Expand Down Expand Up @@ -320,7 +317,7 @@ export class BeaconNode {
this.sync.close();
this.backfillSync?.close();
await this.network.close();
if (this.metricsServer) await this.metricsServer.stop();
if (this.metricsServer) await this.metricsServer.close();
if (this.monitoring) this.monitoring.stop();
if (this.restApi) await this.restApi.close();
await this.chain.persistToDisk();
Expand Down
13 changes: 7 additions & 6 deletions packages/beacon-node/test/unit/metrics/server/http.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import request from "supertest";
import {HttpMetricsServer} from "../../../../src/metrics/index.js";
import fetch from "cross-fetch";
import {getHttpMetricsServer, HttpMetricsServer} from "../../../../src/metrics/index.js";
import {testLogger} from "../../../utils/logger.js";
import {createMetricsTest} from "../utils.js";

describe("HttpMetricsServer", () => {
const logger = testLogger();

let server: HttpMetricsServer | null = null;
const port = 14500;

it("should serve metrics on /metrics", async () => {
const metrics = createMetricsTest();
server = new HttpMetricsServer({port: 0}, {register: metrics.register, logger});
server = await getHttpMetricsServer({port}, {register: metrics.register, logger});

await server.start();
await request(server["server"]).get("/metrics").expect(200);
const res = await fetch(`http://127.0.0.1:${port}/metrics`);
await res.text();
});

after(async () => {
if (server) await server.stop();
if (server) await server.close();
});
});
12 changes: 8 additions & 4 deletions packages/cli/src/cmds/validator/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ import {
BuilderSelection,
} from "@lodestar/validator";
import {getMetrics, MetricsRegister} from "@lodestar/validator";
import {RegistryMetricCreator, collectNodeJSMetrics, HttpMetricsServer, MonitoringService} from "@lodestar/beacon-node";
import {
RegistryMetricCreator,
collectNodeJSMetrics,
getHttpMetricsServer,
MonitoringService,
} from "@lodestar/beacon-node";
import {getNodeLogger} from "@lodestar/logger/node";
import {getBeaconConfigFromArgs} from "../../config/index.js";
import {GlobalArgs} from "../../options/index.js";
Expand Down Expand Up @@ -122,10 +127,9 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr
if (args["metrics"]) {
const port = args["metrics.port"] ?? validatorMetricsDefaultOptions.port;
const address = args["metrics.address"] ?? validatorMetricsDefaultOptions.address;
const metricsServer = new HttpMetricsServer({port, address}, {register, logger});
const metricsServer = await getHttpMetricsServer({port, address}, {register, logger});

onGracefulShutdownCbs.push(() => metricsServer.stop());
await metricsServer.start();
onGracefulShutdownCbs.push(() => metricsServer.close());
}
}

Expand Down