Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
feat: restrict message sizes to 16kb
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed May 10, 2023
1 parent 4f1840e commit f15207e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 5 deletions.
79 changes: 74 additions & 5 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,70 @@ class StreamState {
}
}

interface MessageProcessor {
send: (bytes: Uint8ArrayList) => void
close?: () => void
}

const MAX_MESSAGE_SIZE = 16 * 1024
const createMessageProcessor = (channel: RTCDataChannel, maxMsgSize: number = MAX_MESSAGE_SIZE): MessageProcessor => {
if (maxMsgSize != null) {
/**
* Don't allow channel.bufferedAmount to exceed maxMsgSize
*/
let sendPaused: boolean = false
let sendMessageQueue: Uint8Array[] = []
const processMessageQueue = (): void => {
sendPaused = false
let message = sendMessageQueue.shift()
while (message != null) {
if (channel.bufferedAmount > maxMsgSize) {
sendPaused = true
sendMessageQueue.unshift(message)

const listener = (): void => {
channel.removeEventListener('bufferedamountlow', listener)
processMessageQueue()
}

channel.addEventListener('bufferedamountlow', listener)
return
}
channel.send(message)
message = sendMessageQueue.shift()
}
}

return {
send: (sendbuf: Uint8ArrayList) => {
/**
* Don't allow individual messages to exceed maxMsgSize
*/
let from = 0
let to = Math.min(sendbuf.length, maxMsgSize)
while (to !== from) {
sendMessageQueue.push(sendbuf.subarray(from, to))
from = to
to = Math.min(to + maxMsgSize, sendbuf.length)
}
if (sendPaused) {
return
}

processMessageQueue()
},

close: () => {
sendMessageQueue = []
}
}
} else {
return {
send: (sendbuf: Uint8ArrayList) => { channel.send(sendbuf.subarray()) }
}
}
}

export class WebRTCStream implements Stream {
/**
* Unique identifier for a stream
Expand Down Expand Up @@ -210,11 +274,15 @@ export class WebRTCStream implements Stream {
*/
closeCb?: (stream: WebRTCStream) => void

/**
* Processor for messages that allows throttling if necessary
*/
messageProcessor: MessageProcessor

constructor (opts: StreamInitOpts) {
this.channel = opts.channel
this.channel.binaryType = 'arraybuffer'
this.id = this.channel.label

this.stat = opts.stat
switch (this.channel.readyState) {
case 'open':
Expand Down Expand Up @@ -254,6 +322,8 @@ export class WebRTCStream implements Stream {
this.abort(err)
}

this.messageProcessor = createMessageProcessor(this.channel)

const self = this

// reader pipe
Expand Down Expand Up @@ -311,12 +381,11 @@ export class WebRTCStream implements Stream {
const closeWrite = this._closeWriteIterable()
for await (const buf of merge(closeWrite, src)) {
if (this.streamState.isWriteClosed()) {
this.messageProcessor.close?.()
return
}
const msgbuf = pb.Message.toBinary({ message: buf.subarray() })
const sendbuf = lengthPrefixed.encode.single(msgbuf)

this.channel.send(sendbuf.subarray())
this.messageProcessor.send(lengthPrefixed.encode.single(msgbuf))
}
}

Expand Down Expand Up @@ -435,7 +504,7 @@ export class WebRTCStream implements Stream {
try {
log.trace('Sending flag: %s', flag.toString())
const msgbuf = pb.Message.toBinary({ flag })
this.channel.send(lengthPrefixed.encode.single(msgbuf).subarray())
this.messageProcessor.send(lengthPrefixed.encode.single(msgbuf))
} catch (err) {
if (err instanceof Error) {
log.error(`Exception while sending flag ${flag}: ${err.message}`)
Expand Down
30 changes: 30 additions & 0 deletions test/stream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@

import { expect } from 'aegir/chai'
import { Uint8ArrayList } from 'uint8arraylist'
import * as underTest from '../src/stream'

const setup = (cb: { send: (bytes: Uint8Array) => void }): underTest.WebRTCStream => {
const datachannel = {
readyState: 'open',
send: cb.send

}
return new underTest.WebRTCStream({ channel: datachannel as RTCDataChannel, stat: underTest.defaultStat('outbound') })
}

const MAX_MESSAGE_SIZE = 16 * 1024
describe('MessageProcessor', () => {
it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, () => {
const sent: Uint8Array[] = []
const webrtcStream = setup({ send: (bytes) => sent.push(bytes) })
webrtcStream.messageProcessor.send(new Uint8ArrayList(new Uint8Array(MAX_MESSAGE_SIZE)))
expect(sent).to.deep.equals([new Uint8Array(MAX_MESSAGE_SIZE)])
})

it(`sends messages large than ${MAX_MESSAGE_SIZE} bytes in parts`, () => {
const sent: Uint8Array[] = []
const webrtcStream = setup({ send: (bytes) => sent.push(bytes) })
webrtcStream.messageProcessor.send(new Uint8ArrayList(new Uint8Array(MAX_MESSAGE_SIZE + 1)))
expect(sent).to.deep.equals([new Uint8Array(MAX_MESSAGE_SIZE), new Uint8Array(1)])
})
})

0 comments on commit f15207e

Please sign in to comment.