Skip to content

Commit

Permalink
fix(WebSocketSubject): resultSelector and protocols specifications wo…
Browse files Browse the repository at this point in the history
…rk properly

- resultSelector errors will now be sent down the error path
- protocol can now be specified properly with a config object argument
  • Loading branch information
benlesh committed Jan 13, 2016
1 parent 3b1655e commit 580f69a
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 18 deletions.
178 changes: 162 additions & 16 deletions spec/observables/dom/webSocket-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
var Rx = require('../../../dist/cjs/Rx.DOM');
var Observable = Rx.Observable;

function noop() {
// nope.
}

describe('Observable.webSocket', function () {
beforeEach(function () {
setupMockWebSocket();
Expand All @@ -32,7 +28,7 @@ describe('Observable.webSocket', function () {
socket.open();
expect(socket.lastMessageSent()).toBe('ping');

socket.triggerMessage('pong');
socket.triggerMessage(JSON.stringify('pong'));
expect(messageReceived).toBe(true);
});

Expand All @@ -50,7 +46,7 @@ describe('Observable.webSocket', function () {
socket.open();

expected.forEach(function (x) {
socket.triggerMessage(x);
socket.triggerMessage(JSON.stringify(x));
});

expect(results).toEqual(expected);
Expand Down Expand Up @@ -148,6 +144,165 @@ describe('Observable.webSocket', function () {
expect(socket2).not.toBe(socket1);
expect(socket2.lastMessageSent()).toBe('yo-ho! yo-ho!');
});

it('should have a default resultSelector that parses message data as JSON', function () {
var result;
var expected = { mork: 'shazbot!' };
var subject = Observable.webSocket('ws://mysocket');

subject.subscribe(function (x) {
result = x;
});

var socket = MockWebSocket.lastSocket();
socket.open();
socket.triggerMessage(JSON.stringify(expected));

expect(result).toEqual(expected);
});

describe('with a config object', function () {
it('should send and receive messages', function () {
var messageReceived = false;
var subject = Observable.webSocket({ url: 'ws://mysocket' });

subject.next('ping');

subject.subscribe(function (x) {
expect(x).toBe('pong');
messageReceived = true;
});

var socket = MockWebSocket.lastSocket();
expect(socket.url).toBe('ws://mysocket');

socket.open();
expect(socket.lastMessageSent()).toBe('ping');

socket.triggerMessage(JSON.stringify('pong'));
expect(messageReceived).toBe(true);
});

it('should take a protocol and set it properly on the web socket', function () {
var subject = Observable.webSocket({
url: 'ws://mysocket',
protocol: 'someprotocol'
});

subject.subscribe();

var socket = MockWebSocket.lastSocket();
expect(socket.protocol).toBe('someprotocol');
});

it('should take a resultSelector', function () {
var results = [];

var subject = Observable.webSocket({
url: 'ws://mysocket',
resultSelector: function (e) {
return e.data + '!';
}
});

subject.subscribe(function (x) {
results.push(x);
});

var socket = MockWebSocket.lastSocket();
socket.open();
['ahoy', 'yarr', 'shove off'].forEach(function (x) {
socket.triggerMessage(x);
});

expect(results).toEqual(['ahoy!', 'yarr!', 'shove off!']);
});

it('if the resultSelector fails it should go down the error path', function () {

var subject = Observable.webSocket({
url: 'ws://mysocket',
resultSelector: function (e) {
throw new Error('I am a bad error');
}
});

subject.subscribe(function (x) {
expect(x).toBe('this should not happen');
}, function (err) {
expect(err).toEqual(new Error('I am a bad error'));
});

var socket = MockWebSocket.lastSocket();
socket.open();
socket.triggerMessage('weee!');
});

it('should accept a closingObserver', function () {
var calls = 0;
var subject = Observable.webSocket({
url: 'ws://mysocket',
closingObserver: {
next: function (x) {
calls++;
expect(x).toBe(undefined);
}
}
});

subject.subscribe();
var socket = MockWebSocket.lastSocket();
socket.open();

expect(calls).toBe(0);

subject.complete();
expect(calls).toBe(1);

subject.subscribe();
socket = MockWebSocket.lastSocket();
socket.open();

subject.error({ code: 1337 });
expect(calls).toBe(2);
});

it('should accept a closeObserver', function () {
var expected = [{ wasClean: true }, { wasClean: false }];
var closes = [];
var subject = Observable.webSocket({
url: 'ws://mysocket',
closeObserver: {
next: function (e) {
closes.push(e);
}
}
});

subject.subscribe();
var socket = MockWebSocket.lastSocket();
socket.open();

expect(closes.length).toBe(0);

socket.triggerClose(expected[0]);
expect(closes.length).toBe(1);


subject.subscribe(null, function (err) {
expect(err).toBe(expected[1]);
});

socket = MockWebSocket.lastSocket();
socket.open();

socket.triggerClose(expected[1]);
expect(closes.length).toBe(2);

expect(closes[0]).toBe(expected[0]);
expect(closes[1]).toBe(expected[1]);
});
});
});

var sockets = [];
Expand Down Expand Up @@ -175,15 +330,6 @@ MockWebSocket.prototype = {
return sent.length > 0 ? sent[sent.length - 1] : undefined;
},

closeDirty: function (code, reason) {
if (this.readyState < 2) {
this.readyState = 2;
this.closeCode = code;
this.closeReason = reason;
this.triggerClose({ wasClean: false });
}
},

triggerClose: function (e) {
this.readyState = 3;
this.trigger('close', e);
Expand All @@ -196,7 +342,7 @@ MockWebSocket.prototype = {

triggerMessage: function (data) {
var messageEvent = {
data: JSON.stringify(data),
data: data,
origin: 'mockorigin',
ports: undefined,
source: __root__,
Expand Down
4 changes: 2 additions & 2 deletions src/observable/dom/webSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class WebSocketSubject<T> extends Subject<T> {
}

if (self.url && !self.socket) {
const socket = new WebSocket(self.url);
const socket = self.protocol ? new WebSocket(self.url, self.protocol) : new WebSocket(self.url);
self.socket = socket;

socket.onopen = (e) => {
Expand Down Expand Up @@ -151,7 +151,7 @@ export class WebSocketSubject<T> extends Subject<T> {

socket.onmessage = (e: MessageEvent) => {
const result = tryCatch(self.resultSelector)(e);
if (result === errorObject.e) {
if (result === errorObject) {
self._finalError(errorObject.e);
} else {
self._finalNext(result);
Expand Down

0 comments on commit 580f69a

Please sign in to comment.