This repository has been archived by the owner on Jul 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 30
/
encode.ts
84 lines (67 loc) · 2.17 KB
/
encode.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import batchedBytes from 'it-batched-bytes'
import { Uint8ArrayList } from 'uint8arraylist'
import varint from 'varint'
import { allocUnsafe } from './alloc-unsafe.js'
import { type Message, MessageTypes } from './message-types.js'
import type { Source } from 'it-stream-types'
const POOL_SIZE = 10 * 1024
class Encoder {
private _pool: Uint8Array
private _poolOffset: number
constructor () {
this._pool = allocUnsafe(POOL_SIZE)
this._poolOffset = 0
}
/**
* Encodes the given message and adds it to the passed list
*/
write (msg: Message, list: Uint8ArrayList): void {
const pool = this._pool
let offset = this._poolOffset
varint.encode(msg.id << 3 | msg.type, pool, offset)
offset += varint.encode.bytes ?? 0
if ((msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) && msg.data != null) {
varint.encode(msg.data.length, pool, offset)
} else {
varint.encode(0, pool, offset)
}
offset += varint.encode.bytes ?? 0
const header = pool.subarray(this._poolOffset, offset)
if (POOL_SIZE - offset < 100) {
this._pool = allocUnsafe(POOL_SIZE)
this._poolOffset = 0
} else {
this._poolOffset = offset
}
list.append(header)
if ((msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) && msg.data != null) {
list.append(msg.data)
}
}
}
const encoder = new Encoder()
/**
* Encode and yield one or more messages
*/
export async function * encode (source: Source<Message[]>, minSendBytes: number = 0): AsyncGenerator<Uint8Array, void, undefined> {
if (minSendBytes == null || minSendBytes === 0) {
// just send the messages
for await (const messages of source) {
const list = new Uint8ArrayList()
for (const msg of messages) {
encoder.write(msg, list)
}
yield list.subarray()
}
return
}
// batch messages up for sending
yield * batchedBytes(source, {
size: minSendBytes,
serialize: (obj, list) => {
for (const m of obj) {
encoder.write(m, list)
}
}
})
}