From 904d61780ab0c9398703e6c7bfcb2ba8776933a5 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 12 Jan 2016 15:57:55 -0800 Subject: [PATCH] feat(multiplex): add multiplex operator to WebSocketSubject - fix to ensure expected web socket close behavior --- spec/observables/dom/webSocket-spec.js | 39 +++++++++++++ src/observable/dom/webSocket.ts | 76 +++++++++++--------------- 2 files changed, 72 insertions(+), 43 deletions(-) diff --git a/spec/observables/dom/webSocket-spec.js b/spec/observables/dom/webSocket-spec.js index b39736c3ad..81b8185455 100644 --- a/spec/observables/dom/webSocket-spec.js +++ b/spec/observables/dom/webSocket-spec.js @@ -303,6 +303,45 @@ describe('Observable.webSocket', function () { expect(closes[1]).toBe(expected[1]); }); }); + + describe('multiplex', function () { + it('should multiplex over the websocket', function () { + var results = []; + var subject = Observable.webSocket('ws://websocket'); + var source = subject.multiplex(function () { + return { sub: 'foo'}; + }, function () { + return { unsub: 'foo' }; + }, function (value) { + return value.name === 'foo'; + }); + + var sub = source.subscribe(function (x) { + results.push(x.value); + }); + var socket = MockWebSocket.lastSocket(); + socket.open(); + + expect(socket.lastMessageSent()).toEqual({ sub: 'foo' }); + + [1, 2, 3, 4, 5].map(function (x) { + return { + name: x % 3 === 0 ? 'bar' : 'foo', + value: x + }; + }).forEach(function (x) { + socket.triggerMessage(JSON.stringify(x)); + }); + + expect(results).toEqual([1, 2, 4, 5]); + + spyOn(socket, 'close').and.callThrough(); + sub.unsubscribe(); + expect(socket.lastMessageSent()).toEqual({ unsub: 'foo' }); + + expect(socket.close).toHaveBeenCalled(); + }); + }); }); var sockets = []; diff --git a/src/observable/dom/webSocket.ts b/src/observable/dom/webSocket.ts index 3b5902b7d6..ccee0b59da 100644 --- a/src/observable/dom/webSocket.ts +++ b/src/observable/dom/webSocket.ts @@ -65,8 +65,38 @@ export class WebSocketSubject extends Subject { return sock; } - multiplex(subMsg: any, unsubMsg: any, messageFilter: (value: T) => boolean) { - return this.lift(new MultiplexOperator(this, subMsg, unsubMsg, messageFilter)); + // TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures + multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) { + const self = this; + return new Observable(observer => { + const result = tryCatch(subMsg)(); + if (result === errorObject) { + observer.error(errorObject.e); + } else { + self.next(result); + } + + const subscription = self.subscribe(x => { + const result = tryCatch(messageFilter)(x); + if (result === errorObject) { + observer.error(errorObject.e); + } else if (result) { + observer.next(x); + } + }, + err => observer.error(err), + () => observer.complete()); + + return () => { + const result = tryCatch(unsubMsg)(); + if (result === errorObject) { + observer.error(errorObject.e); + } else { + self.next(result); + } + subscription.unsubscribe(); + }; + }); } _unsubscribe() { @@ -157,12 +187,11 @@ export class WebSocketSubject extends Subject { self._finalNext(result); } }; - return subscription; } return new Subscription(() => { subscription.unsubscribe(); - if (this.observers.length === 0) { + if (!this.observers || this.observers.length === 0) { const { socket } = this; if (socket && socket.readyState < 2) { socket.close(); @@ -173,43 +202,4 @@ export class WebSocketSubject extends Subject { } }); } -} - -export class MultiplexOperator implements Operator { - constructor(private socketSubject: WebSocketSubject, - private subscribeMessage: any, - private unsubscribeMessage, - private messageFilter: (data: any) => R) { - // noop - } - - call(subscriber: Subscriber) { - return new MultiplexSubscriber(subscriber, this.socketSubject, this.subscribeMessage, this.unsubscribeMessage, this.messageFilter); - } -} - -export class MultiplexSubscriber extends Subscriber { - constructor(destination: Observer, - private socketSubject: WebSocketSubject, - private subscribeMessage: any, - private unsubscribeMessage: any, - private messageFilter: (data: any) => T) { - super(destination); - - socketSubject.next(subscribeMessage); - } - - next(value: any) { - const pass = tryCatch(this.messageFilter)(value); - if (pass === errorObject) { - this.destination.error(errorObject.e); - } else if (pass) { - this.destination.next(value); - } - } - - unsubscribe() { - this.socketSubject.next(this.unsubscribeMessage); - super.unsubscribe(); - } } \ No newline at end of file