From 58cd8063a8f4db1be482b21610b4da2999a46096 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 11 Jan 2016 11:38:13 -0800 Subject: [PATCH] feat(WebSocketSubject): add basic WebSocketSubject implementation - adds `_finalComplete`, `_finalError`, and `_finalNext` protected methods to Subject. This is to allow explicit calls to send to final destination observers, since the destination and the final set of observers can differ in implementations like WebSocketSubject --- spec/observables/dom/webSocket-spec.js | 139 ++++++++++++++++++ src/Observable.ts | 2 + src/Rx.DOM.ts | 1 + src/Subject.ts | 80 +++++----- src/add/observable/dom/webSocket.ts | 5 + src/observable/dom/webSocket.ts | 195 +++++++++++++++++++++++++ 6 files changed, 388 insertions(+), 34 deletions(-) create mode 100644 spec/observables/dom/webSocket-spec.js create mode 100644 src/add/observable/dom/webSocket.ts create mode 100644 src/observable/dom/webSocket.ts diff --git a/spec/observables/dom/webSocket-spec.js b/spec/observables/dom/webSocket-spec.js new file mode 100644 index 0000000000..9f5ebca8b3 --- /dev/null +++ b/spec/observables/dom/webSocket-spec.js @@ -0,0 +1,139 @@ +/* globals describe, it, expect, sinon, rxTestScheduler */ +var Rx = require('../../../dist/cjs/Rx.DOM'); +var Observable = Rx.Observable; + +function noop() { + // nope. +} + +describe('Observable.webSocket', function () { + beforeEach(function () { + setupMockWebSocket(); + }); + + afterEach(function () { + teardownMockWebSocket(); + }); + + it ('should send a message', function () { + var messageReceived = false; + var subject = Observable.webSocket('ws://mysocket'); + + subject.next('ping'); + + subject.subscribe(function (x) { + expect(x).toBe('pong'); + messageReceived = true; + }); + + var socket = MockWebSocket.lastSocket(); + + socket.open(); + expect(socket.lastMessageSent()).toBe('ping'); + + socket.triggerMessage('pong'); + expect(messageReceived).toBe(true); + }); +}); + +var sockets = []; + +function MockWebSocket(url, protocol) { + sockets.push(this); + this.url = url; + this.protocol = protocol; + this.sent = []; + this.handlers = {}; + this.readyState = 1; +} + +MockWebSocket.lastSocket = function () { + return sockets.length > 0 ? sockets[sockets.length - 1] : undefined; +}; + +MockWebSocket.prototype = { + send: function (data) { + this.sent.push(data); + }, + + lastMessageSent: function () { + var sent = this.sent; + return sent.length > 0 ? sent[sent.length - 1] : undefined; + }, + + triggerClose: function (e) { + this.readyState = 3; + this.trigger('close', e); + }, + + triggerError: function (err) { + this.readyState = 3; + this.trigger('error', err); + }, + + triggerMessage: function (data) { + var messageEvent = { + data: JSON.stringify(data), + origin: 'mockorigin', + ports: undefined, + source: __root__, + }; + + this.trigger('message', messageEvent); + }, + + open: function () { + this.readyState = 1; + this.trigger('open', {}); + }, + + close: function (code, reason) { + if (this.readyState < 2) { + this.readyState = 2; + this.closeCode = code; + this.closeReason = reason; + this.triggerClose(); + } + }, + + addEventListener: function (name, handler) { + var lookup = this.handlers[name] = this.handlers[name] || []; + lookup.push(handler); + }, + + removeEventListener: function (name, handler) { + var lookup = this.handlers[name]; + if (lookup) { + for (var i = lookup.length - 1; i--;) { + if (lookup[i] === handler) { + lookup.splice(i, 1); + } + } + } + }, + + trigger: function (name, e) { + if (this['on' + name]) { + this['on' + name](e); + } + + var lookup = this.handlers[name]; + if (lookup) { + for (var i = 0; i < lookup.length; i++) { + lookup[i](e); + } + } + } +} + +var __ws; +function setupMockWebSocket() { + sockets = []; + __ws = __root__.WebSocket; + __root__.WebSocket = MockWebSocket; +} + +function teardownMockWebSocket() { + __root__.WebSocket = __ws; + sockets = null; +} \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index a7bec2a5c6..c611f955c9 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -32,6 +32,7 @@ import {RangeObservable} from './observable/range'; import {InfiniteObservable} from './observable/never'; import {ErrorObservable} from './observable/throw'; import {AjaxCreationMethod} from './observable/dom/ajax'; +import {WebSocketSubject} from './observable/dom/webSocket'; /** * A representation of any set of values over any amount of time. This the most basic building block @@ -187,6 +188,7 @@ export class Observable implements CoreOperators { static range: typeof RangeObservable.create; static throw: typeof ErrorObservable.create; static timer: typeof TimerObservable.create; + static webSocket: typeof WebSocketSubject.create; static zip: typeof zipStatic; // core operators diff --git a/src/Rx.DOM.ts b/src/Rx.DOM.ts index a06c969b53..4bca9733d1 100644 --- a/src/Rx.DOM.ts +++ b/src/Rx.DOM.ts @@ -28,6 +28,7 @@ import './add/observable/timer'; import './add/operator/zip-static'; import './add/observable/dom/ajax'; +import './add/observable/dom/webSocket'; //operators import './add/operator/buffer'; diff --git a/src/Subject.ts b/src/Subject.ts index de0ce47bc0..ebc817ee34 100644 --- a/src/Subject.ts +++ b/src/Subject.ts @@ -8,9 +8,9 @@ import {rxSubscriber} from './symbol/rxSubscriber'; export class Subject extends Observable implements Observer, Subscription { - static create(source: Observable, destination: Observer): Subject { + static create: Function = (source: Observable, destination: Observer): Subject => { return new Subject(source, destination); - } + }; constructor(source?: Observable, destination?: Observer) { super(); @@ -127,13 +127,17 @@ export class Subject extends Observable implements Observer, Subscripti if (this.destination) { this.destination.next(value); } else { - let index = -1; - const observers = this.observers.slice(0); - const len = observers.length; + this._finalNext(value); + } + } - while (++index < len) { - observers[index].next(value); - } + protected _finalNext(value: T): void { + let index = -1; + const observers = this.observers.slice(0); + const len = observers.length; + + while (++index < len) { + observers[index].next(value); } } @@ -141,46 +145,54 @@ export class Subject extends Observable implements Observer, Subscripti if (this.destination) { this.destination.error(err); } else { - let index = -1; - const observers = this.observers; - const len = observers.length; + this._finalError(err); + } + } - // optimization to block our SubjectSubscriptions from - // splicing themselves out of the observers list one by one. - this.observers = null; - this.isUnsubscribed = true; + protected _finalError(err: any): void { + let index = -1; + const observers = this.observers; + const len = observers.length; - while (++index < len) { - observers[index].error(err); - } - - this.isUnsubscribed = false; + // optimization to block our SubjectSubscriptions from + // splicing themselves out of the observers list one by one. + this.observers = null; + this.isUnsubscribed = true; - this.unsubscribe(); + while (++index < len) { + observers[index].error(err); } + + this.isUnsubscribed = false; + + this.unsubscribe(); } protected _complete(): void { if (this.destination) { this.destination.complete(); } else { - let index = -1; - const observers = this.observers; - const len = observers.length; + this._finalComplete(); + } + } - // optimization to block our SubjectSubscriptions from - // splicing themselves out of the observers list one by one. - this.observers = null; - this.isUnsubscribed = true; + protected _finalComplete(): void { + let index = -1; + const observers = this.observers; + const len = observers.length; - while (++index < len) { - observers[index].complete(); - } - - this.isUnsubscribed = false; + // optimization to block our SubjectSubscriptions from + // splicing themselves out of the observers list one by one. + this.observers = null; + this.isUnsubscribed = true; - this.unsubscribe(); + while (++index < len) { + observers[index].complete(); } + + this.isUnsubscribed = false; + + this.unsubscribe(); } [rxSubscriber]() { diff --git a/src/add/observable/dom/webSocket.ts b/src/add/observable/dom/webSocket.ts new file mode 100644 index 0000000000..993f0c0a4e --- /dev/null +++ b/src/add/observable/dom/webSocket.ts @@ -0,0 +1,5 @@ +import {Observable} from '../../../Observable'; +import {WebSocketSubject} from '../../../observable/dom/webSocket'; +Observable.webSocket = WebSocketSubject.create; + +export var _void: void; diff --git a/src/observable/dom/webSocket.ts b/src/observable/dom/webSocket.ts new file mode 100644 index 0000000000..26a85211dc --- /dev/null +++ b/src/observable/dom/webSocket.ts @@ -0,0 +1,195 @@ +import {Subject} from '../../Subject'; +import {Subscriber} from '../../Subscriber'; +import {Observable} from '../../Observable'; +import {Subscription} from '../../Subscription'; +import {root} from '../../util/root'; +import {ReplaySubject} from '../../subject/ReplaySubject'; +import {Observer} from '../../Observer'; +import {tryCatch} from '../../util/tryCatch'; +import {errorObject} from '../../util/errorObject'; +import {Operator} from '../../Operator'; +import {assign} from '../../util/assign'; + +export interface WebSocketSubjectConfig { + url: string; + protocol?: string | Array; + resultSelector?: (e: MessageEvent) => T; + openObserver?: Observer; + closeObserver?: Observer; + closingObserver?: Observer; + WebSocketCtor?: { new(url: string, protocol?: string|Array): WebSocket }; +} + +export class WebSocketSubject extends Subject { + url: string; + protocol: string|Array; + socket: WebSocket; + openObserver: Observer; + closeObserver: Observer; + closingObserver: Observer; + WebSocketCtor: { new(url: string, protocol?: string|Array)}; + + resultSelector(e: MessageEvent) { + return JSON.parse(e.data); + } + + static create(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject { + return new WebSocketSubject(urlConfigOrSource); + } + + constructor(urlConfigOrSource: string | WebSocketSubjectConfig | Observable, destination?: Observer) { + if (urlConfigOrSource instanceof Observable) { + super(urlConfigOrSource, destination); + } else { + super(); + this.WebSocketCtor = root.WebSocket; + + if (typeof urlConfigOrSource === 'string') { + this.url = urlConfigOrSource; + } else { + // WARNING: config object could override important members here. + assign(this, urlConfigOrSource); + } + + if (!this.WebSocketCtor) { + throw new Error('no WebSocket constructor can be found'); + } + + this.destination = new ReplaySubject(); + } + } + + lift(operator) { + const sock = new WebSocketSubject(this, this.destination); + sock.operator = operator; + return sock; + } + + multiplex(subMsg: any, unsubMsg: any, messageFilter: (value: T) => boolean) { + return this.lift(new MultiplexOperator(this, subMsg, unsubMsg, messageFilter)); + } + + _unsubscribe() { + this.source = null; + this.isStopped = false; + this.observers = null; + this.isUnsubscribed = false; + } + + _subscribe(subscriber: Subscriber) { + const subscription = super._subscribe(subscriber); + // HACK: For some reason transpilation wasn't honoring this in arrow functions below + // Doesn't seem right, need to reinvestigate. + const self = this; + + if (self.source || !subscription || (subscription).isUnsubscribed) { + return subscription; + } + + if (self.url && !self.socket) { + const socket = new WebSocket(self.url); + self.socket = socket; + + socket.onopen = (e) => { + const openObserver = self.openObserver; + if (openObserver) { + openObserver.next(e); + } + + const queue = self.destination; + + self.destination = Subscriber.create( + (x) => socket.readyState === 1 && socket.send(x), + (e) => socket.close(e), + ( ) => { + const closingObserver = self.closingObserver; + if (closingObserver) { + closingObserver.next(undefined); + } + socket.close(); + } + ); + + if (queue && queue instanceof ReplaySubject) { + subscription.add((>queue).subscribe(self.destination)); + } + }; + + socket.onerror = (e) => self.error(e); + + socket.onclose = (e: CloseEvent) => { + const closeObserver = self.closeObserver; + if (closeObserver) { + closeObserver.next(e); + } + if (e.wasClean) { + self._finalComplete(); + } else { + self._finalError(e); + } + }; + + socket.onmessage = (e: MessageEvent) => { + const result = tryCatch(self.resultSelector)(e); + if (result === errorObject.e) { + self._finalError(errorObject.e); + } else { + self._finalNext(result); + } + }; + return subscription; + } + + return new Subscription(() => { + subscription.unsubscribe(); + if (this.observers.length === 0) { + const { socket } = this; + if (socket && socket.readyState < 2) { + socket.close(); + } + this.socket = undefined; + this.source = undefined; + this.destination = new ReplaySubject(); + } + }); + } +} + +export class MultiplexOperator implements Operator { + constructor(private socketSubject: WebSocketSubject, + private subscribeMessage: any, + private unsubscribeMessage, + private messageFilter: (data: any) => R) { + // noop + } + + call(subscriber: Subscriber) { + return new MultiplexSubscriber(subscriber, this.socketSubject, this.subscribeMessage, this.unsubscribeMessage, this.messageFilter); + } +} + +export class MultiplexSubscriber extends Subscriber { + constructor(destination: Observer, + private socketSubject: WebSocketSubject, + private subscribeMessage: any, + private unsubscribeMessage: any, + private messageFilter: (data: any) => T) { + super(destination); + + socketSubject.next(subscribeMessage); + } + + next(value: any) { + const pass = tryCatch(this.messageFilter)(value); + if (pass === errorObject) { + this.destination.error(errorObject.e); + } else if (pass) { + this.destination.next(value); + } + } + + unsubscribe() { + this.socketSubject.next(this.unsubscribeMessage); + super.unsubscribe(); + } +} \ No newline at end of file