From c3fda66766caed2276d7b8596f8a9c8ae25706a5 Mon Sep 17 00:00:00 2001 From: Marcus Pousette Date: Mon, 15 May 2023 11:55:40 +0200 Subject: [PATCH] feat: restrict message size to 16kb --- package.json | 1 + src/stream.ts | 25 +++++++++++++++-- test/stream.spec.ts | 66 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 test/stream.spec.ts diff --git a/package.json b/package.json index a95a1b3..df2e476 100644 --- a/package.json +++ b/package.json @@ -158,6 +158,7 @@ "multiformats": "^11.0.2", "multihashes": "^4.0.3", "p-defer": "^4.0.0", + "p-event": "^5.0.1", "protons-runtime": "^5.0.0", "uint8arraylist": "^2.4.3", "uint8arrays": "^4.0.3" diff --git a/src/stream.ts b/src/stream.ts index cf35eed..701e2fe 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -4,6 +4,7 @@ import merge from 'it-merge' import { pipe } from 'it-pipe' import { pushable } from 'it-pushable' import defer, { type DeferredPromise } from 'p-defer' +import { pEvent } from 'p-event' import { Uint8ArrayList } from 'uint8arraylist' import { Message } from './pb/message.js' import type { Stream, StreamStat, Direction } from '@libp2p/interface-connection' @@ -151,6 +152,15 @@ class StreamState { } } +// Max message size that can be sent to the DataChannel +const MAX_MESSAGE_SIZE = 16 * 1024 + +// How much can be buffered to the DataChannel at once +const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024 + +// How long time we wait for the 'bufferedamountlow' event to be emitted +const BUFFERED_AMOUNT_LOW_TIMEOUT = 30 * 1000 + export class WebRTCStream implements Stream { /** * Unique identifier for a stream @@ -214,7 +224,6 @@ export class WebRTCStream implements Stream { this.channel = opts.channel this.channel.binaryType = 'arraybuffer' this.id = this.channel.label - this.stat = opts.stat switch (this.channel.readyState) { case 'open': @@ -313,10 +322,22 @@ export class WebRTCStream implements Stream { if (this.streamState.isWriteClosed()) { return } + + if (this.channel.bufferedAmount > MAX_BUFFERED_AMOUNT) { + await pEvent(this.channel, 'bufferedamountlow', { timeout: BUFFERED_AMOUNT_LOW_TIMEOUT }) + } + const msgbuf = Message.encode({ message: buf.subarray() }) const sendbuf = lengthPrefixed.encode.single(msgbuf) - this.channel.send(sendbuf.subarray()) + while (sendbuf.length > 0) { + if (sendbuf.length <= MAX_MESSAGE_SIZE) { + this.channel.send(sendbuf.subarray()) + break + } + this.channel.send(sendbuf.subarray(0, MAX_MESSAGE_SIZE)) + sendbuf.consume(MAX_MESSAGE_SIZE) + } } } diff --git a/test/stream.spec.ts b/test/stream.spec.ts new file mode 100644 index 0000000..b133a06 --- /dev/null +++ b/test/stream.spec.ts @@ -0,0 +1,66 @@ + +import { expect } from 'aegir/chai' +import * as lengthPrefixed from 'it-length-prefixed' +import { pushable } from 'it-pushable' +import { Message } from '../src/pb/message' +import * as underTest from '../src/stream' + +const setup = (cb: { send: (bytes: Uint8Array) => void }): underTest.WebRTCStream => { + const datachannel = { + readyState: 'open', + send: cb.send, + close: () => {} + } + return new underTest.WebRTCStream({ channel: datachannel as RTCDataChannel, stat: underTest.defaultStat('outbound') }) +} + +const MAX_MESSAGE_SIZE = 16 * 1024 + +describe('Max message size', () => { + it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => { + const sent: Uint8Array[] = [] + const data = new Uint8Array(MAX_MESSAGE_SIZE - 5) + const p = pushable() + + // Make sure that the data that ought to be sent will result in a message with exactl MAX_MESSAGE_SIZE + const messageLengthEncoded = lengthPrefixed.encode.single(Message.encode({ message: data })).subarray() + expect(messageLengthEncoded.length).eq(MAX_MESSAGE_SIZE) + const webrtcStream = setup({ + send: (bytes) => { + sent.push(bytes) + if (p.readableLength === 0) { + webrtcStream.close() + } + } + }) + p.push(data) + p.end() + await webrtcStream.sink(p) + expect(sent).to.deep.equals([messageLengthEncoded]) + }) + + it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => { + const sent: Uint8Array[] = [] + const data = new Uint8Array(MAX_MESSAGE_SIZE - 4) + const p = pushable() + + // Make sure that the data that ought to be sent will result in a message with exactl MAX_MESSAGE_SIZE + const messageLengthEncoded = lengthPrefixed.encode.single(Message.encode({ message: data })).subarray() + expect(messageLengthEncoded.length).eq(MAX_MESSAGE_SIZE + 1) + + const webrtcStream = setup({ + send: (bytes) => { + sent.push(bytes) + if (p.readableLength === 0) { + webrtcStream.close() + } + } + }) + p.push(data) + p.end() + await webrtcStream.sink(p) + + // Message is sent in two parts + expect(sent).to.deep.equals([messageLengthEncoded.subarray(0, messageLengthEncoded.length - 1), messageLengthEncoded.subarray(messageLengthEncoded.length - 1)]) + }) +})