-
-
Notifications
You must be signed in to change notification settings - Fork 21
/
traverse.ts
112 lines (103 loc) · 2.86 KB
/
traverse.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/
import {
concat,
from,
identity,
MonoTypeOperatorFunction,
noop,
Observable,
ObservableInput,
of,
OperatorFunction,
Subject,
} from "rxjs";
import { expand, ignoreElements, mergeMap, tap } from "rxjs/operators";
import { OperatorSubscriber } from "../OperatorSubscriber";
import { NotificationQueue } from "./NotificationQueue";
export type TraverseElement<T, M> = {
markers: ObservableInput<M>;
values: ObservableInput<T>;
};
export type TraverseFactory<T, M> = (
marker: M | undefined,
index: number
) => Observable<TraverseElement<T, M>>;
export function traverse<T, M>(options: {
concurrency?: number;
factory: TraverseFactory<T, M>;
notifier: Observable<any>;
}): Observable<T>;
export function traverse<T, M, R>(options: {
concurrency?: number;
factory: TraverseFactory<T, M>;
operator: OperatorFunction<T, R>;
}): Observable<R>;
export function traverse<T, M>(options: {
concurrency?: number;
factory: TraverseFactory<T, M>;
}): Observable<T>;
// https://github.com/palantir/tslint/issues/3906
export function traverse<T, M, R>({
concurrency = 1,
factory,
operator = identity,
notifier,
}: {
concurrency?: number;
factory: TraverseFactory<T, M>;
operator?: OperatorFunction<T, T | R>;
notifier?: Observable<any>;
}): Observable<T | R> {
return new Observable<T | R>((subscriber) => {
let queue: NotificationQueue;
let queueOperator: MonoTypeOperatorFunction<M | undefined>;
if (notifier) {
queue = new NotificationQueue(notifier);
queueOperator = identity;
} else {
const subject = new Subject<void>();
queue = new NotificationQueue(subject);
queueOperator = (markers) => {
subject.next();
return markers;
};
}
const destination = new Subject<T | R>();
destination.subscribe(subscriber);
subscriber.add(queue.connect());
of(undefined)
.pipe(
expand(
(marker: M | undefined) =>
queue.pipe(
mergeMap((index) =>
factory(marker, index).pipe(
mergeMap(({ markers, values }) =>
concat(
from(values).pipe(
operator,
tap((value) => destination.next(value)),
ignoreElements()
),
from(markers)
)
)
)
),
queueOperator
),
concurrency
)
)
.subscribe(
new OperatorSubscriber(subscriber, {
complete: () => destination.complete(),
error: (error) => destination.error(error),
next: noop,
})
);
});
}