From 736979c95deaa8cc1c55709a34d4d15bc314a9f6 Mon Sep 17 00:00:00 2001 From: William Wong Date: Wed, 25 Jan 2023 10:26:34 -0800 Subject: [PATCH] fix: streaming client should connect under Node.js (#4413) * Fix NodeWebSocket.connect * Fix ESLint * Update API * Remove .only * Clean up code related to watershed * Fix ESLint * Fix tests for Node.js 12 * Fix ESLint * Add mocks * Remove .only * Use LF * Detect older signature * Fix ESLint * serverAddress become serverAddressOrHostName * Update API * Clean up * More clean up * Clean up * Leaving old code * Switching branch * Clean up * Clean up * Clean up * Manual edit --- .../etc/botframework-streaming.api.md | 2 +- .../src/webSocket/nodeWebSocket.ts | 42 ++++- .../tests/NodeWebSocket.test.js | 156 +++++++++++++++++- .../tests/NodeWebSocketFactory.test.js | 6 +- .../tests/helpers/fauxSock.js | 22 +-- .../tests/helpers/fauxSocket.js | 15 ++ .../tests/helpers/index.js | 6 + .../tests/helpers/sleep.js | 3 + .../tests/helpers/waitFor.js | 16 ++ 9 files changed, 229 insertions(+), 39 deletions(-) create mode 100644 libraries/botframework-streaming/tests/helpers/fauxSocket.js create mode 100644 libraries/botframework-streaming/tests/helpers/sleep.js create mode 100644 libraries/botframework-streaming/tests/helpers/waitFor.js diff --git a/libraries/botframework-streaming/etc/botframework-streaming.api.md b/libraries/botframework-streaming/etc/botframework-streaming.api.md index 589a085363..847524e178 100644 --- a/libraries/botframework-streaming/etc/botframework-streaming.api.md +++ b/libraries/botframework-streaming/etc/botframework-streaming.api.md @@ -445,7 +445,7 @@ export class NamedPipeServer implements IStreamingTransportServer { export class NodeWebSocket implements ISocket { constructor(wsSocket?: WebSocket_2); close(code?: number, data?: string): void; - connect(serverAddress: string, port?: number): Promise; + connect(serverAddressOrHostName: string, port?: number): Promise; create(req: INodeIncomingMessage, socket: INodeSocket, head: INodeBuffer): Promise; get isConnected(): boolean; setOnCloseHandler(handler: (x: any) => void): void; diff --git a/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts b/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts index 031a733270..b0fea76a3b 100644 --- a/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts +++ b/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts @@ -6,11 +6,13 @@ * Licensed under the MIT License. */ -import * as crypto from 'crypto'; import { IncomingMessage, request } from 'http'; +import { URL } from 'url'; +import * as crypto from 'crypto'; import * as WebSocket from 'ws'; import { INodeIncomingMessage, INodeBuffer, INodeSocket, ISocket } from '../interfaces'; + const NONCE_LENGTH = 16; /** @@ -74,23 +76,50 @@ export class NodeWebSocket implements ISocket { /** * Connects to the supporting socket using WebSocket protocol. * - * @param serverAddress The address the server is listening on. - * @param port The port the server is listening on, defaults to 8082. + * @param serverAddressOrHostName The host name or URL the server is listening on. + * @param port If `serverAddressOrHostName` is a host name, the port the server is listening on, defaults to 8082. Otherwise, this argument is ignored. * @returns A Promise that resolves when the websocket connection is closed, or rejects on an error. */ - async connect(serverAddress: string, port = 8082): Promise { + async connect(serverAddressOrHostName: string, port = 8082): Promise { + let url: URL; + + try { + url = new URL(serverAddressOrHostName); + // eslint-disable-next-line no-empty + } catch (_error) {} + + if (url?.hostname) { + return new Promise((resolve, reject) => { + const ws = (this.wsSocket = new WebSocket(url)); + + ws.once('error', ({ message }) => reject(new Error(message))); + ws.once('open', () => resolve()); + }); + } + + // [hawo]: The following logics are kept here for backward compatibility. + // + // However, there are no tests to prove the following code works. + // We tried our best to write a test and figure out how the code would work. + // + // However, there are obvious mistakes in the code that made it very unlikely to work: + // - `options.headers.upgrade` must set to `'websocket'` + // - Second argument of `WebSocket.server.completeUpgrade` should be `{}`, instead of `undefined` + // + // More readings at https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#client_handshake_request. this.wsServer = new WebSocket.Server({ noServer: true }); // Key generation per https://tools.ietf.org/html/rfc6455#section-1.3 (pg. 7) const wskey = crypto.randomBytes(NONCE_LENGTH).toString('base64'); const options = { port: port, - hostname: serverAddress, + hostname: serverAddressOrHostName, headers: { connection: 'upgrade', 'Sec-WebSocket-Key': wskey, 'Sec-WebSocket-Version': '13', }, }; + const req = request(options); req.end(); req.on('upgrade', (res, socket, head): void => { @@ -109,13 +138,12 @@ export class NodeWebSocket implements ISocket { } /** - * Set the handler for `'data'` and `'message'` events received on the socket. + * Set the handler for `'message'` events received on the socket. * * @param handler The callback to handle the "message" event. */ // eslint-disable-next-line @typescript-eslint/no-explicit-any setOnMessageHandler(handler: (x: any) => void): void { - this.wsSocket.on('data', handler); this.wsSocket.on('message', handler); } diff --git a/libraries/botframework-streaming/tests/NodeWebSocket.test.js b/libraries/botframework-streaming/tests/NodeWebSocket.test.js index 0fffdc58fc..32e8e34aa8 100644 --- a/libraries/botframework-streaming/tests/NodeWebSocket.test.js +++ b/libraries/botframework-streaming/tests/NodeWebSocket.test.js @@ -1,7 +1,10 @@ -const { NodeWebSocket } = require('../'); const { expect } = require('chai'); -const { FauxSock, TestRequest } = require('./helpers'); +const { FauxSock, FauxSocket, TestRequest, waitFor } = require('./helpers'); +const { NodeWebSocket } = require('../'); const { randomBytes } = require('crypto'); +const { Server } = require('ws'); + +const TEST_SERVER_PORT = 53978; describe('NodeWebSocket', function () { it('creates a new NodeWebSocket', function () { @@ -33,11 +36,9 @@ describe('NodeWebSocket', function () { it('can set message handlers on the socket', function () { const sock = new FauxSock(); const socket = new NodeWebSocket(sock); - expect(sock.dataHandler).to.be.undefined; - expect(sock._messageHandler).to.be.undefined; + expect(sock.messageHandler).to.be.undefined; expect(socket.setOnMessageHandler(() => {})).to.not.throw; - expect(sock.dataHandler).to.not.be.undefined; - expect(sock._messageHandler).to.not.be.undefined; + expect(sock.messageHandler).to.not.be.undefined; }); it('can set error handler on the socket', function () { @@ -57,7 +58,7 @@ describe('NodeWebSocket', function () { }); it('create() should be successful and set a WebSocket', async function () { - const sock = new FauxSock(); + const sock = new FauxSocket(); const nodeSocket = new NodeWebSocket(); const request = new TestRequest(); @@ -73,4 +74,145 @@ describe('NodeWebSocket', function () { await nodeSocket.create(request, sock, Buffer.from([])); }); + + describe('test against a WebSocket server', function () { + let serverEvents; + let server; + let serverSocket; + + beforeEach(function () { + return new Promise((resolve) => { + server = new Server({ path: '/some-path', port: TEST_SERVER_PORT }); + serverEvents = []; + + server.on('connection', function (socket) { + serverEvents.push(['connection', socket]); + + serverSocket = socket; + serverSocket.send('welcome'); + serverSocket.on('message', function (data) { + serverEvents.push(['message', data]); + }); + serverSocket.on('close', function (code, reason) { + serverEvents.push(['close', code, reason]); + }); + }); + + server.on('listening', resolve); + }); + }); + + afterEach(function () { + server.close(); + }); + + describe('connect to a WebSocket server', function () { + let events; + let nodeSocket; + + beforeEach(function () { + nodeSocket = new NodeWebSocket(); + events = []; + + const connectPromise = nodeSocket.connect(`ws://localhost:${server.address().port}/some-path`); + + nodeSocket.setOnCloseHandler((code, reason) => events.push(['close', code, reason])); + nodeSocket.setOnErrorHandler((error) => events.push(['error', error])); + nodeSocket.setOnMessageHandler((data) => events.push(['message', data])); + + return connectPromise; + }); + + afterEach(function () { + try { + nodeSocket.close(); + // eslint-disable-next-line no-empty + } catch {} + }); + + it('should connect to the server', function () { + return waitFor(() => { + expect(serverEvents).to.have.lengthOf(1); + expect(serverEvents[0]).deep.to.equals(['connection', serverSocket]); + }); + }); + + describe('isConnected property', function () { + it('should be true', function () { + expect(nodeSocket.isConnected).to.be.true; + }); + }); + + it('should receive initial message', function () { + return waitFor(() => { + expect(events).to.have.lengthOf(1); + // Initial message from server is text, not Uint8Array. + expect(events[0]).deep.to.equals(['message', 'welcome']); + }); + }); + + describe('after sending a binary message', function () { + beforeEach(function () { + nodeSocket.write(new TextEncoder().encode('morning')); + }); + + it('should send to the server', function () { + return waitFor(() => { + expect(serverEvents).to.have.lengthOf(2); + expect(new TextDecoder().decode(serverEvents[1][1])).to.equal('morning'); + }); + }); + }); + + describe('after server send a binary message', function () { + beforeEach(function () { + serverSocket.send(new TextEncoder().encode('afternoon')); + }); + + it('should receive the message', function () { + return waitFor(() => { + expect(events).to.have.lengthOf(2); + expect(new TextDecoder().decode(events[1][1])).to.equal('afternoon'); + }); + }); + }); + + it('should close with code and reason', function () { + nodeSocket.close(3000, 'bye'); + + return waitFor(() => { + expect(serverEvents).to.have.lengthOf(2); + expect(serverEvents[1]).deep.to.equals(['close', 3000, 'bye']); + }); + }); + + describe('when server close connection', function () { + beforeEach(function () { + serverSocket.close(4999, 'goodnight'); + }); + + it('should receive close event', function () { + return waitFor(() => { + expect(events).to.have.lengthOf(2); + expect(events[1]).deep.to.equals(['close', 4999, 'goodnight']); + }); + }); + }); + + describe('when server terminated the connection', function () { + beforeEach(function () { + serverSocket.terminate(); + }); + + it('should receive close event with code 1006', function () { + return waitFor(() => { + expect(events).to.have.lengthOf(2); + + // 1006 is "close outside of Web Socket". + expect(events[1]).deep.to.equals(['close', 1006, '']); + }); + }); + }); + }); + }); }); diff --git a/libraries/botframework-streaming/tests/NodeWebSocketFactory.test.js b/libraries/botframework-streaming/tests/NodeWebSocketFactory.test.js index 55db7f55e9..f29c99a69f 100644 --- a/libraries/botframework-streaming/tests/NodeWebSocketFactory.test.js +++ b/libraries/botframework-streaming/tests/NodeWebSocketFactory.test.js @@ -1,12 +1,12 @@ -const { NodeWebSocket, NodeWebSocketFactory } = require('..'); -const { FauxSock, TestRequest } = require('./helpers'); const { expect } = require('chai'); +const { FauxSocket, TestRequest } = require('./helpers'); +const { NodeWebSocket, NodeWebSocketFactory } = require('..'); const { randomBytes } = require('crypto'); describe('NodeWebSocketFactory', function () { it('createWebSocket() should create a new NodeWebSocket', async function () { const factory = new NodeWebSocketFactory(); - const sock = new FauxSock(); + const sock = new FauxSocket(); const request = new TestRequest(); request.setIsUpgradeRequest(true); request.headers = []; diff --git a/libraries/botframework-streaming/tests/helpers/fauxSock.js b/libraries/botframework-streaming/tests/helpers/fauxSock.js index e219eb3a36..cae935b9f2 100644 --- a/libraries/botframework-streaming/tests/helpers/fauxSock.js +++ b/libraries/botframework-streaming/tests/helpers/fauxSock.js @@ -74,39 +74,19 @@ class FauxSock { return this.exists; } - /** WaterShed Socket Specific? */ - destroy() { - return true; - } - - /** WaterShed Socket Specific? */ - removeAllListeners() { - return true; - } - on(action, handler) { switch (action) { case 'error': this.errorHandler = handler; break; - case 'data': - this.messageHandler = handler; - this.dataHandler = handler; // Required for `ws` WebSockets - break; case 'close': this.closeHandler = handler; break; case 'end': this.endHandler = handler; break; - case 'text': - this.textHandler = handler; // Required for `watershed` WebSockets - break; - case 'binary': - this.binaryHandler = handler; // Required for `watershed` WebSockets - break; case 'message': - this._messageHandler = handler; // Required for `ws` WebSockets + this.messageHandler = handler; // Required for `ws` WebSockets break; default: throw new Error(`TestError: Unknown action ("${action}") passed to FauxSock.on()`); diff --git a/libraries/botframework-streaming/tests/helpers/fauxSocket.js b/libraries/botframework-streaming/tests/helpers/fauxSocket.js new file mode 100644 index 0000000000..562a1e51af --- /dev/null +++ b/libraries/botframework-streaming/tests/helpers/fauxSocket.js @@ -0,0 +1,15 @@ +const { Duplex } = require('stream'); + +module.exports.FauxSocket = class FauxSocket extends Duplex { + // eslint-disable-next-line no-empty + _read() {} + + // eslint-disable-next-line no-empty + _write() {} + + // eslint-disable-next-line no-empty + setNoDelay() {} + + // eslint-disable-next-line no-empty + setTimeout() {} +}; diff --git a/libraries/botframework-streaming/tests/helpers/index.js b/libraries/botframework-streaming/tests/helpers/index.js index c6bdb28c8e..6f164a9f53 100644 --- a/libraries/botframework-streaming/tests/helpers/index.js +++ b/libraries/botframework-streaming/tests/helpers/index.js @@ -4,7 +4,13 @@ */ const { FauxSock } = require('./fauxSock'); +const { FauxSocket } = require('./fauxSocket'); +const { sleep } = require('./sleep'); const { TestRequest } = require('./testRequest'); +const { waitFor } = require('./waitFor'); module.exports.FauxSock = FauxSock; +module.exports.FauxSocket = FauxSocket; +module.exports.sleep = sleep; module.exports.TestRequest = TestRequest; +module.exports.waitFor = waitFor; diff --git a/libraries/botframework-streaming/tests/helpers/sleep.js b/libraries/botframework-streaming/tests/helpers/sleep.js new file mode 100644 index 0000000000..cb3836abca --- /dev/null +++ b/libraries/botframework-streaming/tests/helpers/sleep.js @@ -0,0 +1,3 @@ +module.exports.sleep = function sleep(durationInMS = 100) { + return new Promise((resolve) => setTimeout(resolve, durationInMS)); +}; diff --git a/libraries/botframework-streaming/tests/helpers/waitFor.js b/libraries/botframework-streaming/tests/helpers/waitFor.js new file mode 100644 index 0000000000..354528ad59 --- /dev/null +++ b/libraries/botframework-streaming/tests/helpers/waitFor.js @@ -0,0 +1,16 @@ +const { sleep } = require('./sleep'); + +module.exports.waitFor = async function waitFor(callback, { interval = 50, timeout = 1000 } = {}) { + let lastError; + + for (const startTime = Date.now(); Date.now() < startTime + timeout; ) { + try { + return callback(); + } catch (error) { + lastError = error; + await sleep(interval); + } + } + + throw lastError || new Error('timed out'); +};