Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: limit unprocessed message queue size separately to message size #234

Merged
merged 4 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ Creates a factory that can be used to create new muxers.

`options` is an optional `Object` that may have the following properties:

- `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
- `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 4194304 - e.g. 1MB)
- `maxUnprocessedMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: )
- `maxInboundStreams` - a number that defines how many incoming streams are allowed per connection (default: 1024)
- `maxOutboundStreams` - a number that defines how many outgoing streams are allowed per connection (default: 1024)
- `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 \* 1024 \* 4 - e.g. 4MB)
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
"it-drain": "^2.0.0",
"it-foreach": "^1.0.0",
"it-map": "^2.0.0",
"it-to-buffer": "^3.0.0",
"p-defer": "^4.0.0",
"random-int": "^3.0.0",
"typescript": "^4.7.4"
Expand Down
24 changes: 18 additions & 6 deletions src/decode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Source } from 'it-stream-types'
import type { Message } from './message-types.js'

export const MAX_MSG_SIZE = 1 << 20 // 1MB
export const MAX_MSG_QUEUE_SIZE = 4 << 20 // 4MB

interface MessageHeader {
id: number
Expand All @@ -16,11 +17,13 @@ class Decoder {
private readonly _buffer: Uint8ArrayList
private _headerInfo: MessageHeader | null
private readonly _maxMessageSize: number
private readonly _maxUnprocessedMessageQueueSize: number

constructor (maxMessageSize: number = MAX_MSG_SIZE) {
constructor (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
this._buffer = new Uint8ArrayList()
this._headerInfo = null
this._maxMessageSize = maxMessageSize
this._maxUnprocessedMessageQueueSize = maxUnprocessedMessageQueueSize
}

write (chunk: Uint8Array) {
Expand All @@ -30,8 +33,8 @@ class Decoder {

this._buffer.append(chunk)

if (this._buffer.byteLength > this._maxMessageSize) {
throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
if (this._buffer.byteLength > this._maxUnprocessedMessageQueueSize) {
throw Object.assign(new Error('unprocessed message queue size too large!'), { code: 'ERR_MSG_QUEUE_TOO_BIG' })
}

const msgs: Message[] = []
Expand All @@ -40,7 +43,11 @@ class Decoder {
if (this._headerInfo == null) {
try {
this._headerInfo = this._decodeHeader(this._buffer)
} catch (_) {
} catch (err: any) {
if (err.code === 'ERR_MSG_TOO_BIG') {
throw err
}

break // We haven't received enough data yet
}
}
Expand Down Expand Up @@ -90,6 +97,11 @@ class Decoder {
throw new Error(`Invalid type received: ${type}`)
}

// test message type varint + data length
if (length > this._maxMessageSize) {
throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
}

// @ts-expect-error h is a number not a CODE
return { id: h >> 3, type, offset: offset + end, length }
}
Expand Down Expand Up @@ -128,9 +140,9 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
/**
* Decode a chunk and yield an _array_ of decoded messages
*/
export function decode (maxMessageSize: number = MAX_MSG_SIZE) {
export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
const decoder = new Decoder(maxMessageSize)
const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize)

for await (const chunk of source) {
const msgs = decoder.write(chunk)
Expand Down
12 changes: 11 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,20 @@ export interface MplexInit {
/**
* The maximum size of message that can be sent in one go in bytes.
* Messages larger than this will be split into multiple smaller
* messages (default: 1MB)
* messages. If we receive a message larger than this an error will
* be thrown and the connection closed. (default: 1MB)
*/
maxMsgSize?: number

/**
* Constrains the size of the unprocessed message queue buffer.
* Before messages are deserialized, the raw bytes are buffered to ensure
* we have the complete message to deserialized. If the queue gets longer
* than this value an error will be thrown and the connection closed.
* (default: 4MB)
*/
maxUnprocessedMessageQueueSize?: number

/**
* The maximum number of multiplexed streams that can be open at any
* one time. A request to open more than this will have a stream
Expand Down
2 changes: 1 addition & 1 deletion src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ export class MplexStreamMuxer implements StreamMuxer {
try {
await pipe(
source,
decode(this._init.maxMsgSize),
decode(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize),
async source => {
for await (const msg of source) {
await this._handleIncoming(msg)
Expand Down
65 changes: 64 additions & 1 deletion test/restrict-size.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { Message, MessageTypes } from '../src/message-types.js'
import { encode } from '../src/encode.js'
import { decode } from '../src/decode.js'
import { Uint8ArrayList } from 'uint8arraylist'
import toBuffer from 'it-to-buffer'

describe('restrict size', () => {
it('should throw when size is too big', async () => {
Expand All @@ -36,9 +37,10 @@ describe('restrict size', () => {
)
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_TOO_BIG')
expect(output).to.have.length(2)
expect(output).to.have.length(3)
expect(output[0]).to.deep.equal(input[0])
expect(output[1]).to.deep.equal(input[1])
expect(output[2]).to.deep.equal(input[2])
return
}
throw new Error('did not restrict size')
Expand All @@ -59,4 +61,65 @@ describe('restrict size', () => {
)
expect(output).to.deep.equal(input)
})

it('should throw when unprocessed message queue size is too big', async () => {
const maxMessageSize = 32
const maxUnprocessedMessageQueueSize = 64

const input: Message[] = [
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
]

const output: Message[] = []

try {
await pipe(
input,
encode,
async function * (source) {
// make one big buffer
yield toBuffer(source)
},
decode(maxMessageSize, maxUnprocessedMessageQueueSize),
(source) => each(source, chunk => {
output.push(chunk)
}),
async (source) => await drain(source)
)
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG')
expect(output).to.have.length(0)
return
}
throw new Error('did not restrict size')
})

it('should throw when unprocessed message queue size is too big because of garbage', async () => {
const maxMessageSize = 32
const maxUnprocessedMessageQueueSize = 64
const input = randomBytes(maxUnprocessedMessageQueueSize + 1)
const output: Message[] = []

try {
await pipe(
[input],
decode(maxMessageSize, maxUnprocessedMessageQueueSize),
(source) => each(source, chunk => {
output.push(chunk)
}),
async (source) => await drain(source)
)
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG')
expect(output).to.have.length(0)
return
}
throw new Error('did not restrict size')
})
})