Skip to content

Commit

Permalink
refactor: class HttpMetricsServer as close only (#5586)
Browse files Browse the repository at this point in the history
HttpMetricsServer as close only class
  • Loading branch information
dapplion committed May 30, 2023
1 parent 0e93c07 commit d79b869
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 100 deletions.
2 changes: 1 addition & 1 deletion packages/beacon-node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export {createNodeJsLibp2p, NodeJsLibp2pOpts} from "./network/index.js";
export * from "./node/index.js";

// Export metrics utilities to de-duplicate validator metrics
export {RegistryMetricCreator, collectNodeJSMetrics, HttpMetricsServer} from "./metrics/index.js";
export {RegistryMetricCreator, collectNodeJSMetrics, HttpMetricsServer, getHttpMetricsServer} from "./metrics/index.js";

// Export monitoring service to make it usable by validator
export {MonitoringService} from "./monitoring/index.js";
Expand Down
151 changes: 71 additions & 80 deletions packages/beacon-node/src/metrics/server/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import http from "node:http";
import {Registry} from "prom-client";
import {Logger} from "@lodestar/utils";
import {wrapError} from "../../util/wrapError.js";
import {HistogramExtra} from "../utils/histogram.js";
import {HttpActiveSocketsTracker} from "../../api/rest/activeSockets.js";
import {RegistryMetricCreator} from "../utils/registryMetricCreator.js";

Expand All @@ -11,99 +10,91 @@ export type HttpMetricsServerOpts = {
address?: string;
};

export class HttpMetricsServer {
private readonly server: http.Server;
private readonly register: Registry;
private readonly getOtherMetrics: () => Promise<string>;
private readonly logger: Logger;
private readonly activeSockets: HttpActiveSocketsTracker;

private readonly httpServerRegister: RegistryMetricCreator;
private readonly scrapeTimeMetric: HistogramExtra<"status">;

constructor(
private readonly opts: HttpMetricsServerOpts,
{
register,
getOtherMetrics = async () => "",
logger,
}: {register: Registry; getOtherMetrics?: () => Promise<string>; logger: Logger}
) {
this.logger = logger;
this.register = register;
this.getOtherMetrics = getOtherMetrics;
this.server = http.createServer(this.onRequest.bind(this));

// New registry to metric the metrics. Using the same registry would deadlock the .metrics promise
this.httpServerRegister = new RegistryMetricCreator();

this.scrapeTimeMetric = this.httpServerRegister.histogram<"status">({
name: "lodestar_metrics_scrape_seconds",
help: "Lodestar metrics server async time to scrape metrics",
labelNames: ["status"],
buckets: [0.1, 1, 10],
});

const socketsMetrics = {
activeSockets: this.httpServerRegister.gauge({
name: "lodestar_metrics_server_active_sockets_count",
help: "Metrics server current count of active sockets",
}),
socketsBytesRead: this.httpServerRegister.gauge({
name: "lodestar_metrics_server_sockets_bytes_read_total",
help: "Metrics server total count of bytes read on all sockets",
}),
socketsBytesWritten: this.httpServerRegister.gauge({
name: "lodestar_metrics_server_sockets_bytes_written_total",
help: "Metrics server total count of bytes written on all sockets",
}),
};

this.activeSockets = new HttpActiveSocketsTracker(this.server, socketsMetrics);
}

async start(): Promise<void> {
const {port, address} = this.opts;
this.logger.info("Starting metrics HTTP server", {port, address: address ?? "127.0.0.1"});
return new Promise((resolve, reject) => {
this.server.once("error", reject);
this.server.listen(port, address, resolve);
});
}
export type HttpMetricsServer = {
close(): Promise<void>;
};

async stop(): Promise<void> {
// In NodeJS land calling close() only causes new connections to be rejected.
// Existing connections can prevent .close() from resolving for potentially forever.
// In Lodestar case when the BeaconNode wants to close we will just abruptly terminate
// all existing connections for a fast shutdown.
// Inspired by https://github.com/gajus/http-terminator/
this.activeSockets.destroyAll();
export async function getHttpMetricsServer(
opts: HttpMetricsServerOpts,
{
register,
getOtherMetrics = async () => "",
logger,
}: {register: Registry; getOtherMetrics?: () => Promise<string>; logger: Logger}
): Promise<HttpMetricsServer> {
// New registry to metric the metrics. Using the same registry would deadlock the .metrics promise
const httpServerRegister = new RegistryMetricCreator();

await new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err) reject(err);
else resolve();
});
});
}
const scrapeTimeMetric = httpServerRegister.histogram<"status">({
name: "lodestar_metrics_scrape_seconds",
help: "Lodestar metrics server async time to scrape metrics",
labelNames: ["status"],
buckets: [0.1, 1, 10],
});

