-
Notifications
You must be signed in to change notification settings - Fork 29
/
index.js
93 lines (81 loc) · 2.11 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
 * Handle to a running Observable execution. Its single member,
 * `unsubscribe`, is the function to call in order to cancel that execution.
 */
class Subscription {
  /**
   * @param {Function} unsubscribe - Stored unchanged as `this.unsubscribe`;
   *   it is neither wrapped nor invoked here.
   */
  constructor(unsubscribe) {
    Object.assign(this, { unsubscribe });
  }
}
/**
 * A Subscriber is both an Observer and a Subscription. It wraps a given
 * Observer and enforces the Observable contract `(next)*(error|complete)?`:
 * the first `error` or `complete` closes the Subscriber, cancels the
 * execution via `unsubscribe`, and every later notification is dropped.
 */
class Subscriber extends Subscription {
  /**
   * @param {{next: Function, error: Function, complete: Function}} observer
   *   The Observer that receives the notifications.
   */
  constructor(observer) {
    // Placeholder teardown; whoever owns the real one reassigns `unsubscribe`.
    super(function unsubscribe() {});
    this.observer = observer;
    // Tracked here rather than inside `unsubscribe`, because `unsubscribe`
    // is a plain property that may be replaced after construction.
    this.closed = false;
  }

  /**
   * Forwards a value to the observer unless the Subscriber is closed.
   * @param {*} x - The value to deliver.
   */
  next(x) {
    if (this.closed) return;
    this.observer.next(x);
  }

  /**
   * Delivers an error as the terminal notification, then unsubscribes.
   * Ignored if the Subscriber is already closed.
   * @param {*} e - The error to deliver.
   */
  error(e) {
    if (this.closed) return;
    // Close before notifying so re-entrant calls from the observer are dropped.
    this.closed = true;
    try {
      this.observer.error(e);
    } finally {
      // Cancel the execution even when the observer callback throws.
      this.unsubscribe();
    }
  }

  /**
   * Delivers completion as the terminal notification, then unsubscribes.
   * Ignored if the Subscriber is already closed.
   */
  complete() {
    if (this.closed) return;
    // Close before notifying so re-entrant calls from the observer are dropped.
    this.closed = true;
    try {
      this.observer.complete();
    } finally {
      // Cancel the execution even when the observer callback throws.
      this.unsubscribe();
    }
  }
}
/**
 * An Observable is an invokable collection of values pushed to an Observer.
 * Invoking it means calling `subscribe(observer)`.
 */
class Observable {
  /**
   * @param {Function} subscribe - Stored unchanged as `this.subscribe`. No
   *   contract enforcement is added on this path; use `Observable.create`
   *   for that.
   */
  constructor(subscribe) {
    this.subscribe = subscribe;
  }

  /**
   * Observable create is the only contract-abiding way of creating Observables.
   * The observer passed to `subscribe` is wrapped in a Subscriber before the
   * producer sees it, and the Subscriber's `unsubscribe` is wired to the
   * producer's teardown.
   *
   * @param {Function} subscribe - Producer, called with the Subscriber. It may
   *   return a Subscription (returned to the caller as-is), a plain teardown
   *   function (wrapped in a Subscription), or nothing (no teardown).
   * @returns {Observable} An Observable whose `subscribe` returns a
   *   Subscription.
   */
  static create(subscribe) {
    return new Observable(function internalSubscribe(observer) {
      const subscriber = new Subscriber(observer);

      // The real teardown only exists once the producer has returned. If the
      // producer errors or completes synchronously, remember that so the
      // teardown can be run as soon as it is available.
      let terminatedEarly = false;
      subscriber.unsubscribe = function recordEarlyTermination() {
        terminatedEarly = true;
      };

      const result = subscribe(subscriber);
      let subscription;
      if (typeof result === 'function') {
        subscription = new Subscription(result);
      } else if (result === undefined || result === null) {
        subscription = new Subscription(function noop() {});
      } else {
        subscription = result;
      }

      subscriber.unsubscribe = subscription.unsubscribe.bind(subscription);
      if (terminatedEarly) subscription.unsubscribe();
      return subscription;
    });
  }
}
/**
 * A Subject is both an Observable and an Observer.
 * It is the only concept in RxJS that maintains a list of Observers.
 * Every notification it receives is forwarded to each registered observer.
 */
class Subject extends Observable {
  constructor() {
    // A regular function, not an arrow: `this` must resolve at call time to
    // the Subject that `subscribe` is invoked on.
    super(function subscribe(observer) {
      this.observers.push(observer);
      return new Subscription(() => {
        // Removes one registration; a no-op if it is already gone.
        const index = this.observers.indexOf(observer);
        if (index >= 0) this.observers.splice(index, 1);
      });
    });
    this.observers = [];
  }

  /**
   * Forwards a value to every observer registered when the call starts.
   * @param {*} x - The value to broadcast.
   */
  next(x) {
    // Iterate over a snapshot: an observer that unsubscribes during dispatch
    // splices the live array, which would make forEach skip its neighbour.
    this.observers.slice().forEach((observer) => observer.next(x));
  }

  /**
   * Forwards an error to every observer registered when the call starts.
   * @param {*} e - The error to broadcast.
   */
  error(e) {
    this.observers.slice().forEach((observer) => observer.error(e));
  }

  /**
   * Forwards completion to every observer registered when the call starts.
   */
  complete() {
    this.observers.slice().forEach((observer) => observer.complete());
  }
}
// Exported namespace object. Subscriber is not included in it.
const Rx = { Subscription, Observable, Subject };

module.exports = Rx;