Skip to content

Commit

Permalink
fix: streaming client should connect under Node.js (#4413)
Browse files Browse the repository at this point in the history
* Fix NodeWebSocket.connect

* Fix ESLint

* Update API

* Remove .only

* Clean up code related to watershed

* Fix ESLint

* Fix tests for Node.js 12

* Fix ESLint

* Add mocks

* Remove .only

* Use LF

* Detect older signature

* Fix ESLint

* serverAddress become serverAddressOrHostName

* Update API

* Clean up

* More clean up

* Clean up

* Leaving old code

* Switching branch

* Clean up

* Clean up

* Clean up

* Manual edit
  • Loading branch information
compulim authored Jan 25, 2023
1 parent fe88652 commit 736979c
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ export class NamedPipeServer implements IStreamingTransportServer {
export class NodeWebSocket implements ISocket {
constructor(wsSocket?: WebSocket_2);
close(code?: number, data?: string): void;
connect(serverAddress: string, port?: number): Promise<void>;
connect(serverAddressOrHostName: string, port?: number): Promise<void>;
create(req: INodeIncomingMessage, socket: INodeSocket, head: INodeBuffer): Promise<void>;
get isConnected(): boolean;
setOnCloseHandler(handler: (x: any) => void): void;
Expand Down
42 changes: 35 additions & 7 deletions libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
* Licensed under the MIT License.
*/

import * as crypto from 'crypto';
import { IncomingMessage, request } from 'http';
import { URL } from 'url';
import * as crypto from 'crypto';
import * as WebSocket from 'ws';

import { INodeIncomingMessage, INodeBuffer, INodeSocket, ISocket } from '../interfaces';

const NONCE_LENGTH = 16;

/**
Expand Down Expand Up @@ -74,23 +76,50 @@ export class NodeWebSocket implements ISocket {
/**
* Connects to the supporting socket using WebSocket protocol.
*
* @param serverAddress The address the server is listening on.
* @param port The port the server is listening on, defaults to 8082.
* @param serverAddressOrHostName The host name or URL the server is listening on.
* @param port If `serverAddressOrHostName` is a host name, the port the server is listening on, defaults to 8082. Otherwise, this argument is ignored.
* @returns A Promise that resolves when the websocket connection is closed, or rejects on an error.
*/
async connect(serverAddress: string, port = 8082): Promise<void> {
async connect(serverAddressOrHostName: string, port = 8082): Promise<void> {
let url: URL;

try {
url = new URL(serverAddressOrHostName);
// eslint-disable-next-line no-empty
} catch (_error) {}

if (url?.hostname) {
return new Promise<void>((resolve, reject) => {
const ws = (this.wsSocket = new WebSocket(url));

ws.once('error', ({ message }) => reject(new Error(message)));
ws.once('open', () => resolve());
});
}

// [hawo]: The following logics are kept here for backward compatibility.
//
// However, there are no tests to prove the following code works.
// We tried our best to write a test and figure out how the code would work.
//
// However, there are obvious mistakes in the code that made it very unlikely to work:
// - `options.headers.upgrade` must set to `'websocket'`
// - Second argument of `WebSocket.server.completeUpgrade` should be `{}`, instead of `undefined`
//
// More readings at https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#client_handshake_request.
this.wsServer = new WebSocket.Server({ noServer: true });
// Key generation per https://tools.ietf.org/html/rfc6455#section-1.3 (pg. 7)
const wskey = crypto.randomBytes(NONCE_LENGTH).toString('base64');
const options = {
port: port,
hostname: serverAddress,
hostname: serverAddressOrHostName,
headers: {
connection: 'upgrade',
'Sec-WebSocket-Key': wskey,
'Sec-WebSocket-Version': '13',
},
};

const req = request(options);
req.end();
req.on('upgrade', (res, socket, head): void => {
Expand All @@ -109,13 +138,12 @@ export class NodeWebSocket implements ISocket {
}

/**
* Set the handler for `'data'` and `'message'` events received on the socket.
* Set the handler for `'message'` events received on the socket.
*
* @param handler The callback to handle the "message" event.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
setOnMessageHandler(handler: (x: any) => void): void {
this.wsSocket.on('data', handler);
this.wsSocket.on('message', handler);
}

Expand Down
156 changes: 149 additions & 7 deletions libraries/botframework-streaming/tests/NodeWebSocket.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
const { NodeWebSocket } = require('../');
const { expect } = require('chai');
const { FauxSock, TestRequest } = require('./helpers');
const { FauxSock, FauxSocket, TestRequest, waitFor } = require('./helpers');
const { NodeWebSocket } = require('../');
const { randomBytes } = require('crypto');
const { Server } = require('ws');

const TEST_SERVER_PORT = 53978;

describe('NodeWebSocket', function () {
it('creates a new NodeWebSocket', function () {
Expand Down Expand Up @@ -33,11 +36,9 @@ describe('NodeWebSocket', function () {
it('can set message handlers on the socket', function () {
const sock = new FauxSock();
const socket = new NodeWebSocket(sock);
expect(sock.dataHandler).to.be.undefined;
expect(sock._messageHandler).to.be.undefined;
expect(sock.messageHandler).to.be.undefined;
expect(socket.setOnMessageHandler(() => {})).to.not.throw;
expect(sock.dataHandler).to.not.be.undefined;
expect(sock._messageHandler).to.not.be.undefined;
expect(sock.messageHandler).to.not.be.undefined;
});

it('can set error handler on the socket', function () {
Expand All @@ -57,7 +58,7 @@ describe('NodeWebSocket', function () {
});

it('create() should be successful and set a WebSocket', async function () {
const sock = new FauxSock();
const sock = new FauxSocket();
const nodeSocket = new NodeWebSocket();
const request = new TestRequest();

Expand All @@ -73,4 +74,145 @@ describe('NodeWebSocket', function () {

await nodeSocket.create(request, sock, Buffer.from([]));
});

describe('test against a WebSocket server', function () {
let serverEvents;
let server;
let serverSocket;

beforeEach(function () {
return new Promise((resolve) => {
server = new Server({ path: '/some-path', port: TEST_SERVER_PORT });
serverEvents = [];

server.on('connection', function (socket) {
serverEvents.push(['connection', socket]);

serverSocket = socket;
serverSocket.send('welcome');
serverSocket.on('message', function (data) {
serverEvents.push(['message', data]);
});
serverSocket.on('close', function (code, reason) {
serverEvents.push(['close', code, reason]);
});
});

server.on('listening', resolve);
});
});

afterEach(function () {
server.close();
});

describe('connect to a WebSocket server', function () {
let events;
let nodeSocket;

beforeEach(function () {
nodeSocket = new NodeWebSocket();
events = [];

const connectPromise = nodeSocket.connect(`ws://localhost:${server.address().port}/some-path`);

nodeSocket.setOnCloseHandler((code, reason) => events.push(['close', code, reason]));
nodeSocket.setOnErrorHandler((error) => events.push(['error', error]));
nodeSocket.setOnMessageHandler((data) => events.push(['message', data]));

return connectPromise;
});

afterEach(function () {
try {
nodeSocket.close();
// eslint-disable-next-line no-empty
} catch {}
});

it('should connect to the server', function () {
return waitFor(() => {
expect(serverEvents).to.have.lengthOf(1);
expect(serverEvents[0]).deep.to.equals(['connection', serverSocket]);
});
});

describe('isConnected property', function () {
it('should be true', function () {
expect(nodeSocket.isConnected).to.be.true;
});
});

it('should receive initial message', function () {
return waitFor(() => {
expect(events).to.have.lengthOf(1);
// Initial message from server is text, not Uint8Array.
expect(events[0]).deep.to.equals(['message', 'welcome']);
});
});

describe('after sending a binary message', function () {
beforeEach(function () {
nodeSocket.write(new TextEncoder().encode('morning'));
});

it('should send to the server', function () {
return waitFor(() => {
expect(serverEvents).to.have.lengthOf(2);
expect(new TextDecoder().decode(serverEvents[1][1])).to.equal('morning');
});
});
});

describe('after server send a binary message', function () {
beforeEach(function () {
serverSocket.send(new TextEncoder().encode('afternoon'));
});

it('should receive the message', function () {
return waitFor(() => {
expect(events).to.have.lengthOf(2);
expect(new TextDecoder().decode(events[1][1])).to.equal('afternoon');
});
});
});

it('should close with code and reason', function () {
nodeSocket.close(3000, 'bye');

return waitFor(() => {
expect(serverEvents).to.have.lengthOf(2);
expect(serverEvents[1]).deep.to.equals(['close', 3000, 'bye']);
});
});

describe('when server close connection', function () {
beforeEach(function () {
serverSocket.close(4999, 'goodnight');
});

it('should receive close event', function () {
return waitFor(() => {
expect(events).to.have.lengthOf(2);
expect(events[1]).deep.to.equals(['close', 4999, 'goodnight']);
});
});
});

describe('when server terminated the connection', function () {
beforeEach(function () {
serverSocket.terminate();
});

it('should receive close event with code 1006', function () {
return waitFor(() => {
expect(events).to.have.lengthOf(2);

// 1006 is "close outside of Web Socket".
expect(events[1]).deep.to.equals(['close', 1006, '']);
});
});
});
});
});
});
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
const { NodeWebSocket, NodeWebSocketFactory } = require('..');
const { FauxSock, TestRequest } = require('./helpers');
const { expect } = require('chai');
const { FauxSocket, TestRequest } = require('./helpers');
const { NodeWebSocket, NodeWebSocketFactory } = require('..');
const { randomBytes } = require('crypto');

describe('NodeWebSocketFactory', function () {
it('createWebSocket() should create a new NodeWebSocket', async function () {
const factory = new NodeWebSocketFactory();
const sock = new FauxSock();
const sock = new FauxSocket();
const request = new TestRequest();
request.setIsUpgradeRequest(true);
request.headers = [];
Expand Down
22 changes: 1 addition & 21 deletions libraries/botframework-streaming/tests/helpers/fauxSock.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,39 +74,19 @@ class FauxSock {
return this.exists;
}

/** WaterShed Socket Specific? */
destroy() {
return true;
}

/** WaterShed Socket Specific? */
removeAllListeners() {
return true;
}

on(action, handler) {
switch (action) {
case 'error':
this.errorHandler = handler;
break;
case 'data':
this.messageHandler = handler;
this.dataHandler = handler; // Required for `ws` WebSockets
break;
case 'close':
this.closeHandler = handler;
break;
case 'end':
this.endHandler = handler;
break;
case 'text':
this.textHandler = handler; // Required for `watershed` WebSockets
break;
case 'binary':
this.binaryHandler = handler; // Required for `watershed` WebSockets
break;
case 'message':
this._messageHandler = handler; // Required for `ws` WebSockets
this.messageHandler = handler; // Required for `ws` WebSockets
break;
default:
throw new Error(`TestError: Unknown action ("${action}") passed to FauxSock.on()`);
Expand Down
15 changes: 15 additions & 0 deletions libraries/botframework-streaming/tests/helpers/fauxSocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
const { Duplex } = require('stream');

module.exports.FauxSocket = class FauxSocket extends Duplex {
// eslint-disable-next-line no-empty
_read() {}

// eslint-disable-next-line no-empty
_write() {}

// eslint-disable-next-line no-empty
setNoDelay() {}

// eslint-disable-next-line no-empty
setTimeout() {}
};
6 changes: 6 additions & 0 deletions libraries/botframework-streaming/tests/helpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
*/

const { FauxSock } = require('./fauxSock');
const { FauxSocket } = require('./fauxSocket');
const { sleep } = require('./sleep');
const { TestRequest } = require('./testRequest');
const { waitFor } = require('./waitFor');

module.exports.FauxSock = FauxSock;
module.exports.FauxSocket = FauxSocket;
module.exports.sleep = sleep;
module.exports.TestRequest = TestRequest;
module.exports.waitFor = waitFor;
3 changes: 3 additions & 0 deletions libraries/botframework-streaming/tests/helpers/sleep.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module.exports.sleep = function sleep(durationInMS = 100) {
return new Promise((resolve) => setTimeout(resolve, durationInMS));
};
16 changes: 16 additions & 0 deletions libraries/botframework-streaming/tests/helpers/waitFor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
const { sleep } = require('./sleep');

module.exports.waitFor = async function waitFor(callback, { interval = 50, timeout = 1000 } = {}) {
let lastError;

for (const startTime = Date.now(); Date.now() < startTime + timeout; ) {
try {
return callback();
} catch (error) {
lastError = error;
await sleep(interval);
}
}

throw lastError || new Error('timed out');
};

0 comments on commit 736979c

Please sign in to comment.