private async onRequest(req: http.IncomingMessage, res: http.ServerResponse): Promise<void> {
const server = http.createServer(async function onRequest(
req: http.IncomingMessage,
res: http.ServerResponse
): Promise<void> {
if (req.method === "GET" && req.url && req.url.includes("/metrics")) {
const timer = this.scrapeTimeMetric.startTimer();
const metricsRes = await Promise.all([wrapError(this.register.metrics()), this.getOtherMetrics()]);
const timer = scrapeTimeMetric.startTimer();
const metricsRes = await Promise.all([wrapError(register.metrics()), getOtherMetrics()]);
timer({status: metricsRes[0].err ? "error" : "success"});

// Ensure we only writeHead once
if (metricsRes[0].err) {
res.writeHead(500, {"content-type": "text/plain"}).end(metricsRes[0].err.stack);
} else {
// Get scrape time metrics
const httpServerMetrics = await this.httpServerRegister.metrics();
const httpServerMetrics = await httpServerRegister.metrics();
const metricsStr = `${metricsRes[0].result}\n\n${metricsRes[1]}\n\n${httpServerMetrics}`;
res.writeHead(200, {"content-type": this.register.contentType}).end(metricsStr);
res.writeHead(200, {"content-type": register.contentType}).end(metricsStr);
}
} else {
res.writeHead(404).end();
}
}
});

const socketsMetrics = {
activeSockets: httpServerRegister.gauge({
name: "lodestar_metrics_server_active_sockets_count",
help: "Metrics server current count of active sockets",
}),
socketsBytesRead: httpServerRegister.gauge({
name: "lodestar_metrics_server_sockets_bytes_read_total",
help: "Metrics server total count of bytes read on all sockets",
}),
socketsBytesWritten: httpServerRegister.gauge({
name: "lodestar_metrics_server_sockets_bytes_written_total",
help: "Metrics server total count of bytes written on all sockets",
}),
};

const activeSockets = new HttpActiveSocketsTracker(server, socketsMetrics);

const {port, address} = opts;
logger.info("Starting metrics HTTP server", {port, address: address ?? "127.0.0.1"});

await new Promise<void>((resolve, reject) => {
server.once("error", reject);
server.listen(port, address, resolve);
});

