Skip to content

Commit c6c97c2

Browse files
committed
Added transaction operator
1 parent 9a7a4b1 commit c6c97c2

File tree

3 files changed

+234
-1
lines changed

3 files changed

+234
-1
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ Just my collection of custom RxJS operators, ready to copy+paste.
44

55
## Package contents
66

7-
- `bufferRing` A basic ring buffer implementation.
7+
- `bufferRing`: A basic ring buffer implementation.
88
- `queueMap`: Like `mergeMap`, but the results get emitted in order.
99
- `lazySample`: The missing throttling operator, imho.
1010
- `bounce`: The counterpart to `debounce`. Emits only the first event in a series.
1111
- `hug`: `bounce`+`debounce`=`hug`. Emits the first and last event of a series, but nothing in between.
12+
- `transaction`: Mainly for letting multiple state changes of a `BehaviorSubject` appear as one.
1213

1314
## Disclaimer
1415

operators/transaction.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import {
2+
BehaviorSubject,
3+
MonoTypeOperatorFunction,
4+
NEVER,
5+
OperatorFunction,
6+
defer,
7+
of,
8+
pipe,
9+
} from "rxjs";
10+
import { pairwise, switchMap, filter, mergeWith, tap } from "rxjs/operators";
11+
12+
/**
13+
* Emits whenever the source produces a falsy value followed by a truthy value.
14+
* The result is `[<the falsy value>, <the truthy value>]`.
15+
*/
16+
export function filterRisingEdge<T>(): OperatorFunction<T, [T, T]> {
17+
return pipe(
18+
pairwise(),
19+
switchMap(([a, b]) => (!a && b ? of([a, b] as [T, T]) : NEVER)),
20+
);
21+
}
22+
23+
/**
24+
* This operator can be used to mute a stream while the signal is `false`.
25+
* When applied to a {@link BehaviorSubject}, this can be used to hide
26+
* intermediate states during a computation.
27+
*
28+
* @param signal A {@link BehaviorSubject} that mutes the stream when it is
29+
* `false` and opens the stream when it is `true`.
30+
* @param summarize When set to `true`, the operator will emit the most recent
31+
* value that was produced while the stream was muted, as soon as it gets
32+
* resumed.
33+
*/
34+
export function transaction<T>(
35+
signal: BehaviorSubject<boolean>,
36+
summarize: boolean = false,
37+
): MonoTypeOperatorFunction<T> {
38+
return (source) =>
39+
defer(() => {
40+
let hasLatest = false;
41+
let latest: T | undefined;
42+
return source.pipe(
43+
tap((v) => {
44+
hasLatest = true;
45+
latest = v;
46+
}),
47+
filter(() => signal.value),
48+
mergeWith(
49+
summarize
50+
? signal.pipe(
51+
filterRisingEdge(),
52+
switchMap(() => (hasLatest ? of(latest!) : NEVER)),
53+
)
54+
: NEVER,
55+
),
56+
tap(() => (hasLatest = false))
57+
);
58+
});
59+
}

