Skip to content

Commit 58cd806

Browse files
committed
feat(WebSocketSubject): add basic WebSocketSubject implementation
- adds `_finalComplete`, `_finalError`, and `_finalNext` protected methods to Subject. This is to allow explicit calls to send to final destination observers, since the destination and the final set of observers can differ in implementations like WebSocketSubject
1 parent 2ca4236 commit 58cd806

File tree

6 files changed

+388
-34
lines changed

6 files changed

+388
-34
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/* globals describe, it, expect, sinon, rxTestScheduler */
2+
var Rx = require('../../../dist/cjs/Rx.DOM');
3+
var Observable = Rx.Observable;
4+
5+
function noop() {
6+
// nope.
7+
}
8+
9+
describe('Observable.webSocket', function () {
10+
beforeEach(function () {
11+
setupMockWebSocket();
12+
});
13+
14+
afterEach(function () {
15+
teardownMockWebSocket();
16+
});
17+
18+
it ('should send a message', function () {
19+
var messageReceived = false;
20+
var subject = Observable.webSocket('ws://mysocket');
21+
22+
subject.next('ping');
23+
24+
subject.subscribe(function (x) {
25+
expect(x).toBe('pong');
26+
messageReceived = true;
27+
});
28+
29+
var socket = MockWebSocket.lastSocket();
30+
31+
socket.open();
32+
expect(socket.lastMessageSent()).toBe('ping');
33+
34+
socket.triggerMessage('pong');
35+
expect(messageReceived).toBe(true);
36+
});
37+
});
38+
39+
var sockets = [];
40+
41+
function MockWebSocket(url, protocol) {
42+
sockets.push(this);
43+
this.url = url;
44+
this.protocol = protocol;
45+
this.sent = [];
46+
this.handlers = {};
47+
this.readyState = 1;
48+
}
49+
50+
MockWebSocket.lastSocket = function () {
51+
return sockets.length > 0 ? sockets[sockets.length - 1] : undefined;
52+
};
53+
54+
MockWebSocket.prototype = {
55+
send: function (data) {
56+
this.sent.push(data);
57+
},
58+
59+
lastMessageSent: function () {
60+
var sent = this.sent;
61+
return sent.length > 0 ? sent[sent.length - 1] : undefined;
62+
},
63+
64+
triggerClose: function (e) {
65+
this.readyState = 3;
66+
this.trigger('close', e);
67+
},
68+
69+
triggerError: function (err) {
70+
this.readyState = 3;
71+
this.trigger('error', err);
72+
},
73+
74+
triggerMessage: function (data) {
75+
var messageEvent = {
76+
data: JSON.stringify(data),
77+
origin: 'mockorigin',
78+
ports: undefined,
79+
source: __root__,
80+
};
81+
82+
this.trigger('message', messageEvent);
83+
},
84+
85+
open: function () {
86+
this.readyState = 1;
87+
this.trigger('open', {});
88+
},
89+
90+
close: function (code, reason) {
91+
if (this.readyState < 2) {
92+
this.readyState = 2;
93+
this.closeCode = code;
94+
this.closeReason = reason;
95+
this.triggerClose();
96+
}
97+
},
98+
99+
addEventListener: function (name, handler) {
100+
var lookup = this.handlers[name] = this.handlers[name] || [];
101+
lookup.push(handler);
102+
},
103+
104+
removeEventListener: function (name, handler) {
105+
var lookup = this.handlers[name];
106+
if (lookup) {
107+
for (var i = lookup.length - 1; i--;) {
108+
if (lookup[i] === handler) {
109+
lookup.splice(i, 1);
110+
}
111+
}
112+
}
113+
},
114+
115+
trigger: function (name, e) {
116+
if (this['on' + name]) {
117+
this['on' + name](e);
118+
}
119+
120+
var lookup = this.handlers[name];
121+
if (lookup) {
122+
for (var i = 0; i < lookup.length; i++) {
123+
lookup[i](e);
124+
}
125+
}
126+
}
127+
}
128+
129+
var __ws;
130+
function setupMockWebSocket() {
131+
sockets = [];
132+
__ws = __root__.WebSocket;
133+
__root__.WebSocket = MockWebSocket;
134+
}
135+
136+
function teardownMockWebSocket() {
137+
__root__.WebSocket = __ws;
138+
sockets = null;
139+
}

src/Observable.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import {RangeObservable} from './observable/range';
3232
import {InfiniteObservable} from './observable/never';
3333
import {ErrorObservable} from './observable/throw';
3434
import {AjaxCreationMethod} from './observable/dom/ajax';
35+
import {WebSocketSubject} from './observable/dom/webSocket';
3536

