Skip to content

Commit

Permalink
fix(WebSocketSubject): use the supplied WebSockeCtor, and support sou…
Browse files Browse the repository at this point in the history
…rce/destination arguments in th

ReactiveX#1745, ReactiveX#1784
  • Loading branch information
trxcllnt committed Jun 28, 2016
1 parent 43d05e7 commit 3d04a11
Showing 1 changed file with 36 additions and 16 deletions.
52 changes: 36 additions & 16 deletions src/observable/dom/WebSocketSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {Subject, AnonymousSubject} from '../../Subject';
import {Subscriber} from '../../Subscriber';
import {Observable} from '../../Observable';
import {Subscription} from '../../Subscription';
import {Operator} from '../../Operator';
import {root} from '../../util/root';
import {ReplaySubject} from '../../ReplaySubject';
import {Observer, NextObserver} from '../../Observer';
Expand All @@ -25,14 +26,16 @@ export interface WebSocketSubjectConfig {
* @hide true
*/
export class WebSocketSubject<T> extends AnonymousSubject<T> {

url: string;
protocol: string|Array<string>;
socket: WebSocket;
openObserver: NextObserver<Event>;
closeObserver: NextObserver<CloseEvent>;
closingObserver: NextObserver<void>;
WebSocketCtor: { new(url: string, protocol?: string|Array<string>): WebSocket };
private _output: Subject<T> = new Subject<T>();

private _output: Subject<T>;

resultSelector(e: MessageEvent) {
return JSON.parse(e.data);
Expand All @@ -50,21 +53,29 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
}

constructor(urlConfigOrSource: string | WebSocketSubjectConfig | Observable<T>, destination?: Observer<T>) {
super();
this.WebSocketCtor = root.WebSocket;

if (typeof urlConfigOrSource === 'string') {
this.url = urlConfigOrSource;
if (urlConfigOrSource instanceof Observable) {
super(destination, <Observable<T>> urlConfigOrSource);
} else {
// WARNING: config object could override important members here.
assign(this, urlConfigOrSource);
}

if (!this.WebSocketCtor) {
throw new Error('no WebSocket constructor can be found');
super();
this.WebSocketCtor = root.WebSocket;
this._output = new Subject<T>();
if (typeof urlConfigOrSource === 'string') {
this.url = urlConfigOrSource;
} else {
// WARNING: config object could override important members here.
assign(this, urlConfigOrSource);
}
if (!this.WebSocketCtor) {
throw new Error('no WebSocket constructor can be found');
}
this.destination = new ReplaySubject();
}
}

this.destination = new ReplaySubject();
lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
const sock = new WebSocketSubject<R>(this, <any> this.destination);
sock.operator = operator;
return sock;
}

// TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures
Expand Down Expand Up @@ -102,7 +113,10 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
}

private _connectSocket() {
const socket = this.protocol ? new WebSocket(this.url, this.protocol) : new WebSocket(this.url);
const { WebSocketCtor } = this;
const socket = this.protocol ?
new WebSocketCtor(this.url, this.protocol) :
new WebSocketCtor(this.url);
this.socket = socket;
const subscription = new Subscription(() => {
this.socket = null;
Expand Down Expand Up @@ -178,6 +192,10 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
}

protected _subscribe(subscriber: Subscriber<T>): Subscription {
const { source } = this;
if (source) {
return source.subscribe(subscriber);
}
if (!this.socket) {
this._connectSocket();
}
Expand All @@ -194,12 +212,14 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
}

unsubscribe() {
const { socket } = this;
const { source, socket } = this;
if (socket && socket.readyState === 1) {
socket.close();
this.socket = null;
}
super.unsubscribe();
this.destination = new ReplaySubject();
if (!source) {
this.destination = new ReplaySubject();
}
}
}

0 comments on commit 3d04a11

Please sign in to comment.