Skip to content

Commit 7d8b155

Browse files
achingbrainmaschad
andcommitted
fix: close webrtc streams without data loss (#2073)
- Gracefully close streams on muxer shutdown - Refactor initiator/recipient flows for clarity - Wait for `bufferedAmount` to be `0` before closing a datachannel - Close datachannels on both initiator and recipient - Implements FIN_ACK for closing datachannels without data loss Supersedes #2048 --------- Co-authored-by: Chad Nehemiah <chad.nehemiah94@gmail.com>
1 parent 980857c commit 7d8b155

30 files changed

+1548
-498
lines changed

packages/interface/package.json

+4
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,15 @@
164164
"it-stream-types": "^2.0.1",
165165
"multiformats": "^12.0.1",
166166
"p-defer": "^4.0.0",
167+
"race-signal": "^1.0.0",
167168
"uint8arraylist": "^2.4.3"
168169
},
169170
"devDependencies": {
170171
"@types/sinon": "^10.0.15",
171172
"aegir": "^40.0.8",
173+
"delay": "^6.0.0",
174+
"it-all": "^3.0.3",
175+
"it-drain": "^3.0.3",
172176
"sinon": "^16.0.0",
173177
"sinon-ts": "^1.0.0"
174178
}

packages/interface/src/stream-muxer/stream.ts

+45-23
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import { abortableSource } from 'abortable-iterator'
22
import { type Pushable, pushable } from 'it-pushable'
33
import defer, { type DeferredPromise } from 'p-defer'
4+
import { raceSignal } from 'race-signal'
45
import { Uint8ArrayList } from 'uint8arraylist'
56
import { CodeError } from '../errors.js'
67
import type { Direction, ReadStatus, Stream, StreamStatus, StreamTimeline, WriteStatus } from '../connection/index.js'
78
import type { AbortOptions } from '../index.js'
89
import type { Source } from 'it-stream-types'
910

