-
Notifications
You must be signed in to change notification settings - Fork 43
/
stream.ts
79 lines (65 loc) · 2.29 KB
/
stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import { Stream } from '@libp2p/interface-connection'
import { abortableSource } from 'abortable-iterator'
import { pipe } from 'it-pipe'
import { pushable, Pushable } from 'it-pushable'
import { encode, decode } from 'it-length-prefixed'
import { Uint8ArrayList } from 'uint8arraylist'
/** Settings accepted by the OutboundStream constructor. */
interface OutboundStreamOpts {
  /**
   * Upper bound, in bytes, for the pushable buffer.
   * Once the buffer is full, `.push` throws.
   */
  maxBufferSize?: number
}
/** Settings accepted by the InboundStream constructor. */
interface InboundStreamOpts {
  /**
   * Upper bound, in bytes, for a message read from the stream.
   */
  maxDataLength?: number
}
/**
 * Write side of a protocol stream.
 *
 * Data handed to `push` is queued in an in-memory pushable, run through the
 * `it-length-prefixed` encoder and piped into the raw stream. The queue is
 * wrapped in an abortable source so that `close` can stop the pipe.
 */
export class OutboundStream {
  /** Queue of outgoing messages that feeds the pipe */
  private readonly pushable: Pushable<Uint8Array>
  /** Aborted by `close` to stop the pipe from reading the queue */
  private readonly closeController: AbortController
  /** Threshold checked against the queue's `readableLength` before each push */
  private readonly maxBufferSize: number

  /**
   * Starts piping queued messages into `rawStream` immediately.
   *
   * @param rawStream - stream the encoded messages are written to
   * @param errCallback - invoked with the rejection reason if the pipe fails
   * @param opts - optional settings; may be omitted, in which case the buffer
   * limit is `Infinity` (same defaulting convention as `InboundStream`)
   */
  constructor(private readonly rawStream: Stream, errCallback: (e: Error) => void, opts: OutboundStreamOpts = {}) {
    this.pushable = pushable({ objectMode: false })
    this.closeController = new AbortController()
    this.maxBufferSize = opts.maxBufferSize ?? Infinity

    // Fire-and-forget: the pipe promise is not awaited, failures are routed to errCallback
    pipe(
      abortableSource(this.pushable, this.closeController.signal, { returnOnAbort: true }),
      (source) => encode(source),
      this.rawStream
    ).catch(errCallback)
  }

  /** Protocol reported by the raw stream's `stat` */
  get protocol(): string {
    // TODO remove this non-nullish assertion after https://github.com/libp2p/js-libp2p-interfaces/pull/265 is incorporated
    return this.rawStream.stat.protocol!
  }

  /**
   * Queues `data` for sending.
   *
   * The limit is checked against what is already queued, before `data` is
   * added, so the queue may exceed `maxBufferSize` by the last accepted message.
   *
   * @throws Error if the queue's `readableLength` is already above `maxBufferSize`
   */
  push(data: Uint8Array): void {
    if (this.pushable.readableLength > this.maxBufferSize) {
      throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`)
    }

    this.pushable.push(data)
  }

  /**
   * Stops the pipe, ends the queue and closes the raw stream.
   */
  close(): void {
    this.closeController.abort()
    // similar to pushable.end() but clear the internal buffer
    this.pushable.return()
    this.rawStream.close()
  }
}
/**
 * Read side of a protocol stream.
 *
 * The raw stream is run through the `it-length-prefixed` decoder and exposed
 * as `source`, wrapped in an abortable source tied to this instance's
 * AbortController.
 */
export class InboundStream {
  /** Decoded messages; wrapped with `returnOnAbort: true` and bound to the close signal */
  public readonly source: AsyncIterable<Uint8ArrayList>

  private readonly rawStream: Stream
  private readonly closeController: AbortController

  /**
   * @param rawStream - stream to read from
   * @param opts - forwarded unchanged to the decoder; defaults to `{}`
   */
  constructor(rawStream: Stream, opts: InboundStreamOpts = {}) {
    this.rawStream = rawStream

    const controller = new AbortController()
    this.closeController = controller

    const decoded = pipe(rawStream, (chunks) => decode(chunks, opts))
    const { signal } = controller
    this.source = abortableSource(decoded, signal, { returnOnAbort: true })
  }

  /**
   * Aborts the close signal first, then closes the raw stream.
   */
  close(): void {
    const { closeController, rawStream } = this

    closeController.abort()
    rawStream.close()
  }
}