Skip to content

Commit d25d951

Browse files
authoredNov 6, 2023
fix: delay notification of early WebRTC stream creation (#2206)
The datachannel muxer is created during set up of the `Connection` object. If we notify of early stream creation before the `Connection` object is properly configured, the early streams will be lost. This can happen when the remote opens a data channel before the local node has finished setting up it's end of the connection. The fix is to notify asynchronously which gives the upgrader enough time to finish setting up the `Connection`.
1 parent dfbe0cc commit d25d951

File tree

3 files changed

+72
-13
lines changed

3 files changed

+72
-13
lines changed
 

‎packages/transport-webrtc/src/muxer.ts

+18-8
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,24 @@ export class DataChannelMuxer implements StreamMuxer {
137137
init?.onIncomingStream?.(stream)
138138
}
139139

140-
this.init.streams.forEach(bufferedStream => {
141-
bufferedStream.onEnd = () => {
142-
this.#onStreamEnd(bufferedStream.stream, bufferedStream.channel)
143-
}
144-
145-
this.metrics?.increment({ incoming_stream: true })
146-
this.init?.onIncomingStream?.(bufferedStream.stream)
147-
})
140+
// the DataChannelMuxer constructor is called during set up of the
141+
// connection by the upgrader.
142+
//
143+
// If we invoke `init.onIncomingStream` immediately, the connection object
144+
// will not be set up yet so add a tiny delay before letting the
145+
// connection know about early streams
146+
if (this.init.streams.length > 0) {
147+
queueMicrotask(() => {
148+
this.init.streams.forEach(bufferedStream => {
149+
bufferedStream.onEnd = () => {
150+
this.#onStreamEnd(bufferedStream.stream, bufferedStream.channel)
151+
}
152+
153+
this.metrics?.increment({ incoming_stream: true })
154+
this.init?.onIncomingStream?.(bufferedStream.stream)
155+
})
156+
})
157+
}
148158
}
149159

150160
#onStreamEnd (stream: Stream, channel: RTCDataChannel): void {

‎packages/transport-webrtc/src/private-to-private/transport.ts

+10-5
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ export class WebRTCTransport implements Transport, Startable {
121121
log.trace('dialing address: %a', ma)
122122

123123
const peerConnection = new RTCPeerConnection(this.init.rtcConfiguration)
124+
const muxerFactory = new DataChannelMuxerFactory({
125+
peerConnection,
126+
dataChannelOptions: this.init.dataChannel
127+
})
124128

125129
const { remoteAddress } = await initiateConnection({
126130
peerConnection,
@@ -141,10 +145,7 @@ export class WebRTCTransport implements Transport, Startable {
141145
const connection = await options.upgrader.upgradeOutbound(webRTCConn, {
142146
skipProtection: true,
143147
skipEncryption: true,
144-
muxerFactory: new DataChannelMuxerFactory({
145-
peerConnection,
146-
dataChannelOptions: this.init.dataChannel
147-
})
148+
muxerFactory
148149
})
149150

150151
// close the connection on shut down
@@ -156,6 +157,10 @@ export class WebRTCTransport implements Transport, Startable {
156157
async _onProtocol ({ connection, stream }: IncomingStreamData): Promise<void> {
157158
const signal = AbortSignal.timeout(this.init.inboundConnectionTimeout ?? INBOUND_CONNECTION_TIMEOUT)
158159
const peerConnection = new RTCPeerConnection(this.init.rtcConfiguration)
160+
const muxerFactory = new DataChannelMuxerFactory({
161+
peerConnection,
162+
dataChannelOptions: this.init.dataChannel
163+
})
159164

160165
try {
161166
const { remoteAddress } = await handleIncomingStream({
@@ -178,7 +183,7 @@ export class WebRTCTransport implements Transport, Startable {
178183
await this.components.upgrader.upgradeInbound(webRTCConn, {
179184
skipEncryption: true,
180185
skipProtection: true,
181-
muxerFactory: new DataChannelMuxerFactory({ peerConnection, dataChannelOptions: this.init.dataChannel })
186+
muxerFactory
182187
})
183188

184189
// close the stream if SDP messages have been exchanged successfully
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/* eslint-disable @typescript-eslint/no-unused-expressions */
2+
3+
import { expect } from 'aegir/chai'
4+
import pRetry from 'p-retry'
5+
import { stubInterface } from 'sinon-ts'
6+
import { DataChannelMuxerFactory } from '../src/muxer.js'
7+
8+
describe('muxer', () => {
9+
it('should delay notification of early streams', async () => {
10+
let onIncomingStreamInvoked = false
11+
12+
// @ts-expect-error incomplete implementation
13+
const peerConnection: RTCPeerConnection = {}
14+
15+
const muxerFactory = new DataChannelMuxerFactory({
16+
peerConnection
17+
})
18+
19+
// simulate early connection
20+
// @ts-expect-error incomplete implementation
21+
const event: RTCDataChannelEvent = {
22+
channel: stubInterface<RTCDataChannel>({
23+
readyState: 'connecting'
24+
})
25+
}
26+
peerConnection.ondatachannel?.(event)
27+
28+
muxerFactory.createStreamMuxer({
29+
onIncomingStream: () => {
30+
onIncomingStreamInvoked = true
31+
}
32+
})
33+
34+
expect(onIncomingStreamInvoked).to.be.false()
35+
36+
await pRetry(() => {
37+
if (!onIncomingStreamInvoked) {
38+
throw new Error('onIncomingStreamInvoked was still false')
39+
}
40+
})
41+
42+
expect(onIncomingStreamInvoked).to.be.true()
43+
})
44+
})

0 commit comments

Comments
 (0)
Please sign in to comment.