11+
// copied from @libp2p/logger to break a circular dependency
1012
interface Logger {
1113
(formatter: any, ...args: any[]): void
1214
error: (formatter: any, ...args: any[]) => void
@@ -16,6 +18,7 @@ interface Logger {
1618

1719
const ERR_STREAM_RESET = 'ERR_STREAM_RESET'
1820
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE'
21+
const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000
1922

2023
export interface AbstractStreamInit {
2124
/**
@@ -68,6 +71,12 @@ export interface AbstractStreamInit {
6871
* connection when closing the writable end of the stream. (default: 500)
6972
*/
7073
closeTimeout?: number
74+
75+
/**
76+
* After the stream sink has closed, a limit on how long it takes to send
77+
* a close-write message to the remote peer.
78+
*/
79+
sendCloseWriteTimeout?: number
7180
}
7281

7382
function isPromise (res?: any): res is Promise<void> {
@@ -94,6 +103,7 @@ export abstract class AbstractStream implements Stream {
94103
private readonly onCloseWrite?: () => void
95104
private readonly onReset?: () => void
96105
private readonly onAbort?: (err: Error) => void
106+
private readonly sendCloseWriteTimeout: number
97107

98108
protected readonly log: Logger
99109

@@ -113,6 +123,7 @@ export abstract class AbstractStream implements Stream {
113123
this.timeline = {
114124
open: Date.now()
115125
}
126+
this.sendCloseWriteTimeout = init.sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT
116127

117128
this.onEnd = init.onEnd
118129
this.onCloseRead = init?.onCloseRead
@@ -128,7 +139,6 @@ export abstract class AbstractStream implements Stream {
128139
this.log.trace('source ended')
129140
}
130141

131-
this.readStatus = 'closed'
132142
this.onSourceEnd(err)
133143
}
134144
})
@@ -173,11 +183,19 @@ export abstract class AbstractStream implements Stream {
173183
}
174184
}
175185

176-
this.log.trace('sink finished reading from source')
177-
this.writeStatus = 'done'
186+
this.log.trace('sink finished reading from source, write status is "%s"', this.writeStatus)
187+
188+
if (this.writeStatus === 'writing') {
189+
this.writeStatus = 'closing'
190+
191+
this.log.trace('send close write to remote')
192+
await this.sendCloseWrite({
193+
signal: AbortSignal.timeout(this.sendCloseWriteTimeout)
194+
})
195+
196+
this.writeStatus = 'closed'
197+
}
178198

179-
this.log.trace('sink calling closeWrite')
180-
await this.closeWrite(options)
181199
this.onSinkEnd()
182200
} catch (err: any) {
183201
this.log.trace('sink ended with error, calling abort with error', err)
@@ -196,6 +214,7 @@ export abstract class AbstractStream implements Stream {
196214
}
197215

198216
this.timeline.closeRead = Date.now()
217+
this.readStatus = 'closed'
199218

200219
if (err != null && this.endErr == null) {
201220
this.endErr = err
@@ -207,6 +226,10 @@ export abstract class AbstractStream implements Stream {
207226
this.log.trace('source and sink ended')
208227
this.timeline.close = Date.now()
209228

229+
if (this.status !== 'aborted' && this.status !== 'reset') {
230+
this.status = 'closed'
231+
}
232+
210233
if (this.onEnd != null) {
211234
this.onEnd(this.endErr)
212235
}
@@ -221,6 +244,7 @@ export abstract class AbstractStream implements Stream {
221244
}
222245

223246
this.timeline.closeWrite = Date.now()
247+
this.writeStatus = 'closed'
224248

225249
if (err != null && this.endErr == null) {
226250
this.endErr = err
@@ -232,6 +256,10 @@ export abstract class AbstractStream implements Stream {
232256
this.log.trace('sink and source ended')
233257
this.timeline.close = Date.now()
234258

259+
if (this.status !== 'aborted' && this.status !== 'reset') {
260+
this.status = 'closed'
261+
}
262+
235263
if (this.onEnd != null) {
236264
this.onEnd(this.endErr)
237265
}
@@ -266,16 +294,16 @@ export abstract class AbstractStream implements Stream {
266294
const readStatus = this.readStatus
267295
this.readStatus = 'closing'
268296

269-
if (readStatus === 'ready') {
270-
this.log.trace('ending internal source queue')
271-
this.streamSource.end()
272-
}
273-
274297
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) {
275298
this.log.trace('send close read to remote')
276299
await this.sendCloseRead(options)
277300
}
278301

302+
if (readStatus === 'ready') {
303+
this.log.trace('ending internal source queue')
304+
this.streamSource.end()
305+
}
306+
279307
this.log.trace('closed readable end of stream')
280308
}
281309

@@ -286,33 +314,26 @@ export abstract class AbstractStream implements Stream {
286314

287315
this.log.trace('closing writable end of stream with starting write status "%s"', this.writeStatus)
288316

289-
const writeStatus = this.writeStatus
290-
291317
if (this.writeStatus === 'ready') {
292318
this.log.trace('sink was never sunk, sink an empty array')
293-
await this.sink([])
294-
}
295319

296-
this.writeStatus = 'closing'
320+
await raceSignal(this.sink([]), options.signal)
321+
}
297322

298-
if (writeStatus === 'writing') {
323+
if (this.writeStatus === 'writing') {
299324
// stop reading from the source passed to `.sink` in the microtask queue
300325
// - this lets any data queued by the user in the current tick get read
301326
// before we exit
302327
await new Promise((resolve, reject) => {
303328
queueMicrotask(() => {
304329
this.log.trace('aborting source passed to .sink')
305330
this.sinkController.abort()
306-
this.sinkEnd.promise.then(resolve, reject)
331+
raceSignal(this.sinkEnd.promise, options.signal)
332+
.then(resolve, reject)
307333
})
308334
})
309335
}
310336

311-
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeWrite == null) {
312-
this.log.trace('send close write to remote')
313-
await this.sendCloseWrite(options)
314-
}
315-
316337
this.writeStatus = 'closed'
317338

318339
this.log.trace('closed writable end of stream')
@@ -357,6 +378,7 @@ export abstract class AbstractStream implements Stream {
357378
const err = new CodeError('stream reset', ERR_STREAM_RESET)
358379

359380
this.status = 'reset'
381+
this.timeline.reset = Date.now()
360382
this._closeSinkAndSource(err)
361383
this.onReset?.()
362384
}
@@ -423,7 +445,7 @@ export abstract class AbstractStream implements Stream {
423445
return
424446
}
425447

426-
this.log.trace('muxer destroyed')
448+
this.log.trace('stream destroyed')
427449

428450
this._closeSinkAndSource()
429451
}
+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// copied from @libp2p/logger to break a circular dependency
2+
interface Logger {
3+
(): void
4+
error: () => void
5+
trace: () => void
6+
enabled: boolean
7+
}
8+
9+
export function logger (): Logger {
10+
const output = (): void => {}
11+
output.trace = (): void => {}
12+
output.error = (): void => {}
13+
output.enabled = false
14+
15+
return output
16+
}

0 commit comments

Comments
 (0)