From a0db67794dabec4d377a3facef3e2cbb21ec5342 Mon Sep 17 00:00:00 2001 From: Steven Gum <14935595+stevengum@users.noreply.github.com> Date: Sun, 20 Oct 2019 08:11:26 -0700 Subject: [PATCH] [Streaming] Add working ws websocket implementation for Node environment (#1334) * add working ws websocket impl * remove unused constants --- libraries/botframework-streaming/package.json | 4 +- libraries/botframework-streaming/src/index.ts | 2 + .../src/interfaces/ISocket.ts | 2 +- .../src/webSocket/browserWebSocket.ts | 2 +- .../src/webSocket/factories/index.ts | 1 + .../factories/nodeWebSocketFactory.ts | 4 +- .../factories/wsNodeWebSocketFactory.ts | 27 ++++ .../src/webSocket/index.ts | 1 + .../src/webSocket/nodeWebSocket.ts | 2 +- .../src/webSocket/webSocketTransport.ts | 6 +- .../src/webSocket/wsNodeWebSocket.ts | 139 ++++++++++++++++++ .../tests/NodeWebSocket.test.js | 2 +- .../tests/WebSocket.test.js | 2 +- .../tests/helpers/fauxSock.js | 39 ++++- .../tests/helpers/testRequest.js | 6 + .../tests/wsNodeWebSocket.test.js | 82 +++++++++++ 16 files changed, 307 insertions(+), 14 deletions(-) create mode 100644 libraries/botframework-streaming/src/webSocket/factories/wsNodeWebSocketFactory.ts create mode 100644 libraries/botframework-streaming/src/webSocket/wsNodeWebSocket.ts create mode 100644 libraries/botframework-streaming/tests/wsNodeWebSocket.test.js diff --git a/libraries/botframework-streaming/package.json b/libraries/botframework-streaming/package.json index 21fdabffaa..a1d0d0af6b 100644 --- a/libraries/botframework-streaming/package.json +++ b/libraries/botframework-streaming/package.json @@ -10,6 +10,7 @@ "devDependencies": { "@types/chai": "^4.1.7", "@types/node": "^10.12.18", + "@types/ws": "^6.0.3", "chai": "^4.2.0", "mocha": "^6.2.0", "nyc": "^14.1.1", @@ -22,7 +23,8 @@ "dependencies": { "promise.prototype.finally": "^3.1.0", "uuid": "^3.3.2", - "watershed": "^0.4.0" + "watershed": "^0.4.0", + "ws": "^7.1.2" }, "engines": { "node": ">12.3" diff --git a/libraries/botframework-streaming/src/index.ts b/libraries/botframework-streaming/src/index.ts index 5f9f2f000f..de4cd68c68 100644 --- a/libraries/botframework-streaming/src/index.ts +++ b/libraries/botframework-streaming/src/index.ts @@ -20,4 +20,6 @@ export { NodeWebSocketFactoryBase, WebSocketClient, WebSocketServer, + WsNodeWebSocket, + WsNodeWebSocketFactory, } from './webSocket'; diff --git a/libraries/botframework-streaming/src/interfaces/ISocket.ts b/libraries/botframework-streaming/src/interfaces/ISocket.ts index 862e627ea7..be36b7efe0 100644 --- a/libraries/botframework-streaming/src/interfaces/ISocket.ts +++ b/libraries/botframework-streaming/src/interfaces/ISocket.ts @@ -11,7 +11,7 @@ * with the WebSocket server or client. */ export interface ISocket { - isConnected(): boolean; + isConnected: boolean; write(buffer: Buffer); connect(serverAddress: string): Promise; close(); diff --git a/libraries/botframework-streaming/src/webSocket/browserWebSocket.ts b/libraries/botframework-streaming/src/webSocket/browserWebSocket.ts index b7e16041ce..8e2b0073cd 100644 --- a/libraries/botframework-streaming/src/webSocket/browserWebSocket.ts +++ b/libraries/botframework-streaming/src/webSocket/browserWebSocket.ts @@ -52,7 +52,7 @@ export class BrowserWebSocket implements ISocket { /** * True if the socket is currently connected. */ - public isConnected(): boolean { + public get isConnected(): boolean { return this.webSocket.readyState === 1; } diff --git a/libraries/botframework-streaming/src/webSocket/factories/index.ts b/libraries/botframework-streaming/src/webSocket/factories/index.ts index df0f5961b4..dc920f3854 100644 --- a/libraries/botframework-streaming/src/webSocket/factories/index.ts +++ b/libraries/botframework-streaming/src/webSocket/factories/index.ts @@ -8,3 +8,4 @@ export * from './nodeWebSocketFactory'; export * from './nodeWebSocketFactoryBase'; +export * from './wsNodeWebSocketFactory'; diff --git a/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactory.ts b/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactory.ts index 0985cbe16a..a9f62b8eb8 100644 --- a/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactory.ts +++ b/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactory.ts @@ -16,7 +16,7 @@ export class NodeWebSocketFactory extends NodeWebSocketFactoryBase { constructor() { super(); } - + /** * Creates a NodeWebSocket instance. * @param req @@ -26,7 +26,7 @@ export class NodeWebSocketFactory extends NodeWebSocketFactoryBase { public createWebSocket(req: IncomingMessage, socket: Socket, head: Buffer): NodeWebSocket { const s = new NodeWebSocket(); s.create(req, socket, head); - + return s; } } diff --git a/libraries/botframework-streaming/src/webSocket/factories/wsNodeWebSocketFactory.ts b/libraries/botframework-streaming/src/webSocket/factories/wsNodeWebSocketFactory.ts new file mode 100644 index 0000000000..e0bf708a46 --- /dev/null +++ b/libraries/botframework-streaming/src/webSocket/factories/wsNodeWebSocketFactory.ts @@ -0,0 +1,27 @@ +/** + * @module botframework-streaming + */ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +import { IncomingMessage } from 'http'; +import { Socket } from 'net'; + +import { WsNodeWebSocket } from '../wsNodeWebSocket'; + +export class WsNodeWebSocketFactory { + /** + * Creates a WsNodeWebSocket instance. + * @param req + * @param socket + * @param head + */ + public async createWebSocket(req: IncomingMessage, socket: Socket, head: Buffer): Promise { + const s = new WsNodeWebSocket(); + await s.create(req, socket, head); + + return s; + } +} diff --git a/libraries/botframework-streaming/src/webSocket/index.ts b/libraries/botframework-streaming/src/webSocket/index.ts index 8644a1bb16..7685d49f2c 100644 --- a/libraries/botframework-streaming/src/webSocket/index.ts +++ b/libraries/botframework-streaming/src/webSocket/index.ts @@ -13,3 +13,4 @@ export * from './nodeWebSocket'; export * from './webSocketClient'; export * from './webSocketServer'; export * from './webSocketTransport'; +export * from './wsNodeWebSocket'; diff --git a/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts b/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts index bbb3fba0e9..96c5f5c800 100644 --- a/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts +++ b/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts @@ -41,7 +41,7 @@ export class NodeWebSocket implements ISocket { /** * True if the socket is currently connected. */ - public isConnected(): boolean { + public get isConnected(): boolean { return this.connected; } diff --git a/libraries/botframework-streaming/src/webSocket/webSocketTransport.ts b/libraries/botframework-streaming/src/webSocket/webSocketTransport.ts index a0652637ea..3320054b4a 100644 --- a/libraries/botframework-streaming/src/webSocket/webSocketTransport.ts +++ b/libraries/botframework-streaming/src/webSocket/webSocketTransport.ts @@ -48,7 +48,7 @@ export class WebSocketTransport implements ITransportSender, ITransportReceiver * @param buffer The buffered data to send out over the connection. */ public send(buffer: Buffer): number { - if (this._socket && this._socket.isConnected()) { + if (this._socket && this._socket.isConnected) { this._socket.write(buffer); return buffer.length; @@ -61,14 +61,14 @@ export class WebSocketTransport implements ITransportSender, ITransportReceiver * Returns true if the transport is connected to a socket. */ public isConnected(): boolean { - return this._socket.isConnected(); + return this._socket.isConnected; } /** * Close the socket this transport is connected to. */ public close(): void { - if (this._socket && this._socket.isConnected()) { + if (this._socket && this._socket.isConnected) { this._socket.close(); } } diff --git a/libraries/botframework-streaming/src/webSocket/wsNodeWebSocket.ts b/libraries/botframework-streaming/src/webSocket/wsNodeWebSocket.ts new file mode 100644 index 0000000000..c251a928dc --- /dev/null +++ b/libraries/botframework-streaming/src/webSocket/wsNodeWebSocket.ts @@ -0,0 +1,139 @@ +/** + * @module botframework-streaming + */ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +import { ISocket } from '../interfaces'; +import { IncomingMessage, request } from 'http'; +import { Socket } from 'net'; +import * as WebSocket from 'ws'; +import * as crypto from 'crypto'; + +const WS_SERVER = new WebSocket.Server({ noServer: true }); + +// Taken from watershed, these needs to be investigated. +const NONCE_LENGTH = 16; + +export class WsNodeWebSocket implements ISocket { + private wsSocket: WebSocket; + private connected: boolean; + + /** + * Creates a new instance of the [WsNodeWebSocket](xref:botframework-streaming.WsNodeWebSocket) class. + * + * @param socket The ws socket object to build this connection on. + */ + public constructor(wsSocket?: WebSocket) { + this.wsSocket = wsSocket; + this.connected = !!wsSocket; + } + + /** + * Create and set a `ws` WebSocket with an HTTP Request, Socket and Buffer. + * @param req IncomingMessage + * @param socket Socket + * @param head Buffer + */ + public async create(req: IncomingMessage, socket: Socket, head: Buffer): Promise { + return new Promise((resolve, reject) => { + try { + WS_SERVER.handleUpgrade(req, socket, head, (websocket) => { + this.wsSocket = websocket; + this.connected = true; + resolve(); + }); + } catch (err) { + reject(err); + } + }); + } + + /** + * True if the socket is currently connected. + */ + public get isConnected(): boolean { + return this.connected; + } + + /** + * Writes a buffer to the socket and sends it. + * + * @param buffer The buffer of data to send across the connection. + */ + public write(buffer: Buffer): void { + this.wsSocket.send(buffer); + } + + /** + * Connects to the supporting socket using WebSocket protocol. + * + * @param serverAddress The address the server is listening on. + * @param port The port the server is listening on, defaults to 8082. + */ + public async connect(serverAddress, port = 8082): Promise { + // Taken from WaterShed, this needs to be investigated. + const wskey = crypto.randomBytes(NONCE_LENGTH).toString('base64'); + const options = { + port: port, + hostname: serverAddress, + headers: { + connection: 'upgrade', + 'Sec-WebSocket-Key': wskey, + 'Sec-WebSocket-Version': '13' + } + }; + const req = request(options); + req.end(); + req.on('upgrade', (res, socket, head): void => { + // @types/ws does not contain the signature for completeUpgrade + // https://github.com/websockets/ws/blob/0a612364e69fc07624b8010c6873f7766743a8e3/lib/websocket-server.js#L269 + (WS_SERVER as any).completeUpgrade(wskey, undefined, res, socket, head, (websocket): void => { + this.wsSocket = websocket; + this.connected = true; + }); + }); + + return new Promise((resolve, reject): void => { + req.on('close', resolve); + req.on('error', reject); + }); + } + + /** + * Set the handler for `'data'` and `'message'` events received on the socket. + */ + public setOnMessageHandler(handler: (x: any) => void): void { + this.wsSocket.on('data', handler); + this.wsSocket.on('message', handler); + } + + /** + * Close the socket. + * @remarks + * Optionally pass in a status code and string explaining why the connection is closing. + * @param code + * @param data + */ + public close(code?: number, data?: string): void { + this.connected = false; + + return this.wsSocket.close(code, data); + } + + /** + * Set the callback to call when encountering socket closures. + */ + public setOnCloseHandler(handler: (x: any) => void): void { + this.wsSocket.on('close', handler); + } + + /** + * Set the callback to call when encountering errors. + */ + public setOnErrorHandler(handler: (x: any) => void): void { + this.wsSocket.on('error', (error): void => { if (error) { handler(error); } }); + } +} \ No newline at end of file diff --git a/libraries/botframework-streaming/tests/NodeWebSocket.test.js b/libraries/botframework-streaming/tests/NodeWebSocket.test.js index 6478a856ba..47b4884fbb 100644 --- a/libraries/botframework-streaming/tests/NodeWebSocket.test.js +++ b/libraries/botframework-streaming/tests/NodeWebSocket.test.js @@ -19,7 +19,7 @@ describe('NodeSocket', () => { it('starts out connected', () => { const ns = new NodeWebSocket(new FauxSock); - expect(ns.isConnected()).to.be.true; + expect(ns.isConnected).to.be.true; }); it('writes to the socket', () => { diff --git a/libraries/botframework-streaming/tests/WebSocket.test.js b/libraries/botframework-streaming/tests/WebSocket.test.js index c63d3d5dec..d8ad86cdeb 100644 --- a/libraries/botframework-streaming/tests/WebSocket.test.js +++ b/libraries/botframework-streaming/tests/WebSocket.test.js @@ -231,7 +231,7 @@ describe('Streaming Extensions WebSocket Library Tests', () => { it('knows its connected', () => { let bs = new ws.BrowserWebSocket( new FauxSock()); bs.connect('fakeUrl'); - expect(bs.isConnected()).to.be.true; + expect(bs.isConnected).to.be.true; }); it('writes to the socket', () => { diff --git a/libraries/botframework-streaming/tests/helpers/fauxSock.js b/libraries/botframework-streaming/tests/helpers/fauxSock.js index c66867e4c2..18b1bdc76b 100644 --- a/libraries/botframework-streaming/tests/helpers/fauxSock.js +++ b/libraries/botframework-streaming/tests/helpers/fauxSock.js @@ -18,9 +18,32 @@ class FauxSock { this.onmessage = undefined; this.onerror = undefined; this.onclose = undefined; + + // `ws` specific check in WebSocketServer.completeUpgrade + this.readable = true; + this.writable = true; + } + + /* Start of `ws` specific methods. */ + removeListener(event, handler) { + switch (event) { + case 'error': + return; + default: + console.error(`FauxSock.removeListener(): Reached default case: ${event}`); + } + } + + setTimeout(value) { + this.timeoutValue = value; + return; + } + + setNoDelay() { } + /* End of `ws` specific methods. */ - isConnected() { + get isConnected() { return this.connected; } @@ -75,14 +98,24 @@ class FauxSock { if (action === 'close') { this.closeHandler = handler; } + if (action === 'end') { + this.endHandler = handler; + } + // Required for `watershed` WebSockets if (action === 'text') { this.textHandler = handler; } + // Required for `watershed` WebSockets if (action === 'binary') { this.binaryHandler = handler; } - if (action === 'end') { - this.endHandler = handler; + // Required for `ws` WebSockets + if (action === 'data') { + this.dataHandler = handler; + } + // Required for `ws` WebSockets + if (action === 'message') { + this._messageHandler = handler; } }; diff --git a/libraries/botframework-streaming/tests/helpers/testRequest.js b/libraries/botframework-streaming/tests/helpers/testRequest.js index 17c1f1c467..778add1d19 100644 --- a/libraries/botframework-streaming/tests/helpers/testRequest.js +++ b/libraries/botframework-streaming/tests/helpers/testRequest.js @@ -8,11 +8,17 @@ class TestRequest { let headers = []; } + setMethod(verb) { + this.method = 'GET'; + } + isUpgradeRequest() { return this.upgradeRequestVal; } setIsUpgradeRequest(value) { + // `ws` specific check + this.method = 'GET'; this.upgradeRequestVal = value; } diff --git a/libraries/botframework-streaming/tests/wsNodeWebSocket.test.js b/libraries/botframework-streaming/tests/wsNodeWebSocket.test.js new file mode 100644 index 0000000000..755aa787e8 --- /dev/null +++ b/libraries/botframework-streaming/tests/wsNodeWebSocket.test.js @@ -0,0 +1,82 @@ +const { WsNodeWebSocket } = require('../'); +const { expect } = require('chai'); +const { FauxSock, TestRequest } = require('./helpers'); +const { randomBytes } = require('crypto'); + +describe('WsNodeWebSocket', () => { + it('creates a new WsNodeWebSocket', () => { + const wsSocket = new WsNodeWebSocket(new FauxSock); + expect(wsSocket).to.be.instanceOf(WsNodeWebSocket); + expect(wsSocket.close()).to.not.throw; + }); + + it('requires a valid URL', () => { + try { + const wsSocket = new WsNodeWebSocket(new FauxSock); + } catch (error) { + expect(error.message).to.equal('Invalid URL: fakeURL'); + } + }); + + it('starts out connected', () => { + const wsSocket = new WsNodeWebSocket(new FauxSock); + expect(wsSocket.isConnected).to.be.true; + }); + + it('writes to the socket', () => { + const wsSocket = new WsNodeWebSocket(new FauxSock); + const buff = Buffer.from('hello'); + expect(wsSocket.write(buff)).to.not.throw; + }); + + it('attempts to open a connection', () => { + const wsSocket = new WsNodeWebSocket(new FauxSock); + expect(wsSocket.connect().catch((error) => { + expect(error.message).to.equal('connect ECONNREFUSED 127.0.0.1:8082'); + })); + }); + + it('can set message handlers on the socket', () => { + const sock = new FauxSock(); + const wsSocket = new WsNodeWebSocket(sock); + expect(sock.dataHandler).to.be.undefined; + expect(sock._messageHandler).to.be.undefined; + expect(wsSocket.setOnMessageHandler(() => { })).to.not.throw; + expect(sock.dataHandler).to.not.be.undefined; + expect(sock._messageHandler).to.not.be.undefined; + }); + + it('can set error handler on the socket', () => { + const sock = new FauxSock(); + const wsSocket = new WsNodeWebSocket(sock); + expect(sock.errorHandler).to.be.undefined; + expect(wsSocket.setOnErrorHandler(() => { })).to.not.throw; + expect(sock.errorHandler).to.not.be.undefined; + }); + + it('can set end handler on the socket', () => { + const sock = new FauxSock(); + const wsSocket = new WsNodeWebSocket(sock); + expect(sock.closeHandler).to.be.undefined; + expect(wsSocket.setOnCloseHandler(() => { })).to.not.throw; + expect(sock.closeHandler).to.not.be.undefined; + }); + + it('create() should be successful and set a WebSocket', async () => { + const sock = new FauxSock(); + const nodeSocket = new WsNodeWebSocket(); + const request = new TestRequest(); + + // Configure a proper upgrade request for `ws`. + request.setIsUpgradeRequest(true); + request.headers = { upgrade: 'websocket' }; + // Use Node.js `crypto` module to calculate a valid 'sec-websocket-key' value. + // The key must pass this RegExp: + // https://github.com/websockets/ws/blob/0a612364e69fc07624b8010c6873f7766743a8e3/lib/websocket-server.js#L12 + request.headers['sec-websocket-key'] = randomBytes(16).toString('base64'); + request.headers['sec-websocket-version'] = '13'; + request.headers['sec-websocket-protocol'] = ''; + + await nodeSocket.create(request, sock, Buffer.from([])); + }); +});