Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
fix: restrict message sizes to 16kb (#147)
Browse files Browse the repository at this point in the history
If the datachannel buffer grows to larger than 256kb it will close if you are using Chrome, so if it grows too large, wait for the `bufferedamountlow` event before continuing to send data.

Also split data into 16kb while sending to ensure maximum cross browser compatibility.

Fixes #144 
Fixes #158

---------

Co-authored-by: Alex Potsides <alex@achingbrain.net>
  • Loading branch information
marcus-pousette and achingbrain authored May 17, 2023
1 parent efe5ce7 commit aca4422
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 48 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
"multiformats": "^11.0.2",
"multihashes": "^4.0.3",
"p-defer": "^4.0.0",
"p-event": "^5.0.1",
"protons-runtime": "^5.0.0",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.3"
Expand Down
25 changes: 21 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
import { WebRTCTransport } from './private-to-private/transport.js'
import { WebRTCDirectTransport, type WebRTCDirectTransportComponents } from './private-to-public/transport.js'
import { WebRTCDirectTransport, type WebRTCTransportDirectInit, type WebRTCDirectTransportComponents } from './private-to-public/transport.js'
import type { WebRTCTransportComponents, WebRTCTransportInit } from './private-to-private/transport.js'
import type { Transport } from '@libp2p/interface-transport'

function webRTCDirect (): (components: WebRTCDirectTransportComponents) => Transport {
return (components: WebRTCDirectTransportComponents) => new WebRTCDirectTransport(components)
/**
* @param {WebRTCTransportDirectInit} init - WebRTC direct transport configuration
* @param init.dataChannel - DataChannel configurations
* @param {number} init.dataChannel.maxMessageSize - Max message size that can be sent through the DataChannel. Larger messages will be chunked into smaller messages below this size (default 16kb)
* @param {number} init.dataChannel.maxBufferedAmount - Max buffered amount a DataChannel can have (default 16mb)
* @param {number} init.dataChannel.bufferedAmountLowEventTimeout - If max buffered amount is reached, this is the max time that is waited before the buffer is cleared (default 30 seconds)
* @returns
*/
function webRTCDirect (init?: WebRTCTransportDirectInit): (components: WebRTCDirectTransportComponents) => Transport {
return (components: WebRTCDirectTransportComponents) => new WebRTCDirectTransport(components, init)
}

/**
* @param {WebRTCTransportInit} init - WebRTC transport configuration
* @param {RTCConfiguration} init.rtcConfiguration - RTCConfiguration
* @param init.dataChannel - DataChannel configurations
* @param {number} init.dataChannel.maxMessageSize - Max message size that can be sent through the DataChannel. Larger messages will be chunked into smaller messages below this size (default 16kb)
* @param {number} init.dataChannel.maxBufferedAmount - Max buffered amount a DataChannel can have (default 16mb)
* @param {number} init.dataChannel.bufferedAmountLowEventTimeout - If max buffered amount is reached, this is the max time that is waited before the buffer is cleared (default 30 seconds)
* @returns
*/
function webRTC (init?: WebRTCTransportInit): (components: WebRTCTransportComponents) => Transport {
return (components: WebRTCTransportComponents) => new WebRTCTransport(components, init ?? {})
return (components: WebRTCTransportComponents) => new WebRTCTransport(components, init)
}

export { webRTC, webRTCDirect }
54 changes: 25 additions & 29 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WebRTCStream } from './stream.js'
import { type DataChannelOpts, WebRTCStream } from './stream.js'
import { nopSink, nopSource } from './util.js'
import type { Stream } from '@libp2p/interface-connection'
import type { CounterGroup } from '@libp2p/interface-metrics'
Expand All @@ -7,56 +7,55 @@ import type { Source, Sink } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface DataChannelMuxerFactoryInit {
/**
* WebRTC Peer Connection
*/
peerConnection: RTCPeerConnection

/**
* Optional metrics for this data channel muxer
*/
metrics?: CounterGroup

/**
* Data channel options
*/
dataChannelOptions?: Partial<DataChannelOpts>
}

export class DataChannelMuxerFactory implements StreamMuxerFactory {
/**
* WebRTC Peer Connection
*/
private readonly peerConnection: RTCPeerConnection
private streamBuffer: WebRTCStream[] = []
private readonly metrics?: CounterGroup

constructor (peerConnection: RTCPeerConnection, metrics?: CounterGroup, readonly protocol = '/webrtc') {
this.peerConnection = peerConnection
constructor (readonly init: DataChannelMuxerFactoryInit, readonly protocol = '/webrtc') {
// store any datachannels opened before upgrade has been completed
this.peerConnection.ondatachannel = ({ channel }) => {
this.init.peerConnection.ondatachannel = ({ channel }) => {
const stream = new WebRTCStream({
channel,
stat: {
direction: 'inbound',
timeline: { open: 0 }
},
dataChannelOptions: init.dataChannelOptions,
closeCb: (_stream) => {
this.streamBuffer = this.streamBuffer.filter(s => !_stream.eq(s))
}
})
this.streamBuffer.push(stream)
}
this.metrics = metrics
}

createStreamMuxer (init?: StreamMuxerInit | undefined): StreamMuxer {
return new DataChannelMuxer(this.peerConnection, this.streamBuffer, this.protocol, init, this.metrics)
return new DataChannelMuxer(this.init, this.streamBuffer, this.protocol, init)
}
}

/**
* A libp2p data channel stream muxer
*/
export class DataChannelMuxer implements StreamMuxer {
/**
* WebRTC Peer Connection
*/
private readonly peerConnection: RTCPeerConnection

/**
* Optional metrics for this data channel muxer
*/
private readonly metrics?: CounterGroup

/**
* Array of streams in the data channel
*/
Expand All @@ -82,24 +81,19 @@ export class DataChannelMuxer implements StreamMuxer {
*/
sink: Sink<Source<Uint8Array | Uint8ArrayList>, Promise<void>> = nopSink

constructor (peerConnection: RTCPeerConnection, streams: Stream[], readonly protocol: string = '/webrtc', init?: StreamMuxerInit, metrics?: CounterGroup) {
constructor (readonly dataChannelMuxer: DataChannelMuxerFactoryInit, streams: Stream[], readonly protocol: string = '/webrtc', init?: StreamMuxerInit) {
/**
* Initialized stream muxer
*/
this.init = init

/**
* WebRTC Peer Connection
*/
this.peerConnection = peerConnection

/**
* Fired when a data channel has been added to the connection has been
* added by the remote peer.
*
* {@link https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/datachannel_event}
*/
this.peerConnection.ondatachannel = ({ channel }) => {
this.dataChannelMuxer.peerConnection.ondatachannel = ({ channel }) => {
const stream = new WebRTCStream({
channel,
stat: {
Expand All @@ -108,12 +102,13 @@ export class DataChannelMuxer implements StreamMuxer {
open: 0
}
},
dataChannelOptions: dataChannelMuxer.dataChannelOptions,
closeCb: this.wrapStreamEnd(init?.onIncomingStream)
})

this.streams.push(stream)
if ((init?.onIncomingStream) != null) {
this.metrics?.increment({ incoming_stream: true })
this.dataChannelMuxer.metrics?.increment({ incoming_stream: true })
init.onIncomingStream(stream)
}
}
Expand All @@ -133,9 +128,9 @@ export class DataChannelMuxer implements StreamMuxer {

newStream (): Stream {
// The spec says the label SHOULD be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label
const channel = this.peerConnection.createDataChannel('')
const channel = this.dataChannelMuxer.peerConnection.createDataChannel('')
const closeCb = (stream: Stream): void => {
this.metrics?.increment({ stream_end: true })
this.dataChannelMuxer.metrics?.increment({ stream_end: true })
this.init?.onStreamEnd?.(stream)
}
const stream = new WebRTCStream({
Expand All @@ -146,10 +141,11 @@ export class DataChannelMuxer implements StreamMuxer {
open: 0
}
},
dataChannelOptions: this.dataChannelMuxer.dataChannelOptions,
closeCb: this.wrapStreamEnd(closeCb)
})
this.streams.push(stream)
this.metrics?.increment({ outgoing_stream: true })
this.dataChannelMuxer.metrics?.increment({ outgoing_stream: true })

return stream
}
Expand Down
13 changes: 7 additions & 6 deletions src/private-to-private/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import pDefer, { type DeferredPromise } from 'p-defer'
import { DataChannelMuxerFactory } from '../muxer.js'
import { Message } from './pb/message.js'
import { readCandidatesUntilConnected, resolveOnConnected } from './util.js'
import type { DataChannelOpts } from '../stream.js'
import type { Stream } from '@libp2p/interface-connection'
import type { IncomingStreamData } from '@libp2p/interface-registrar'
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer'
Expand All @@ -13,14 +14,13 @@ const DEFAULT_TIMEOUT = 30 * 1000

const log = logger('libp2p:webrtc:peer')

export type IncomingStreamOpts = { rtcConfiguration?: RTCConfiguration } & IncomingStreamData
export type IncomingStreamOpts = { rtcConfiguration?: RTCConfiguration, dataChannelOptions?: Partial<DataChannelOpts> } & IncomingStreamData

export async function handleIncomingStream ({ rtcConfiguration, stream: rawStream }: IncomingStreamOpts): Promise<{ pc: RTCPeerConnection, muxerFactory: StreamMuxerFactory, remoteAddress: string }> {
export async function handleIncomingStream ({ rtcConfiguration, dataChannelOptions, stream: rawStream }: IncomingStreamOpts): Promise<{ pc: RTCPeerConnection, muxerFactory: StreamMuxerFactory, remoteAddress: string }> {
const signal = AbortSignal.timeout(DEFAULT_TIMEOUT)
const stream = pbStream(abortableDuplex(rawStream, signal)).pb(Message)
const pc = new RTCPeerConnection(rtcConfiguration)
const muxerFactory = new DataChannelMuxerFactory(pc)

const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions })
const connectedPromise: DeferredPromise<void> = pDefer()
const answerSentPromise: DeferredPromise<void> = pDefer()

Expand Down Expand Up @@ -86,13 +86,14 @@ export interface ConnectOptions {
stream: Stream
signal: AbortSignal
rtcConfiguration?: RTCConfiguration
dataChannelOptions?: Partial<DataChannelOpts>
}

export async function initiateConnection ({ rtcConfiguration, signal, stream: rawStream }: ConnectOptions): Promise<{ pc: RTCPeerConnection, muxerFactory: StreamMuxerFactory, remoteAddress: string }> {
export async function initiateConnection ({ rtcConfiguration, dataChannelOptions, signal, stream: rawStream }: ConnectOptions): Promise<{ pc: RTCPeerConnection, muxerFactory: StreamMuxerFactory, remoteAddress: string }> {
const stream = pbStream(abortableDuplex(rawStream, signal)).pb(Message)
// setup peer connection
const pc = new RTCPeerConnection(rtcConfiguration)
const muxerFactory = new DataChannelMuxerFactory(pc)
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions })

const connectedPromise: DeferredPromise<void> = pDefer()
resolveOnConnected(pc, connectedPromise)
Expand Down
8 changes: 6 additions & 2 deletions src/private-to-private/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { codes } from '../error.js'
import { WebRTCMultiaddrConnection } from '../maconn.js'
import { initiateConnection, handleIncomingStream } from './handler.js'
import { WebRTCPeerListener } from './listener.js'
import type { DataChannelOpts } from '../stream.js'
import type { Connection } from '@libp2p/interface-connection'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar'
Expand All @@ -21,6 +22,7 @@ const WEBRTC_CODE = protocols('webrtc').code

export interface WebRTCTransportInit {
rtcConfiguration?: RTCConfiguration
dataChannel?: Partial<DataChannelOpts>
}

export interface WebRTCTransportComponents {
Expand All @@ -35,7 +37,7 @@ export class WebRTCTransport implements Transport, Startable {

constructor (
private readonly components: WebRTCTransportComponents,
private readonly init: WebRTCTransportInit
private readonly init: WebRTCTransportInit = {}
) {
}

Expand Down Expand Up @@ -123,6 +125,7 @@ export class WebRTCTransport implements Transport, Startable {
const { pc, muxerFactory, remoteAddress } = await initiateConnection({
stream: signalingStream,
rtcConfiguration: this.init.rtcConfiguration,
dataChannelOptions: this.init.dataChannel,
signal: options.signal
})

Expand Down Expand Up @@ -154,7 +157,8 @@ export class WebRTCTransport implements Transport, Startable {
const { pc, muxerFactory, remoteAddress } = await handleIncomingStream({
rtcConfiguration: this.init.rtcConfiguration,
connection,
stream
stream,
dataChannelOptions: this.init.dataChannel
})

await this.components.upgrader.upgradeInbound(new WebRTCMultiaddrConnection({
Expand Down
15 changes: 10 additions & 5 deletions src/private-to-public/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { fromString as uint8arrayFromString } from 'uint8arrays/from-string'
import { dataChannelError, inappropriateMultiaddr, unimplemented, invalidArgument } from '../error.js'
import { WebRTCMultiaddrConnection } from '../maconn.js'
import { DataChannelMuxerFactory } from '../muxer.js'
import { WebRTCStream } from '../stream.js'
import { type DataChannelOpts, WebRTCStream } from '../stream.js'
import { isFirefox } from '../util.js'
import * as sdp from './sdp.js'
import { genUfrag } from './util.js'
Expand Down Expand Up @@ -52,12 +52,17 @@ export interface WebRTCMetrics {
dialerEvents: CounterGroup
}

export interface WebRTCTransportDirectInit {
dataChannel?: Partial<DataChannelOpts>
}

export class WebRTCDirectTransport implements Transport {
private readonly metrics?: WebRTCMetrics
private readonly components: WebRTCDirectTransportComponents

constructor (components: WebRTCDirectTransportComponents) {
private readonly init: WebRTCTransportDirectInit
constructor (components: WebRTCDirectTransportComponents, init: WebRTCTransportDirectInit = {}) {
this.components = components
this.init = init
if (components.metrics != null) {
this.metrics = {
dialerEvents: components.metrics.registerCounterGroup('libp2p_webrtc_dialer_events_total', {
Expand Down Expand Up @@ -185,7 +190,7 @@ export class WebRTCDirectTransport implements Transport {
// we pass in undefined for these parameters.
const noise = Noise({ prologueBytes: fingerprintsPrologue })()

const wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: { direction: 'inbound', timeline: { open: 1 } } })
const wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: { direction: 'inbound', timeline: { open: 1 } }, dataChannelOptions: this.init.dataChannel })
const wrappedDuplex = {
...wrappedChannel,
sink: wrappedChannel.sink.bind(wrappedChannel),
Expand Down Expand Up @@ -231,7 +236,7 @@ export class WebRTCDirectTransport implements Transport {
// Track opened peer connection
this.metrics?.dialerEvents.increment({ peer_connection: true })

const muxerFactory = new DataChannelMuxerFactory(peerConnection, this.metrics?.dialerEvents)
const muxerFactory = new DataChannelMuxerFactory({ peerConnection, metrics: this.metrics?.dialerEvents, dataChannelOptions: this.init.dataChannel })

// For outbound connections, the remote is expected to start the noise handshake.
// Therefore, we need to secure an inbound noise connection from the remote.
Expand Down
Loading

0 comments on commit aca4422

Please sign in to comment.