Skip to content

Commit

Permalink
feat: add reqresp TTFB metrics (#5589)
Browse files Browse the repository at this point in the history
Add reqresp TTFB metrics
  • Loading branch information
dapplion committed May 30, 2023
1 parent 5672d84 commit bcf9c92
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 9 deletions.
3 changes: 2 additions & 1 deletion packages/reqresp/src/ReqResp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ export class ReqResp {

try {
yield* sendRequest(
{logger: this.logger, libp2p: this.libp2p, peerClient},
{logger: this.logger, libp2p: this.libp2p, metrics: this.metrics, peerClient},
peerId,
protocols,
protocolIDs,
Expand Down Expand Up @@ -218,6 +218,7 @@ export class ReqResp {
try {
await handleRequest({
logger: this.logger,
metrics: this.metrics,
stream,
peerId,
protocol: protocol as Protocol,
Expand Down
8 changes: 7 additions & 1 deletion packages/reqresp/src/encoders/responseEncode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ import {RespStatus, RpcResponseStatusError} from "../interface.js";
* Note: `response` has zero or more chunks (denoted by `<>*`)
*/
export function responseEncodeSuccess(
protocol: Protocol
protocol: Protocol,
cbs: {onChunk: (chunkIndex: number) => void}
): (source: AsyncIterable<ResponseOutgoing>) => AsyncIterable<Buffer> {
return async function* responseEncodeSuccessTransform(source) {
let chunkIndex = 0;

for await (const chunk of source) {
// Postfix increment, return 0 as first chunk
cbs.onChunk(chunkIndex++);

// <result>
yield Buffer.from([RespStatus.SUCCESS]);

Expand Down
20 changes: 18 additions & 2 deletions packages/reqresp/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ export function getMetrics(register: MetricsRegister) {
name: "beacon_reqresp_outgoing_request_roundtrip_time_seconds",
help: "Histogram of outgoing requests round-trip time",
labelNames: ["method"],
buckets: [0.1, 0.2, 0.5, 1, 5, 15, 60],
// Spec sets RESP_TIMEOUT = 10 sec
buckets: [0.1, 0.2, 0.5, 1, 5, 10, 15, 60],
}),
outgoingErrors: register.gauge<"method">({
name: "beacon_reqresp_outgoing_requests_error_total",
Expand All @@ -90,13 +91,28 @@ export function getMetrics(register: MetricsRegister) {
name: "beacon_reqresp_incoming_request_handler_time_seconds",
help: "Histogram of incoming requests internal handling time",
labelNames: ["method"],
buckets: [0.1, 0.2, 0.5, 1, 5],
// Spec sets RESP_TIMEOUT = 10 sec
buckets: [0.1, 0.2, 0.5, 1, 5, 10],
}),
incomingErrors: register.gauge<"method">({
name: "beacon_reqresp_incoming_requests_error_total",
help: "Counts total failed responses handled per method",
labelNames: ["method"],
}),
outgoingResponseTTFB: register.histogram<"method">({
name: "beacon_reqresp_outgoing_response_ttfb_seconds",
help: "Time to first byte (TTFB) for outgoing responses",
labelNames: ["method"],
// Spec sets TTFB_TIMEOUT = 5 sec
buckets: [0.1, 1, 5],
}),
incomingResponseTTFB: register.histogram<"method">({
name: "beacon_reqresp_incoming_response_ttfb_seconds",
help: "Time to first byte (TTFB) for incoming responses",
labelNames: ["method"],
// Spec sets TTFB_TIMEOUT = 5 sec
buckets: [0.1, 1, 5],
}),
dialErrors: register.gauge({
name: "beacon_reqresp_dial_errors_total",
help: "Count total dial errors",
Expand Down
8 changes: 7 additions & 1 deletion packages/reqresp/src/request/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {Uint8ArrayList} from "uint8arraylist";
import {ErrorAborted, Logger, withTimeout, TimeoutError} from "@lodestar/utils";
import {MixedProtocol, ResponseIncoming} from "../types.js";
import {prettyPrintPeerId, abortableSource} from "../utils/index.js";
import {Metrics} from "../metrics.js";
import {ResponseError} from "../response/index.js";
import {requestEncode} from "../encoders/requestEncode.js";
import {responseDecode} from "../encoders/responseDecode.js";
Expand Down Expand Up @@ -32,6 +33,7 @@ export interface SendRequestOpts {
type SendRequestModules = {
logger: Logger;
libp2p: Libp2p;
metrics: Metrics | null;
peerClient?: string;
};

Expand All @@ -47,7 +49,7 @@ type SendRequestModules = {
* - The maximum number of requested chunks are read. Does not throw, returns read chunks only.
*/
export async function* sendRequest(
{logger, libp2p, peerClient}: SendRequestModules,
{logger, libp2p, metrics, peerClient}: SendRequestModules,
peerId: PeerId,
protocols: MixedProtocol[],
protocolIDs: string[],
Expand Down Expand Up @@ -110,6 +112,9 @@ export async function* sendRequest(
}
});

// TODO: Does the TTFB timer start on opening stream or after receiving request
const timerTTFB = metrics?.outgoingResponseTTFB.startTimer({method});

// Parse protocol selected by the responder
const protocolId = stream.stat.protocol ?? "unknown";
const protocol = protocolsMap.get(protocolId);
Expand Down Expand Up @@ -177,6 +182,7 @@ export async function* sendRequest(
onFirstHeader() {
// On first byte, cancel the single use TTFB_TIMEOUT, and start RESP_TIMEOUT
clearTimeout(timeoutTTFB);
timerTTFB?.();
restartRespTimeout();
},
onFirstResponseChunk() {
Expand Down
12 changes: 11 additions & 1 deletion packages/reqresp/src/response/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {responseEncodeError, responseEncodeSuccess} from "../encoders/responseEn
import {RespStatus} from "../interface.js";
import {RequestError, RequestErrorCode} from "../request/errors.js";
import {ReqRespRateLimiter} from "../rate_limiter/ReqRespRateLimiter.js";
import {Metrics} from "../metrics.js";
import {ResponseError} from "./errors.js";

export {ResponseError};
Expand All @@ -19,6 +20,7 @@ export const DEFAULT_REQUEST_TIMEOUT = 5 * 1000; // 5 sec

export interface HandleRequestOpts {
logger: Logger;
metrics: Metrics | null;
stream: Stream;
peerId: PeerId;
protocol: Protocol;
Expand All @@ -44,6 +46,7 @@ export interface HandleRequestOpts {
*/
export async function handleRequest({
logger,
metrics,
stream,
peerId,
protocol,
Expand All @@ -65,6 +68,9 @@ export async function handleRequest({
// in case request whose body is a List fails at chunk_i > 0, without breaking out of the for..await..of
(async function* requestHandlerSource() {
try {
// TODO: Does the TTFB timer start on opening stream or after receiving request
const timerTTFB = metrics?.outgoingResponseTTFB.startTimer({method: protocol.method});

const requestBody = await withTimeout(
() => pipe(stream.source as AsyncIterable<Uint8ArrayList>, requestDecode(protocol)),
REQUEST_TIMEOUT,
Expand Down Expand Up @@ -97,7 +103,11 @@ export async function handleRequest({
// NOTE: Do not log the resp chunk contents, logs get extremely cluttered
// Note: Not logging on each chunk since after 1 year it hasn't add any value when debugging
// onChunk(() => logger.debug("Resp sending chunk", logCtx)),
responseEncodeSuccess(protocol)
responseEncodeSuccess(protocol, {
onChunk(chunkIndex) {
if (chunkIndex === 0) timerTTFB?.();
},
})
);
} catch (e) {
const status = e instanceof ResponseError ? e.status : RespStatus.SERVER_ERROR;
Expand Down
4 changes: 2 additions & 2 deletions packages/reqresp/test/unit/request/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ describe("request / sendRequest", () => {

const responses = await pipe(
sendRequest(
{logger, libp2p},
{logger, libp2p, metrics: null},
peerId,
protocols,
protocols.map((p) => p.method),
Expand Down Expand Up @@ -144,7 +144,7 @@ describe("request / sendRequest", () => {
await expectRejectedWithLodestarError(
pipe(
sendRequest(
{logger, libp2p},
{logger, libp2p, metrics: null},
peerId,
[emptyProtocol],
[testMethod],
Expand Down
1 change: 1 addition & 0 deletions packages/reqresp/test/unit/response/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ describe("response / handleRequest", () => {

const resultPromise = handleRequest({
logger,
metrics: null,
protocol,
protocolID: protocol.method,
stream,
Expand Down
6 changes: 5 additions & 1 deletion packages/reqresp/test/utils/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import {arrToSource} from "../utils/index.js";
export async function* responseEncode(responseChunks: ResponseChunk[], protocol: Protocol): AsyncIterable<Buffer> {
for (const chunk of responseChunks) {
if (chunk.status === RespStatus.SUCCESS) {
yield* pipe(arrToSource([(chunk as SuccessResponseChunk).payload]), responseEncodeSuccess(protocol));
yield* pipe(
arrToSource([(chunk as SuccessResponseChunk).payload]),
// eslint-disable-next-line @typescript-eslint/no-empty-function
responseEncodeSuccess(protocol, {onChunk: () => {}})
);
} else {
yield* responseEncodeError(protocol, chunk.status, chunk.errorMessage);
}
Expand Down

0 comments on commit bcf9c92

Please sign in to comment.