Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
feat: restrict message size to 16kb
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed May 15, 2023
1 parent f0f4e7c commit c3fda66
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 2 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
"multiformats": "^11.0.2",
"multihashes": "^4.0.3",
"p-defer": "^4.0.0",
"p-event": "^5.0.1",
"protons-runtime": "^5.0.0",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.3"
Expand Down
25 changes: 23 additions & 2 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import merge from 'it-merge'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
import defer, { type DeferredPromise } from 'p-defer'
import { pEvent } from 'p-event'
import { Uint8ArrayList } from 'uint8arraylist'
import { Message } from './pb/message.js'
import type { Stream, StreamStat, Direction } from '@libp2p/interface-connection'
Expand Down Expand Up @@ -151,6 +152,15 @@ class StreamState {
}
}

// Max message size that can be sent to the DataChannel
const MAX_MESSAGE_SIZE = 16 * 1024

// How much can be buffered to the DataChannel at once
const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024

// How long time we wait for the 'bufferedamountlow' event to be emitted
const BUFFERED_AMOUNT_LOW_TIMEOUT = 30 * 1000

export class WebRTCStream implements Stream {
/**
* Unique identifier for a stream
Expand Down Expand Up @@ -214,7 +224,6 @@ export class WebRTCStream implements Stream {
this.channel = opts.channel
this.channel.binaryType = 'arraybuffer'
this.id = this.channel.label

this.stat = opts.stat
switch (this.channel.readyState) {
case 'open':
Expand Down Expand Up @@ -313,10 +322,22 @@ export class WebRTCStream implements Stream {
if (this.streamState.isWriteClosed()) {
return
}

if (this.channel.bufferedAmount > MAX_BUFFERED_AMOUNT) {
await pEvent(this.channel, 'bufferedamountlow', { timeout: BUFFERED_AMOUNT_LOW_TIMEOUT })
}

const msgbuf = Message.encode({ message: buf.subarray() })
const sendbuf = lengthPrefixed.encode.single(msgbuf)

this.channel.send(sendbuf.subarray())
while (sendbuf.length > 0) {
if (sendbuf.length <= MAX_MESSAGE_SIZE) {
this.channel.send(sendbuf.subarray())
break
}
this.channel.send(sendbuf.subarray(0, MAX_MESSAGE_SIZE))
sendbuf.consume(MAX_MESSAGE_SIZE)
}
}
}

Expand Down
66 changes: 66 additions & 0 deletions test/stream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@

import { expect } from 'aegir/chai'
import * as lengthPrefixed from 'it-length-prefixed'
import { pushable } from 'it-pushable'
import { Message } from '../src/pb/message'
import * as underTest from '../src/stream'

const setup = (cb: { send: (bytes: Uint8Array) => void }): underTest.WebRTCStream => {
const datachannel = {
readyState: 'open',
send: cb.send,
close: () => {}
}
return new underTest.WebRTCStream({ channel: datachannel as RTCDataChannel, stat: underTest.defaultStat('outbound') })
}

const MAX_MESSAGE_SIZE = 16 * 1024

describe('Max message size', () => {
it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => {
const sent: Uint8Array[] = []
const data = new Uint8Array(MAX_MESSAGE_SIZE - 5)
const p = pushable()

// Make sure that the data that ought to be sent will result in a message with exactl MAX_MESSAGE_SIZE
const messageLengthEncoded = lengthPrefixed.encode.single(Message.encode({ message: data })).subarray()
expect(messageLengthEncoded.length).eq(MAX_MESSAGE_SIZE)
const webrtcStream = setup({
send: (bytes) => {
sent.push(bytes)
if (p.readableLength === 0) {
webrtcStream.close()
}
}
})
p.push(data)
p.end()
await webrtcStream.sink(p)
expect(sent).to.deep.equals([messageLengthEncoded])
})

it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => {
const sent: Uint8Array[] = []
const data = new Uint8Array(MAX_MESSAGE_SIZE - 4)
const p = pushable()

// Make sure that the data that ought to be sent will result in a message with exactl MAX_MESSAGE_SIZE
const messageLengthEncoded = lengthPrefixed.encode.single(Message.encode({ message: data })).subarray()
expect(messageLengthEncoded.length).eq(MAX_MESSAGE_SIZE + 1)

const webrtcStream = setup({
send: (bytes) => {
sent.push(bytes)
if (p.readableLength === 0) {
webrtcStream.close()
}
}
})
p.push(data)
p.end()
await webrtcStream.sink(p)

// Message is sent in two parts
expect(sent).to.deep.equals([messageLengthEncoded.subarray(0, messageLengthEncoded.length - 1), messageLengthEncoded.subarray(messageLengthEncoded.length - 1)])
})
})

0 comments on commit c3fda66

Please sign in to comment.