3637
/**
3738
* A representation of any set of values over any amount of time. This the most basic building block
@@ -187,6 +188,7 @@ export class Observable<T> implements CoreOperators<T> {
187188
static range: typeof RangeObservable.create;
188189
static throw: typeof ErrorObservable.create;
189190
static timer: typeof TimerObservable.create;
191+
static webSocket: typeof WebSocketSubject.create;
190192
static zip: typeof zipStatic;
191193

192194
// core operators

src/Rx.DOM.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import './add/observable/timer';
2828
import './add/operator/zip-static';
2929

3030
import './add/observable/dom/ajax';
31+
import './add/observable/dom/webSocket';
3132

3233
//operators
3334
import './add/operator/buffer';

src/Subject.ts

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import {rxSubscriber} from './symbol/rxSubscriber';
88

99
export class Subject<T> extends Observable<T> implements Observer<T>, Subscription {
1010

11-
static create<T>(source: Observable<T>, destination: Observer<T>): Subject<T> {
11+
static create: Function = <T>(source: Observable<T>, destination: Observer<T>): Subject<T> => {
1212
return new Subject<T>(source, destination);
13-
}
13+
};
1414

1515
constructor(source?: Observable<T>, destination?: Observer<T>) {
1616
super();
@@ -127,60 +127,72 @@ export class Subject<T> extends Observable<T> implements Observer<T>, Subscripti
127127
if (this.destination) {
128128
this.destination.next(value);
129129
} else {
130-
let index = -1;
131-
const observers = this.observers.slice(0);
132-
const len = observers.length;
130+
this._finalNext(value);
131+
}
132+
}
133133

134-
while (++index < len) {
135-
observers[index].next(value);
136-
}
134+
protected _finalNext(value: T): void {
135+
let index = -1;
136+
const observers = this.observers.slice(0);
137+
const len = observers.length;
138+
139+
while (++index < len) {
140+
observers[index].next(value);
137141
}
138142
}
139143

140144
protected _error(err: any): void {
141145
if (this.destination) {
142146
this.destination.error(err);
143147
} else {
144-
let index = -1;
145-
const observers = this.observers;
146-
const len = observers.length;
148+
this._finalError(err);
149+
}
150+
}
147151

148-
// optimization to block our SubjectSubscriptions from
149-
// splicing themselves out of the observers list one by one.
150-
this.observers = null;
151-
this.isUnsubscribed = true;
152+
protected _finalError(err: any): void {
153+
let index = -1;
154+
const observers = this.observers;
155+
const len = observers.length;
152156

153-
while (++index < len) {
154-
observers[index].error(err);
155-
}
156-
157-
this.isUnsubscribed = false;
157+
// optimization to block our SubjectSubscriptions from
158+
// splicing themselves out of the observers list one by one.
159+
this.observers = null;
160+
this.isUnsubscribed = true;
158161

159-
this.unsubscribe();
162+
while (++index < len) {
163+
observers[index].error(err);
160164
}
165+
166+
this.isUnsubscribed = false;
167+
168+
this.unsubscribe();
161169
}
162170

163171
protected _complete(): void {
164172
if (this.destination) {
165173
this.destination.complete();
166174
} else {
167-
let index = -1;
168-
const observers = this.observers;
169-
const len = observers.length;
175+
this._finalComplete();
176+
}
177+
}
170178

171-
// optimization to block our SubjectSubscriptions from
172-
// splicing themselves out of the observers list one by one.
173-
this.observers = null;
174-
this.isUnsubscribed = true;
179+
protected _finalComplete(): void {
180+
let index = -1;
181+
const observers = this.observers;
182+
const len = observers.length;
175183

176-
while (++index < len) {
177-
observers[index].complete();
178-
}
179-
180-
this.isUnsubscribed = false;
184+
// optimization to block our SubjectSubscriptions from
185+
// splicing themselves out of the observers list one by one.
186+
this.observers = null;
187+
this.isUnsubscribed = true;
181188

182-
this.unsubscribe();
189+
while (++index < len) {
190+
observers[index].complete();
183191
}
192+
193+
this.isUnsubscribed = false;
194+
195+
this.unsubscribe();
184196
}
185197

186198
[rxSubscriber]() {

src/add/observable/dom/webSocket.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import {Observable} from '../../../Observable';
2+
import {WebSocketSubject} from '../../../observable/dom/webSocket';
3+
Observable.webSocket = WebSocketSubject.create;
4+
5+
export var _void: void;

0 commit comments

Comments
 (0)