From 6a70fcca978937c8e0773f4174780a24fba5ff32 Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Thu, 24 Oct 2024 01:18:38 -0500 Subject: [PATCH] Enforce maximum payload size --- PROTOCOL.md | 29 +- .../__snapshots__/serialize.test.ts.snap | 432 ++++++++++++++++++ __tests__/max-payload-size.test.ts | 203 ++++++++ router/client.ts | 80 +++- router/errors.ts | 8 + router/server.ts | 14 +- transport/options.ts | 1 + transport/sessionStateMachine/common.ts | 22 + 8 files changed, 771 insertions(+), 18 deletions(-) create mode 100644 __tests__/max-payload-size.test.ts diff --git a/PROTOCOL.md b/PROTOCOL.md index ef8ed16e..e61f7b06 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -162,7 +162,7 @@ All messages MUST have no control flags set (i.e., the `controlFlags` field is ` - The payload MUST be `{ type: 'ACK' }`. - Because this is a control message that is not associated with a specific stream, you MUST NOT set `serviceName` or `procedureName` and `streamId` can be something arbitrary (e.g. `heartbeat`). -There are 4 error payloads that are defined in the protocol sent from server to client, these codes are reserved: +There are 5 error payloads that are defined in the protocol sent from server to client, these codes are reserved: ```ts // When a client sends a malformed request. This can be @@ -187,11 +187,18 @@ interface CancelError extends BaseError { message: string; } +// This occurrs when a client or server attempts to send a message that is +// larger than the configurable max payload size. +interface MaxPayloadSizeExceededError extends BaseError { + code: 'MAX_PAYLOAD_SIZE_EXCEEDED'; + message: string; +} + // This is sent when the server encounters an internal error // i.e. an invariant has been violated interface; -type ProtocolError = UncaughtError | InvalidRequestError | CancelError; +type ProtocolError = UncaughtError | InvalidRequestError | CancelError | MaxPayloadSizeExceededError; ``` `ProtocolError`s, just like service-level errors, are wrapped with a `Result`, which is further wrapped with `TransportMessage` and MUST have a `StreamCancelBit` flag. Please note that these are separate from user-defined errors, which should be treated just like any response message. @@ -672,3 +679,21 @@ This explicit ack serves three purposes: 1. It keeps the connection alive by preventing the underlying transport from timing out. 2. It allows the session to detect when the underlying transport has been lost, even in cases where the transport does not emit a close event. 3. It provides an upper bound on how many messages the session buffers in the case of a reconnection (consider the case where an upload procedure is really long-lived and the server doesn't send any messages until the upload is finished. Without these explicit heartbeats, the client will buffer everything!). + +#### Maximum payload size handling + +It is desirable to set a maximum allowed payload size to prevent clients or servers from sending payloads that are too large for the underlying transport. Some transports may enforce a maximum payload size and close the connection if that is exceeded. This would cause the entire river session to become broken as this payload cannot be transmitted. It is better to catch this before sending the message on the transport and only fail the single procedure call that was attempting to send the large payload. + +Payload size is calculated after encoding the message with the configured codec. + +##### Client handling + +###### Init Message +If the init message of a procedure exceeds the maximum payload size, the client must inject a `MAX_PAYLOAD_SIZE_EXCEEDED` error and fail the procedure. In this case no message is ever sent to the server and is handled entirely client-side. + +###### Non-init message +If the client attempts to send a message after a procedure has been started which exceeds the maximum payload size. The client must throw an error inside the `write` call and inject a `MAX_PAYLOAD_SIZE_EXCEEDED` error in the procedure read stream. The client must also send a cancel request to the server so the server-side resources can be cleaned up. + +##### Server handling + +If the server attempts to send a message after a procedure has been started which exceeds the maximum payload size, the server must reply with a `MAX_PAYLOAD_SIZE_EXCEEDED` error and close down the procedure. diff --git a/__tests__/__snapshots__/serialize.test.ts.snap b/__tests__/__snapshots__/serialize.test.ts.snap index 2d8bdbd9..8dc68315 100644 --- a/__tests__/__snapshots__/serialize.test.ts.snap +++ b/__tests__/__snapshots__/serialize.test.ts.snap @@ -83,6 +83,22 @@ exports[`serialize server to jsonschema > serialize entire service schema 1`] = ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -176,6 +192,22 @@ exports[`serialize server to jsonschema > serialize entire service schema 1`] = ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -264,6 +296,22 @@ exports[`serialize server to jsonschema > serialize entire service schema 1`] = ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -356,6 +404,22 @@ exports[`serialize server to jsonschema > serialize entire service schema 1`] = ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -458,6 +522,22 @@ exports[`serialize server to jsonschema > serialize entire service schema 1`] = ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -589,6 +669,22 @@ exports[`serialize server to jsonschema > serialize entire service schema 1`] = ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -697,6 +793,22 @@ exports[`serialize server to jsonschema > serialize entire service schema 1`] = ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -776,6 +888,22 @@ exports[`serialize server to jsonschema > serialize entire service schema 1`] = ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -868,6 +996,22 @@ exports[`serialize service to jsonschema > serialize backwards compatible with v ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "input": { @@ -961,6 +1105,22 @@ exports[`serialize service to jsonschema > serialize backwards compatible with v ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "input": { @@ -1049,6 +1209,22 @@ exports[`serialize service to jsonschema > serialize backwards compatible with v ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -1141,6 +1317,22 @@ exports[`serialize service to jsonschema > serialize backwards compatible with v ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -1243,6 +1435,22 @@ exports[`serialize service to jsonschema > serialize backwards compatible with v ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "input": { @@ -1374,6 +1582,22 @@ exports[`serialize service to jsonschema > serialize backwards compatible with v ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -1482,6 +1706,22 @@ exports[`serialize service to jsonschema > serialize backwards compatible with v ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "input": { @@ -1561,6 +1801,22 @@ exports[`serialize service to jsonschema > serialize backwards compatible with v ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -1651,6 +1907,22 @@ exports[`serialize service to jsonschema > serialize basic service 1`] = ` ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -1744,6 +2016,22 @@ exports[`serialize service to jsonschema > serialize basic service 1`] = ` ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -1832,6 +2120,22 @@ exports[`serialize service to jsonschema > serialize basic service 1`] = ` ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -1924,6 +2228,22 @@ exports[`serialize service to jsonschema > serialize basic service 1`] = ` ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -2026,6 +2346,22 @@ exports[`serialize service to jsonschema > serialize basic service 1`] = ` ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -2157,6 +2493,22 @@ exports[`serialize service to jsonschema > serialize basic service 1`] = ` ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -2265,6 +2617,22 @@ exports[`serialize service to jsonschema > serialize basic service 1`] = ` ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -2344,6 +2712,22 @@ exports[`serialize service to jsonschema > serialize basic service 1`] = ` ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -2434,6 +2818,22 @@ exports[`serialize service to jsonschema > serialize service with binary 1`] = ` ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -2578,6 +2978,22 @@ exports[`serialize service to jsonschema > serialize service with errors 1`] = ` ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { @@ -2691,6 +3107,22 @@ exports[`serialize service to jsonschema > serialize service with errors 1`] = ` ], "type": "object", }, + { + "properties": { + "code": { + "const": "MAX_PAYLOAD_SIZE_EXCEEDED", + "type": "string", + }, + "message": { + "type": "string", + }, + }, + "required": [ + "code", + "message", + ], + "type": "object", + }, ], }, "init": { diff --git a/__tests__/max-payload-size.test.ts b/__tests__/max-payload-size.test.ts new file mode 100644 index 00000000..5f4e2e9d --- /dev/null +++ b/__tests__/max-payload-size.test.ts @@ -0,0 +1,203 @@ +import { + afterEach, + assert, + beforeEach, + describe, + expect, + test, + vitest, +} from 'vitest'; +import { createMockTransportNetwork } from '../testUtil/fixtures/mockTransport'; +import { + Err, + Ok, + Procedure, + ServiceSchema, + createClient, + createServer, +} from '../router'; +import { MAX_PAYLOAD_SIZE_EXCEEDED_CODE } from '../router/errors'; +import { Type } from '@sinclair/typebox'; +import { readNextResult } from '../testUtil'; +import { MaxPayloadSizeExceeded } from '../transport/sessionStateMachine/common'; + +describe('client exceeded max payload size', () => { + let mockTransportNetwork: ReturnType; + + beforeEach(async () => { + mockTransportNetwork = createMockTransportNetwork({ + client: { maxPayloadSizeBytes: 1024 }, + }); + }); + + afterEach(async () => { + await mockTransportNetwork.cleanup(); + }); + + test('rpc init exceeds max payload size', async () => { + const mockHandler = vitest.fn(); + const services = { + service: ServiceSchema.define({ + echo: Procedure.rpc({ + requestInit: Type.String(), + responseData: Type.String(), + handler: mockHandler, + }), + }), + }; + createServer(mockTransportNetwork.getServerTransport(), services); + const client = createClient( + mockTransportNetwork.getClientTransport('client'), + 'SERVER', + ); + + const result = await client.service.echo.rpc('0'.repeat(1025)); + expect(result).toStrictEqual({ + ok: false, + payload: { + code: MAX_PAYLOAD_SIZE_EXCEEDED_CODE, + message: + 'client: payload exceeded maximum payload size size=1241 max=1024', + }, + }); + expect(mockHandler).not.toHaveBeenCalled(); + }); + + test('stream message exceeds max payload size', async () => { + let handlerCanceled: Promise | undefined; + const services = { + service: ServiceSchema.define({ + echo: Procedure.stream({ + requestInit: Type.String(), + requestData: Type.String(), + responseData: Type.String(), + responseError: Type.Object({ + code: Type.Literal('ERROR'), + message: Type.String(), + }), + handler: async ({ ctx, reqInit, reqReadable, resWritable }) => { + handlerCanceled = new Promise((resolve) => { + ctx.signal.onabort = () => resolve(null); + }); + + resWritable.write(Ok(reqInit)); + for await (const msg of reqReadable) { + if (msg.ok) { + resWritable.write(Ok(msg.payload)); + } else { + resWritable.write( + Err({ + code: 'ERROR', + message: 'error reading from client', + }), + ); + break; + } + } + }, + }), + }), + }; + createServer(mockTransportNetwork.getServerTransport(), services); + const transport = mockTransportNetwork.getClientTransport('client'); + const client = createClient(transport, 'SERVER'); + + const stream = client.service.echo.stream('start'); + let result = await readNextResult(stream.resReadable); + expect(result).toStrictEqual({ ok: true, payload: 'start' }); + + let error; + try { + stream.reqWritable.write('0'.repeat(1025)); + } catch (e) { + error = e; + } + expect(error).toBeInstanceOf(MaxPayloadSizeExceeded); + + result = await readNextResult(stream.resReadable); + expect(result).toStrictEqual({ + ok: false, + payload: { + code: MAX_PAYLOAD_SIZE_EXCEEDED_CODE, + message: + 'client: payload exceeded maximum payload size size=1148 max=1024', + }, + }); + assert(handlerCanceled); + await handlerCanceled; + }); +}); + +describe('server exceeded max payload size', () => { + let mockTransportNetwork: ReturnType; + + beforeEach(async () => { + mockTransportNetwork = createMockTransportNetwork({ + server: { maxPayloadSizeBytes: 1024 }, + }); + }); + + afterEach(async () => { + await mockTransportNetwork.cleanup(); + }); + + test('rpc response exceeds max payload size', async () => { + const services = { + service: ServiceSchema.define({ + echo: Procedure.rpc({ + requestInit: Type.String(), + responseData: Type.String(), + handler: async ({ reqInit }) => { + return Ok(reqInit); + }, + }), + }), + }; + createServer(mockTransportNetwork.getServerTransport(), services); + const client = createClient( + mockTransportNetwork.getClientTransport('client'), + 'SERVER', + ); + + const result = await client.service.echo.rpc('0'.repeat(1025)); + expect(result).toStrictEqual({ + ok: false, + payload: { + code: MAX_PAYLOAD_SIZE_EXCEEDED_CODE, + message: + 'server: payload exceeded maximum payload size size=1170 max=1024', + }, + }); + }); + + test('stream message exceeds max payload size', async () => { + const services = { + service: ServiceSchema.define({ + echo: Procedure.subscription({ + requestInit: Type.Object({}), + responseData: Type.String(), + handler: async ({ resWritable }) => { + resWritable.write(Ok('0'.repeat(1025))); + }, + }), + }), + }; + createServer(mockTransportNetwork.getServerTransport(), services); + const client = createClient( + mockTransportNetwork.getClientTransport('client'), + 'SERVER', + ); + + const stream = client.service.echo.subscribe({}); + const result = await readNextResult(stream.resReadable); + expect(result).toStrictEqual({ + ok: false, + payload: { + code: MAX_PAYLOAD_SIZE_EXCEEDED_CODE, + message: + 'server: payload exceeded maximum payload size size=1170 max=1024', + }, + }); + expect(stream.resReadable.isReadable()).toEqual(false); + }); +}); diff --git a/router/client.ts b/router/client.ts index 0c318b68..6773d70a 100644 --- a/router/client.ts +++ b/router/client.ts @@ -36,7 +36,9 @@ import { CANCEL_CODE, ReaderErrorSchema, UNEXPECTED_DISCONNECT_CODE, + MAX_PAYLOAD_SIZE_EXCEEDED_CODE, } from './errors'; +import { MaxPayloadSizeExceeded } from '../transport/sessionStateMachine/common'; const ReaderErrResultSchema = ErrResultSchema(ReaderErrorSchema); @@ -297,11 +299,42 @@ function handleProc( let cleanClose = true; const reqWritable = new WritableImpl>({ writeCb: (rawIn) => { - sessionScopedSend({ - streamId, - payload: rawIn, - controlFlags: 0, - }); + try { + sessionScopedSend({ + streamId, + payload: rawIn, + controlFlags: 0, + }); + } catch (e) { + if (!(e instanceof MaxPayloadSizeExceeded)) { + throw e; + } + + cleanClose = false; + if (!resReadable.isClosed()) { + resReadable._pushValue( + Err({ + code: MAX_PAYLOAD_SIZE_EXCEEDED_CODE, + message: `client: ${e.message}`, + }), + ); + closeReadable(); + } + + reqWritable.close(); + // TODO: Is this the right error to send to the server? + sessionScopedSend( + cancelMessage( + streamId, + Err({ + code: CANCEL_CODE, + message: 'cancelled by client', + }), + ), + ); + + throw e; + } }, // close callback closeCb: () => { @@ -480,16 +513,33 @@ function handleProc( transport.addEventListener('message', onMessage); transport.addEventListener('sessionStatus', onSessionStatus); - sessionScopedSend({ - streamId, - serviceName, - procedureName, - tracing: getPropagationContext(ctx), - payload: init, - controlFlags: procClosesWithInit - ? ControlFlags.StreamOpenBit | ControlFlags.StreamClosedBit - : ControlFlags.StreamOpenBit, - }); + try { + sessionScopedSend({ + streamId, + serviceName, + procedureName, + tracing: getPropagationContext(ctx), + payload: init, + controlFlags: procClosesWithInit + ? ControlFlags.StreamOpenBit | ControlFlags.StreamClosedBit + : ControlFlags.StreamOpenBit, + }); + } catch (e) { + if (!(e instanceof MaxPayloadSizeExceeded)) { + throw e; + } + + cleanClose = false; + resReadable._pushValue( + Err({ + code: MAX_PAYLOAD_SIZE_EXCEEDED_CODE, + message: `client: ${e.message}`, + }), + ); + closeReadable(); + + reqWritable.close(); + } if (procClosesWithInit) { reqWritable.close(); diff --git a/router/errors.ts b/router/errors.ts index 623ed771..f6275f5c 100644 --- a/router/errors.ts +++ b/router/errors.ts @@ -27,6 +27,10 @@ export const INVALID_REQUEST_CODE = 'INVALID_REQUEST'; * {@link CANCEL_CODE} is the code used when either server or client cancels the stream. */ export const CANCEL_CODE = 'CANCEL'; +/** + * {@link MAX_PAYLOAD_SIZE_EXCEEDED_CODE} is the code used when a request's payload exceeds the maximum allowed size. + */ +export const MAX_PAYLOAD_SIZE_EXCEEDED_CODE = 'MAX_PAYLOAD_SIZE_EXCEEDED'; type TLiteralString = TLiteral; @@ -72,6 +76,10 @@ export const ReaderErrorSchema = Type.Union([ code: Type.Literal(CANCEL_CODE), message: Type.String(), }), + Type.Object({ + code: Type.Literal(MAX_PAYLOAD_SIZE_EXCEEDED_CODE), + message: Type.String(), + }), ]) satisfies ProcedureErrorSchemaType; /** diff --git a/router/server.ts b/router/server.ts index 31667f26..38380269 100644 --- a/router/server.ts +++ b/router/server.ts @@ -8,6 +8,7 @@ import { INVALID_REQUEST_CODE, BaseErrorSchemaType, ErrResultSchema, + MAX_PAYLOAD_SIZE_EXCEEDED_CODE, } from './errors'; import { AnyService, @@ -42,7 +43,10 @@ import { ServerHandshakeOptions } from './handshake'; import { Connection } from '../transport/connection'; import { ServerTransport } from '../transport/server'; import { ReadableImpl, WritableImpl } from './streams'; -import { IdentifiedSession } from '../transport/sessionStateMachine/common'; +import { + IdentifiedSession, + MaxPayloadSizeExceeded, +} from '../transport/sessionStateMachine/common'; import { SessionBoundSendFn } from '../transport/transport'; type StreamId = string; @@ -521,6 +525,14 @@ class RiverServer span.recordException(err instanceof Error ? err : new Error(errorMsg)); span.setStatus({ code: SpanStatusCode.ERROR }); + if (err instanceof MaxPayloadSizeExceeded) { + onServerCancel({ + code: MAX_PAYLOAD_SIZE_EXCEEDED_CODE, + message: `server: ${err.message}`, + }); + return; + } + this.log?.error( `${serviceName}.${procedureName} handler threw an uncaught error`, { diff --git a/transport/options.ts b/transport/options.ts index 995ebbb1..a191aea0 100644 --- a/transport/options.ts +++ b/transport/options.ts @@ -13,6 +13,7 @@ export const defaultTransportOptions: TransportOptions = { connectionTimeoutMs: 2_000, handshakeTimeoutMs: 1_000, enableTransparentSessionReconnects: true, + maxPayloadSizeBytes: 4 * 1024 * 1024, codec: NaiveJsonCodec, }; diff --git a/transport/sessionStateMachine/common.ts b/transport/sessionStateMachine/common.ts index 06939e2d..594d101a 100644 --- a/transport/sessionStateMachine/common.ts +++ b/transport/sessionStateMachine/common.ts @@ -141,6 +141,15 @@ export interface SessionOptions { * The codec to use for encoding/decoding messages over the wire */ codec: Codec; + /** + * The maximum payload size that is allowed to be sent to the peer. + * This does not enforce max payload side when receiving messages, + * only when sending them. + * + * If the max payload size is exceeded an error with code {@link MAX_PAYLOAD_SIZE_EXCEEDED} + * will be returned and the procedure will be canceled. + */ + maxPayloadSizeBytes: number; } // all session states have a from and options @@ -209,6 +218,12 @@ export interface IdentifiedSessionProps extends CommonSessionProps { protocolVersion: ProtocolVersion; } +export class MaxPayloadSizeExceeded extends Error { + constructor(size: number, max: number) { + super(`payload exceeded maximum payload size size=${size} max=${max}`); + } +} + export abstract class IdentifiedSession extends CommonSession { readonly id: SessionId; readonly telemetry: TelemetryInfo; @@ -276,6 +291,13 @@ export abstract class IdentifiedSession extends CommonSession { data: this.options.codec.toBuffer(msg), }; + if (encodedMsg.data.byteLength > this.options.maxPayloadSizeBytes) { + throw new MaxPayloadSizeExceeded( + encodedMsg.data.byteLength, + this.options.maxPayloadSizeBytes, + ); + } + this.seq++; return encodedMsg;