tests/transaction.test.ts

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
import { BehaviorSubject, Subject } from "rxjs";
2+
import { filterRisingEdge, transaction } from "../operators/transaction";
3+
4+
describe("filterRisingEdge", () => {
5+
it("generates the stream correctly (on booleans)", () => {
6+
const res: [boolean, boolean][] = [];
7+
const source = new Subject<boolean>();
8+
source.pipe(filterRisingEdge()).subscribe(i => {
9+
res.push(i);
10+
});
11+
source.next(false);
12+
expect(res).toEqual([]);
13+
source.next(false);
14+
expect(res).toEqual([]);
15+
source.next(true);
16+
expect(res).toEqual([[false, true]]);
17+
source.next(true);
18+
expect(res).toEqual([[false, true]]);
19+
source.next(false);
20+
expect(res).toEqual([[false, true]]);
21+
source.next(true);
22+
expect(res).toEqual([[false, true], [false, true]]);
23+
});
24+
25+
it("generates the stream correctly (on numbers)", () => {
26+
const res: [number, number][] = [];
27+
const source = new Subject<number>();
28+
source.pipe(filterRisingEdge()).subscribe(i => {
29+
res.push(i);
30+
});
31+
source.next(0);
32+
expect(res).toEqual([]);
33+
source.next(0);
34+
expect(res).toEqual([]);
35+
source.next(1);
36+
expect(res).toEqual([[0, 1]]);
37+
source.next(2);
38+
expect(res).toEqual([[0, 1]]);
39+
source.next(0);
40+
expect(res).toEqual([[0, 1]]);
41+
source.next(3);
42+
expect(res).toEqual([[0, 1], [0, 3]]);
43+
});
44+
});
45+
46+
describe("transaction", () => {
47+
it("generates the stream correctly (summarize = false)", () => {
48+
const res: number[] = [];
49+
const source = new BehaviorSubject<number>(0);
50+
const signal = new BehaviorSubject<boolean>(true);
51+
source.pipe(transaction(signal)).subscribe(i => {
52+
res.push(i);
53+
});
54+
signal.next(false);
55+
source.next(1);
56+
source.next(2);
57+
signal.next(true);
58+
source.next(3);
59+
expect(res).toEqual([0, 3]);
60+
});
61+
62+
it("generates the stream correctly (summarize = true)", () => {
63+
const res: number[] = [];
64+
const source = new BehaviorSubject<number>(0);
65+
const signal = new BehaviorSubject<boolean>(true);
66+
source.pipe(transaction(signal, true)).subscribe(i => {
67+
res.push(i);
68+
});
69+
signal.next(false);
70+
source.next(1);
71+
source.next(2);
72+
signal.next(true);
73+
source.next(3);
74+
expect(res).toEqual([0, 2, 3]);
75+
});
76+
77+
it("generates no summary without a previous emission", () => {
78+
const res: number[] = [];
79+
const source = new Subject<number>();
80+
const signal = new BehaviorSubject<boolean>(false);
81+
source.pipe(transaction(signal, true)).subscribe(i => {
82+
res.push(i);
83+
});
84+
signal.next(true);
85+
expect(res).toEqual([]);
86+
});
87+
88+
it("does not generate the summary twice", () => {
89+
const res: number[] = [];
90+
const source = new Subject<number>();
91+
const signal = new BehaviorSubject<boolean>(false);
92+
source.pipe(transaction(signal, true)).subscribe(i => {
93+
res.push(i);
94+
});
95+
source.next(1);
96+
signal.next(true);
97+
signal.next(false);
98+
signal.next(true);
99+
expect(res).toEqual([1]);
100+
});
101+
102+
it("does not generate the summary when nothing was emitted during pause", () => {
103+
const res: number[] = [];
104+
const source = new Subject<number>();
105+
const signal = new BehaviorSubject<boolean>(true);
106+
source.pipe(transaction(signal, true)).subscribe(i => {
107+
res.push(i);
108+
});
109+
source.next(1);
110+
signal.next(false);
111+
signal.next(true);
112+
expect(res).toEqual([1]);
113+
});
114+
115+
it("generates the stream correctly (on Subject)", () => {
116+
const res: number[] = [];
117+
const source = new Subject<number>();
118+
const signal = new BehaviorSubject<boolean>(true);
119+
source.pipe(transaction(signal)).subscribe(i => {
120+
res.push(i);
121+
});
122+
signal.next(false);
123+
source.next(1);
124+
source.next(2);
125+
signal.next(true);
126+
source.next(3);
127+
expect(res).toEqual([3]);
128+
});
129+
130+
it("generates the stream correctly (on Subject, summarize)", () => {
131+
const res: number[] = [];
132+
const source = new Subject<number>();
133+
const signal = new BehaviorSubject<boolean>(true);
134+
source.pipe(transaction(signal, true)).subscribe(i => {
135+
res.push(i);
136+
});
137+
signal.next(false);
138+
source.next(1);
139+
source.next(2);
140+
signal.next(true);
141+
source.next(3);
142+
expect(res).toEqual([2, 3]);
143+
});
144+
145+
it('does not summarize repeatedly', () => {
146+
const res: number[] = [];
147+
const source = new Subject<number>();
148+
const signal = new BehaviorSubject<boolean>(true);
149+
source.pipe(transaction(signal, true)).subscribe(i => {
150+
res.push(i);
151+
});
152+
signal.next(false);
153+
source.next(1);
154+
source.next(2);
155+
signal.next(true);
156+
signal.next(true);
157+
source.next(3);
158+
expect(res).toEqual([2, 3]);
159+
});
160+
161+
it('emits latest value in summary, even if it is falsy', () => {
162+
const res: boolean[] = [];
163+
const source = new Subject<boolean>();
164+
const signal = new BehaviorSubject<boolean>(true);
165+
source.pipe(transaction(signal, true)).subscribe(i => {
166+
res.push(i);
167+
});
168+
signal.next(false);
169+
source.next(false);
170+
signal.next(true);
171+
expect(res).toEqual([false]);
172+
});
173+
});

0 commit comments

Comments
 (0)