Skip to content

Commit

Permalink
fix(WebSocketSubject): handle slow WebSocket close
Browse files Browse the repository at this point in the history
The close event on WebSocket does not always occur immediately after
running the close function. If the close event is occurs after a new
WebSocket is opened, then the reset needs to be skipped. This checks to
make sure the socket being reset is the one that matches the event.

Closes ReactiveX#4650, ReactiveX#3935
  • Loading branch information
maknapp committed Dec 6, 2021
1 parent 0a64078 commit c0b5071
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 14 deletions.
64 changes: 51 additions & 13 deletions spec/observables/dom/webSocket-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ const root: any = (typeof globalThis !== 'undefined' && globalThis)
|| (typeof self !== 'undefined' && self)
|| global;

enum WebSocketState {
CONNECTING = 0,
OPEN = 1,
CLOSING = 2,
CLOSED = 3
}

/** @test {webSocket} */
describe('webSocket', () => {
let __ws: any;
Expand Down Expand Up @@ -131,15 +138,18 @@ describe('webSocket', () => {
const socket = MockWebSocket.lastSocket;
socket.open();

expect(socket.readyState).to.equal(1); // open
expect(socket.readyState).to.equal(WebSocketState.OPEN);

sinon.spy(socket, 'close');

expect(socket.close).not.have.been.called;

subject.complete();
expect(socket.close).have.been.called;
expect(socket.readyState).to.equal(3); // closed
expect(socket.readyState).to.equal(WebSocketState.CLOSING);

socket.triggerClose({ wasClean: true });
expect(socket.readyState).to.equal(WebSocketState.CLOSED);

subject.unsubscribe();
(<any>socket.close).restore();
Expand All @@ -154,7 +164,7 @@ describe('webSocket', () => {
socket.open();

expect(socket.close).have.been.called;
expect(socket.readyState).to.equal(3); // closed
expect(socket.readyState).to.equal(WebSocketState.CLOSING);

(<any>socket.close).restore();
});
Expand All @@ -168,7 +178,7 @@ describe('webSocket', () => {
socket.open();

expect(socket.close).have.been.called;
expect(socket.readyState).to.equal(3); // closed
expect(socket.readyState).to.equal(WebSocketState.CLOSING);

(<any>socket.close).restore();
});
Expand All @@ -181,7 +191,7 @@ describe('webSocket', () => {
subject.unsubscribe();

expect(socket.close).have.been.called;
expect(socket.readyState).to.equal(3); // closed
expect(socket.readyState).to.equal(WebSocketState.CLOSING);

(<any>socket.close).restore();
});
Expand All @@ -194,7 +204,36 @@ describe('webSocket', () => {
subscription.unsubscribe();

expect(socket.close).have.been.called;
expect(socket.readyState).to.equal(3); // closed
expect(socket.readyState).to.equal(WebSocketState.CLOSING);

(<any>socket.close).restore();
});

it('should close a socket that opens before the previous socket has closed', () => {
const subject = webSocket<string>('ws://mysocket');
const subscription = subject.subscribe();
const socket = MockWebSocket.lastSocket;
sinon.spy(socket, 'close');
subscription.unsubscribe();

expect(socket.close).have.been.called;
expect(socket.readyState).to.equal(WebSocketState.CLOSING);

const subscription2 = subject.subscribe();
const socket2 = MockWebSocket.lastSocket;
sinon.spy(socket2, 'close');

// Close socket after socket2 has opened
socket2.open();
expect(socket2.readyState).to.equal(WebSocketState.OPEN);
socket.triggerClose({wasClean: true});

expect(socket.readyState).to.equal(WebSocketState.CLOSED);
expect(socket2.close).have.not.been.called;

subscription2.unsubscribe();
expect(socket2.close).have.been.called;
expect(socket2.readyState).to.equal(WebSocketState.CLOSING);

(<any>socket.close).restore();
});
Expand Down Expand Up @@ -747,7 +786,7 @@ class MockWebSocket {

sent: string[] = [];
handlers: any = {};
readyState: number = 0;
readyState: WebSocketState = WebSocketState.CONNECTING;
closeCode: any;
closeReason: any;
binaryType?: string;
Expand All @@ -766,8 +805,8 @@ class MockWebSocket {
return length > 0 ? sent[length - 1] : undefined!;
}

triggerClose(e: any): void {
this.readyState = 3;
triggerClose(e: Partial<CloseEvent>): void {
this.readyState = WebSocketState.CLOSED;
this.trigger('close', e);
}

Expand All @@ -783,16 +822,15 @@ class MockWebSocket {
}

open(): void {
this.readyState = 1;
this.readyState = WebSocketState.OPEN;
this.trigger('open', {});
}

close(code: any, reason: any): void {
if (this.readyState < 2) {
this.readyState = 2;
if (this.readyState < WebSocketState.CLOSING) {
this.readyState = WebSocketState.CLOSING;
this.closeCode = code;
this.closeReason = reason;
this.triggerClose({ wasClean: true });
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/internal/observable/dom/WebSocketSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,9 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
};

socket.onclose = (e: CloseEvent) => {
this._resetState();
if (socket === this._socket) {
this._resetState();
}
const { closeObserver } = this._config;
if (closeObserver) {
closeObserver.next(e);
Expand Down

0 comments on commit c0b5071

Please sign in to comment.