From da79c934e289539672bea614ec22420a87990b30 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Thu, 24 Nov 2022 11:01:09 +0000
Subject: [PATCH 1/4] fix: limit unprocessed message queue size separately to message size

It's possible to receive lots of small messages in one buffer that can be
larger than the max message size, so limit the unprocessed message queue
size separately from the max message size.
---
 README.md                  |  3 ++-
 src/decode.ts              | 20 ++++++++++++++++----
 src/index.ts               | 12 +++++++++++-
 test/restrict-size.spec.ts |  3 ++-
 4 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/README.md b/README.md
index beaeaa6..ebe2db8 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,8 @@ Creates a factory that can be used to create new muxers.
 
 `options` is an optional `Object` that may have the following properties:
 
-- `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
+- `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 4194304 - e.g. 1MB)
+- `maxMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: )
 - `maxInboundStreams` - a number that defines how many incoming streams are allowed per connection (default: 1024)
 - `maxOutboundStreams` - a number that defines how many outgoing streams are allowed per connection (default: 1024)
 - `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 \* 1024 \* 4 - e.g. 4MB)
diff --git a/src/decode.ts b/src/decode.ts
index 6c4e5ab..4e1e754 100644
--- a/src/decode.ts
+++ b/src/decode.ts
@@ -4,6 +4,7 @@ import type { Source } from 'it-stream-types'
 import type { Message } from './message-types.js'
 
 export const MAX_MSG_SIZE = 1 << 20 // 1MB
+export const MAX_MSG_QUEUE_SIZE = 4 << 20 // 4MB
 
 interface MessageHeader {
   id: number
@@ -16,11 +17,13 @@ class Decoder {
   private readonly _buffer: Uint8ArrayList
   private _headerInfo: MessageHeader | null
   private readonly _maxMessageSize: number
+  private readonly _maxMessageQueueSize: number
 
-  constructor (maxMessageSize: number = MAX_MSG_SIZE) {
+  constructor (maxMessageSize: number = MAX_MSG_SIZE, maxMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
     this._buffer = new Uint8ArrayList()
     this._headerInfo = null
     this._maxMessageSize = maxMessageSize
+    this._maxMessageQueueSize = maxMessageQueueSize
   }
 
   write (chunk: Uint8Array) {
@@ -30,8 +33,8 @@ class Decoder {
 
     this._buffer.append(chunk)
 
-    if (this._buffer.byteLength > this._maxMessageSize) {
-      throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
+    if (this._buffer.byteLength > this._maxMessageQueueSize) {
+      throw Object.assign(new Error('message queue size too large!'), { code: 'ERR_MSG_QUEUE_TOO_BIG' })
     }
 
     const msgs: Message[] = []
@@ -40,7 +43,11 @@ class Decoder {
       if (this._headerInfo == null) {
         try {
           this._headerInfo = this._decodeHeader(this._buffer)
-        } catch (_) {
+        } catch (err: any) {
+          if (err.code === 'ERR_MSG_TOO_BIG') {
+            throw err
+          }
+
           break // We haven't received enough data yet
         }
       }
@@ -90,6 +97,11 @@ class Decoder {
       throw new Error(`Invalid type received: ${type}`)
     }
 
+    // test message type varint + data length
+    if (length > this._maxMessageSize) {
+      throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
+    }
+
     // @ts-expect-error h is a number not a CODE
     return { id: h >> 3, type, offset: offset + end, length }
   }
diff --git a/src/index.ts b/src/index.ts
index 2d7fa7d..dba44a8 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -5,10 +5,20 @@ export interface MplexInit {
   /**
    * The maximum size of message that can be sent in one go in bytes.
    * Messages larger than this will be split into multiple smaller
-   * messages (default: 1MB)
+   * messages. If we receive a message larger than this an error will
+   * be thrown and the connection closed. (default: 1MB)
    */
   maxMsgSize?: number
 
+  /**
+   * Constrains the size of the unprocessed message queue buffer.
+   * Before messages are deserialized, the raw bytes are buffered to ensure
+   * we have the complete message to deserialized. If the queue gets longer
+   * than this value an error will be thrown and the connection closed.
+   * (default: 4MB)
+   */
+  maxMessageQueueSize?: number
+
   /**
    * The maximum number of multiplexed streams that can be open at any
    * one time. A request to open more than this will have a stream
diff --git a/test/restrict-size.spec.ts b/test/restrict-size.spec.ts
index aacb620..23a9e7e 100644
--- a/test/restrict-size.spec.ts
+++ b/test/restrict-size.spec.ts
@@ -36,9 +36,10 @@ describe('restrict size', () => {
       )
     } catch (err: any) {
       expect(err).to.have.property('code', 'ERR_MSG_TOO_BIG')
-      expect(output).to.have.length(2)
+      expect(output).to.have.length(3)
       expect(output[0]).to.deep.equal(input[0])
       expect(output[1]).to.deep.equal(input[1])
+      expect(output[2]).to.deep.equal(input[2])
       return
     }
     throw new Error('did not restrict size')

From 9dc007e2dbd9edff82102fa54c3f1501598ed8f4 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Thu, 24 Nov 2022 11:09:32 +0000
Subject: [PATCH 2/4] chore: add tests

---
 README.md                  |  2 +-
 package.json               |  1 +
 src/decode.ts              | 14 +++++++-------
 src/index.ts               |  2 +-
 test/restrict-size.spec.ts | 62 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index ebe2db8..824f89c 100644
--- a/README.md
+++ b/README.md
@@ -70,7 +70,7 @@ Creates a factory that can be used to create new muxers.
 `options` is an optional `Object` that may have the following properties:
 
 - `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 4194304 - e.g. 1MB)
-- `maxMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: )
+- `maxUnprocessedMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: )
 - `maxInboundStreams` - a number that defines how many incoming streams are allowed per connection (default: 1024)
 - `maxOutboundStreams` - a number that defines how many outgoing streams are allowed per connection (default: 1024)
 - `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 \* 1024 \* 4 - e.g. 4MB)
diff --git a/package.json b/package.json
index 9536d55..505f8e4 100644
--- a/package.json
+++ b/package.json
@@ -172,6 +172,7 @@
     "it-drain": "^2.0.0",
     "it-foreach": "^1.0.0",
     "it-map": "^2.0.0",
+    "it-to-buffer": "^3.0.0",
     "p-defer": "^4.0.0",
     "random-int": "^3.0.0",
     "typescript": "^4.7.4"
diff --git a/src/decode.ts b/src/decode.ts
index 4e1e754..3ef8a5d 100644
--- a/src/decode.ts
+++ b/src/decode.ts
@@ -17,13 +17,13 @@ class Decoder {
   private readonly _buffer: Uint8ArrayList
   private _headerInfo: MessageHeader | null
   private readonly _maxMessageSize: number
-  private readonly _maxMessageQueueSize: number
+  private readonly _maxUnprocessedMessageQueueSize: number
 
-  constructor (maxMessageSize: number = MAX_MSG_SIZE, maxMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
+  constructor (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
     this._buffer = new Uint8ArrayList()
     this._headerInfo = null
     this._maxMessageSize = maxMessageSize
-    this._maxMessageQueueSize = maxMessageQueueSize
+    this._maxUnprocessedMessageQueueSize = maxUnprocessedMessageQueueSize
   }
 
   write (chunk: Uint8Array) {
@@ -33,8 +33,8 @@ class Decoder {
 
     this._buffer.append(chunk)
 
-    if (this._buffer.byteLength > this._maxMessageQueueSize) {
-      throw Object.assign(new Error('message queue size too large!'), { code: 'ERR_MSG_QUEUE_TOO_BIG' })
+    if (this._buffer.byteLength > this._maxUnprocessedMessageQueueSize) {
+      throw Object.assign(new Error('unprocessed message queue size too large!'), { code: 'ERR_MSG_QUEUE_TOO_BIG' })
     }
 
     const msgs: Message[] = []
@@ -140,9 +140,9 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
 /**
  * Decode a chunk and yield an _array_ of decoded messages
  */
-export function decode (maxMessageSize: number = MAX_MSG_SIZE) {
+export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
   return async function * decodeMessages (source: Source): Source {
-    const decoder = new Decoder(maxMessageSize)
+    const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize)
 
     for await (const chunk of source) {
       const msgs = decoder.write(chunk)
diff --git a/src/index.ts b/src/index.ts
index dba44a8..da98e48 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -17,7 +17,7 @@ export interface MplexInit {
    * than this value an error will be thrown and the connection closed.
    * (default: 4MB)
    */
-  maxMessageQueueSize?: number
+  maxUnprocessedMessageQueueSize?: number
 
   /**
    * The maximum number of multiplexed streams that can be open at any
diff --git a/test/restrict-size.spec.ts b/test/restrict-size.spec.ts
index 23a9e7e..2067e98 100644
--- a/test/restrict-size.spec.ts
+++ b/test/restrict-size.spec.ts
@@ -10,6 +10,7 @@ import { Message, MessageTypes } from '../src/message-types.js'
 import { encode } from '../src/encode.js'
 import { decode } from '../src/decode.js'
 import { Uint8ArrayList } from 'uint8arraylist'
+import toBuffer from 'it-to-buffer'
 
 describe('restrict size', () => {
   it('should throw when size is too big', async () => {
@@ -60,4 +61,65 @@ describe('restrict size', () => {
     )
     expect(output).to.deep.equal(input)
   })
+
+  it('should throw when unprocessed message queue size is too big', async () => {
+    const maxMessageSize = 32
+    const maxUnprocessedMessageQueueSize = 64
+
+    const input: Message[] = [
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
+      { id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
+    ]
+
+    const output: Message[] = []
+
+    try {
+      await pipe(
+        input,
+        encode,
+        async function * (source) {
+          // make one big buffer
+          yield toBuffer(source)
+        },
+        decode(maxMessageSize, maxUnprocessedMessageQueueSize),
+        (source) => each(source, chunk => {
+          output.push(chunk)
+        }),
+        async (source) => await drain(source)
+      )
+    } catch (err: any) {
+      expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG')
+      expect(output).to.have.length(0)
+      return
+    }
+    throw new Error('did not restrict size')
+  })
+
+  it('should throw when unprocessed message queue size is too big because of garbage', async () => {
+    const maxMessageSize = 32
+    const maxUnprocessedMessageQueueSize = 64
+    const input = randomBytes(maxUnprocessedMessageQueueSize + 1)
+    const output: Message[] = []
+
+    try {
+      await pipe(
+        [input],
+        decode(maxMessageSize, maxUnprocessedMessageQueueSize),
+        (source) => each(source, chunk => {
+          output.push(chunk)
+        }),
+        async (source) => await drain(source)
+      )
+    } catch (err: any) {
+      expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG')
+      expect(output).to.have.length(0)
+      return
+    }
+    throw new Error('did not restrict size')
+  })
 })

From 309f80f2a9d13e8155f82ee87a2e656e47effd69 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Thu, 24 Nov 2022 11:49:27 +0000
Subject: [PATCH 3/4] chore: pass option to decoder

---
 src/mplex.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/mplex.ts b/src/mplex.ts
index 1e79cce..b80b187 100644
--- a/src/mplex.ts
+++ b/src/mplex.ts
@@ -203,7 +203,7 @@ export class MplexStreamMuxer implements StreamMuxer {
     try {
       await pipe(
         source,
-        decode(this._init.maxMsgSize),
+        decode(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize),
         async source => {
           for await (const msg of source) {
             await this._handleIncoming(msg)

From f915b61910b051164979ae8036f98500b5619399 Mon Sep 17 00:00:00 2001
From: Alex Potsides
Date: Thu, 24 Nov 2022 13:31:21 +0000
Subject: [PATCH 4/4] chore: PR comment
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Marin Petrunić
---
 README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 824f89c..2a66b60 100644
--- a/README.md
+++ b/README.md
@@ -69,8 +69,8 @@ Creates a factory that can be used to create new muxers.
 
 `options` is an optional `Object` that may have the following properties:
 
-- `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 4194304 - e.g. 1MB)
-- `maxUnprocessedMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: )
+- `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
+- `maxUnprocessedMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: 4194304 - e.g. 4MB)
 - `maxInboundStreams` - a number that defines how many incoming streams are allowed per connection (default: 1024)
 - `maxOutboundStreams` - a number that defines how many outgoing streams are allowed per connection (default: 1024)
 - `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 \* 1024 \* 4 - e.g. 4MB)
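
To illustrate the behaviour described in the first commit message, here is a minimal sketch written in the style of test/restrict-size.spec.ts: several small messages are encoded and then handed to the decoder as one coalesced buffer that is larger than maxMsgSize. With these patches only maxUnprocessedMessageQueueSize applies to the raw buffered bytes, so decoding succeeds as long as each individual message stays under maxMessageSize. The relative import paths, the it-pipe import and the zero-filled payloads are assumptions made for the example rather than code taken from the patches.

import { pipe } from 'it-pipe'
import toBuffer from 'it-to-buffer'
import { Uint8ArrayList } from 'uint8arraylist'
import { encode } from '../src/encode.js'
import { decode } from '../src/decode.js'
import type { Message } from '../src/message-types.js'

const maxMessageSize = 32
const maxUnprocessedMessageQueueSize = 1024

// five small data messages, each well under maxMessageSize
const input: Message[] = [
  { id: 0, type: 1, data: new Uint8ArrayList(new Uint8Array(16)) },
  { id: 0, type: 1, data: new Uint8ArrayList(new Uint8Array(16)) },
  { id: 0, type: 1, data: new Uint8ArrayList(new Uint8Array(16)) },
  { id: 0, type: 1, data: new Uint8ArrayList(new Uint8Array(16)) },
  { id: 0, type: 1, data: new Uint8ArrayList(new Uint8Array(16)) }
]

const output: Message[] = []

await pipe(
  input,
  encode,
  async function * (source) {
    // coalesce every encoded message into one chunk that is larger than
    // maxMessageSize but still smaller than the unprocessed queue limit
    yield toBuffer(source)
  },
  decode(maxMessageSize, maxUnprocessedMessageQueueSize),
  async (source) => {
    for await (const msg of source) {
      output.push(msg)
    }
  }
)

console.log(output.length) // 5 - no ERR_MSG_TOO_BIG, since no single message exceeds maxMessageSize

Before these patches the same coalesced chunk would have tripped the ERR_MSG_TOO_BIG check simply because the buffered bytes exceeded maxMsgSize; afterwards maxMsgSize constrains each decoded message, while the separate unprocessed queue limit (4MB by default) guards the raw input buffer.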