return {
async close(): Promise<void> {
// In NodeJS land calling close() only causes new connections to be rejected.
// Existing connections can prevent .close() from resolving for potentially forever.
// In Lodestar case when the BeaconNode wants to close we will just abruptly terminate
// all existing connections for a fast shutdown.
// Inspired by https://github.com/gajus/http-terminator/
activeSockets.destroyAll();

await new Promise<void>((resolve, reject) => {
server.close((err) => {
if (err) reject(err);
else resolve();
});
});
},
};
}
15 changes: 6 additions & 9 deletions packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {Network, getReqRespHandlers} from "../network/index.js";
import {BeaconSync, IBeaconSync} from "../sync/index.js";
import {BackfillSync} from "../sync/backfill/index.js";
import {BeaconChain, IBeaconChain, initBeaconMetrics} from "../chain/index.js";
import {createMetrics, Metrics, HttpMetricsServer} from "../metrics/index.js";
import {createMetrics, Metrics, HttpMetricsServer, getHttpMetricsServer} from "../metrics/index.js";
import {MonitoringService} from "../monitoring/index.js";
import {getApi, BeaconRestApiServer} from "../api/index.js";
import {initializeExecutionEngine, initializeExecutionBuilder} from "../execution/index.js";
Expand All @@ -36,7 +36,7 @@ export type BeaconNodeModules = {
api: {[K in keyof Api]: ServerApi<Api[K]>};
sync: IBeaconSync;
backfillSync: BackfillSync | null;
metricsServer?: HttpMetricsServer;
metricsServer: HttpMetricsServer | null;
monitoring: MonitoringService | null;
restApi?: BeaconRestApiServer;
controller?: AbortController;
Expand Down Expand Up @@ -90,7 +90,7 @@ export class BeaconNode {
config: BeaconConfig;
db: IBeaconDb;
metrics: Metrics | null;
metricsServer?: HttpMetricsServer;
metricsServer: HttpMetricsServer | null;
monitoring: MonitoringService | null;
network: Network;
chain: IBeaconChain;
Expand Down Expand Up @@ -272,15 +272,12 @@ export class BeaconNode {

// only start server if metrics are explicitly enabled
const metricsServer = opts.metrics.enabled
? new HttpMetricsServer(opts.metrics, {
? await getHttpMetricsServer(opts.metrics, {
register: (metrics as Metrics).register,
getOtherMetrics: () => network.scrapeMetrics(),
logger: logger.child({module: LoggerModule.metrics}),
})
: undefined;
if (metricsServer) {
await metricsServer.start();
}
: null;

const restApi = new BeaconRestApiServer(opts.api.rest, {
config,
Expand Down Expand Up @@ -320,7 +317,7 @@ export class BeaconNode {
this.sync.close();
this.backfillSync?.close();
await this.network.close();
if (this.metricsServer) await this.metricsServer.stop();
if (this.metricsServer) await this.metricsServer.close();
if (this.monitoring) this.monitoring.stop();
if (this.restApi) await this.restApi.close();
await this.chain.persistToDisk();
Expand Down
13 changes: 7 additions & 6 deletions packages/beacon-node/test/unit/metrics/server/http.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import request from "supertest";
import {HttpMetricsServer} from "../../../../src/metrics/index.js";
import fetch from "cross-fetch";
import {getHttpMetricsServer, HttpMetricsServer} from "../../../../src/metrics/index.js";
import {testLogger} from "../../../utils/logger.js";
import {createMetricsTest} from "../utils.js";

describe("HttpMetricsServer", () => {
const logger = testLogger();

let server: HttpMetricsServer | null = null;
const port = 14500;

it("should serve metrics on /metrics", async () => {
const metrics = createMetricsTest();
server = new HttpMetricsServer({port: 0}, {register: metrics.register, logger});
server = await getHttpMetricsServer({port}, {register: metrics.register, logger});

await server.start();
await request(server["server"]).get("/metrics").expect(200);
const res = await fetch(`http://127.0.0.1:${port}/metrics`);
await res.text();
});

after(async () => {
if (server) await server.stop();
if (server) await server.close();
});
});
12 changes: 8 additions & 4 deletions packages/cli/src/cmds/validator/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ import {
BuilderSelection,
} from "@lodestar/validator";
import {getMetrics, MetricsRegister} from "@lodestar/validator";
import {RegistryMetricCreator, collectNodeJSMetrics, HttpMetricsServer, MonitoringService} from "@lodestar/beacon-node";
import {
RegistryMetricCreator,
collectNodeJSMetrics,
getHttpMetricsServer,
MonitoringService,
} from "@lodestar/beacon-node";
import {getNodeLogger} from "@lodestar/logger/node";
import {getBeaconConfigFromArgs} from "../../config/index.js";
import {GlobalArgs} from "../../options/index.js";
Expand Down Expand Up @@ -122,10 +127,9 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr
if (args["metrics"]) {
const port = args["metrics.port"] ?? validatorMetricsDefaultOptions.port;
const address = args["metrics.address"] ?? validatorMetricsDefaultOptions.address;
const metricsServer = new HttpMetricsServer({port, address}, {register, logger});
const metricsServer = await getHttpMetricsServer({port, address}, {register, logger});

onGracefulShutdownCbs.push(() => metricsServer.stop());
await metricsServer.start();
onGracefulShutdownCbs.push(() => metricsServer.close());
}
}

Expand Down

0 comments on commit d79b869

Please sign in to comment.