-
Notifications
You must be signed in to change notification settings - Fork 5
/
subject.ts
87 lines (76 loc) · 2.03 KB
/
subject.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import { Subject, ALL, StrictSink } from './types';
export function makeSubject<T>(): Subject<T> {
let sinks: Array<StrictSink<T> | undefined> = [];
return (type: ALL, data: unknown) => {
if (type === 0) {
const sink = data as StrictSink<T>;
sinks.push(sink);
sink(0, () => {
const i = sinks.indexOf(sink);
sinks[i] = void 0;
});
} else {
let hasDeleted = false;
for (let i = 0; i < sinks.length; i++) {
if (sinks[i]) sinks[i]!(type, data);
else hasDeleted = true;
}
if (hasDeleted) {
sinks = sinks.filter(x => x !== undefined);
}
}
};
}
export function makeAsyncSubject<T>(): Subject<T> {
let sinks: Array<any | undefined> = [];
let buffer1: Array<T> = [];
let buffer2: Array<T> = [];
let bufferLength = 0;
let promise: Promise<void> | undefined;
const scheduleData = () =>
Promise.resolve().then(() => {
// We use two buffers to avoid allocating new arrays all the time
const tmp = buffer2;
const bufLen = bufferLength;
buffer2 = buffer1;
buffer1 = tmp;
bufferLength = 0;
let hasDeleted = false;
for (let i = 0; i < bufLen; i++) {
for (let j = 0; j < sinks.length; j++) {
if (sinks[j]) sinks[j](1, buffer2[i]);
else hasDeleted = true;
}
}
if (hasDeleted) {
sinks = sinks.filter(Boolean);
}
if (bufferLength > 0) {
promise = scheduleData();
} else {
promise = void 0;
}
});
return (type: ALL, data: unknown) => {
if (type === 0) {
const sink = data as any;
sinks.push(sink);
sink(0, () => {
const i = sinks.indexOf(sink);
sinks[i] = void 0;
});
if (!promise) {
promise = scheduleData();
}
} else if (type === 1) {
buffer1[bufferLength++] = data as T;
if (!promise) {
promise = scheduleData();
}
} else {
for (const sink of sinks) {
sink?.(type, data);
}
}
};
}