Skip to content

Commit

Permalink
[Streaming] Add working ws websocket implementation for Node environm…
Browse files Browse the repository at this point in the history
…ent (#1334)

* add working ws websocket impl

* remove unused constants
  • Loading branch information
stevengum authored Oct 20, 2019
1 parent cf497da commit a0db677
Show file tree
Hide file tree
Showing 16 changed files with 307 additions and 14 deletions.
4 changes: 3 additions & 1 deletion libraries/botframework-streaming/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"devDependencies": {
"@types/chai": "^4.1.7",
"@types/node": "^10.12.18",
"@types/ws": "^6.0.3",
"chai": "^4.2.0",
"mocha": "^6.2.0",
"nyc": "^14.1.1",
Expand All @@ -22,7 +23,8 @@
"dependencies": {
"promise.prototype.finally": "^3.1.0",
"uuid": "^3.3.2",
"watershed": "^0.4.0"
"watershed": "^0.4.0",
"ws": "^7.1.2"
},
"engines": {
"node": ">12.3"
Expand Down
2 changes: 2 additions & 0 deletions libraries/botframework-streaming/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ export {
NodeWebSocketFactoryBase,
WebSocketClient,
WebSocketServer,
WsNodeWebSocket,
WsNodeWebSocketFactory,
} from './webSocket';
2 changes: 1 addition & 1 deletion libraries/botframework-streaming/src/interfaces/ISocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* with the WebSocket server or client.
*/
export interface ISocket {
isConnected(): boolean;
isConnected: boolean;
write(buffer: Buffer);
connect(serverAddress: string): Promise<void>;
close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export class BrowserWebSocket implements ISocket {
/**
* True if the socket is currently connected.
*/
public isConnected(): boolean {
public get isConnected(): boolean {
return this.webSocket.readyState === 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@

export * from './nodeWebSocketFactory';
export * from './nodeWebSocketFactoryBase';
export * from './wsNodeWebSocketFactory';
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class NodeWebSocketFactory extends NodeWebSocketFactoryBase {
constructor() {
super();
}

/**
* Creates a NodeWebSocket instance.
* @param req
Expand All @@ -26,7 +26,7 @@ export class NodeWebSocketFactory extends NodeWebSocketFactoryBase {
public createWebSocket(req: IncomingMessage, socket: Socket, head: Buffer): NodeWebSocket {
const s = new NodeWebSocket();
s.create(req, socket, head);

return s;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { IncomingMessage } from 'http';
import { Socket } from 'net';

import { WsNodeWebSocket } from '../wsNodeWebSocket';

export class WsNodeWebSocketFactory {
/**
* Creates a WsNodeWebSocket instance.
* @param req
* @param socket
* @param head
*/
public async createWebSocket(req: IncomingMessage, socket: Socket, head: Buffer): Promise<WsNodeWebSocket> {
const s = new WsNodeWebSocket();
await s.create(req, socket, head);

return s;
}
}
1 change: 1 addition & 0 deletions libraries/botframework-streaming/src/webSocket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ export * from './nodeWebSocket';
export * from './webSocketClient';
export * from './webSocketServer';
export * from './webSocketTransport';
export * from './wsNodeWebSocket';
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class NodeWebSocket implements ISocket {
/**
* True if the socket is currently connected.
*/
public isConnected(): boolean {
public get isConnected(): boolean {
return this.connected;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class WebSocketTransport implements ITransportSender, ITransportReceiver
* @param buffer The buffered data to send out over the connection.
*/
public send(buffer: Buffer): number {
if (this._socket && this._socket.isConnected()) {
if (this._socket && this._socket.isConnected) {
this._socket.write(buffer);

return buffer.length;
Expand All @@ -61,14 +61,14 @@ export class WebSocketTransport implements ITransportSender, ITransportReceiver
* Returns true if the transport is connected to a socket.
*/
public isConnected(): boolean {
return this._socket.isConnected();
return this._socket.isConnected;
}

/**
* Close the socket this transport is connected to.
*/
public close(): void {
if (this._socket && this._socket.isConnected()) {
if (this._socket && this._socket.isConnected) {
this._socket.close();
}
}
Expand Down
139 changes: 139 additions & 0 deletions libraries/botframework-streaming/src/webSocket/wsNodeWebSocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { ISocket } from '../interfaces';
import { IncomingMessage, request } from 'http';
import { Socket } from 'net';
import * as WebSocket from 'ws';
import * as crypto from 'crypto';

const WS_SERVER = new WebSocket.Server({ noServer: true });

// Taken from watershed, these needs to be investigated.
const NONCE_LENGTH = 16;

export class WsNodeWebSocket implements ISocket {
private wsSocket: WebSocket;
private connected: boolean;

/**
* Creates a new instance of the [WsNodeWebSocket](xref:botframework-streaming.WsNodeWebSocket) class.
*
* @param socket The ws socket object to build this connection on.
*/
public constructor(wsSocket?: WebSocket) {
this.wsSocket = wsSocket;
this.connected = !!wsSocket;
}

/**
* Create and set a `ws` WebSocket with an HTTP Request, Socket and Buffer.
* @param req IncomingMessage
* @param socket Socket
* @param head Buffer
*/
public async create(req: IncomingMessage, socket: Socket, head: Buffer): Promise<void> {
return new Promise<void>((resolve, reject) => {
try {
WS_SERVER.handleUpgrade(req, socket, head, (websocket) => {
this.wsSocket = websocket;
this.connected = true;
resolve();
});
} catch (err) {
reject(err);
}
});
}

/**
* True if the socket is currently connected.
*/
public get isConnected(): boolean {
return this.connected;
}

/**
* Writes a buffer to the socket and sends it.
*
* @param buffer The buffer of data to send across the connection.
*/
public write(buffer: Buffer): void {
this.wsSocket.send(buffer);
}

/**
* Connects to the supporting socket using WebSocket protocol.
*
* @param serverAddress The address the server is listening on.
* @param port The port the server is listening on, defaults to 8082.
*/
public async connect(serverAddress, port = 8082): Promise<void> {
// Taken from WaterShed, this needs to be investigated.
const wskey = crypto.randomBytes(NONCE_LENGTH).toString('base64');
const options = {
port: port,
hostname: serverAddress,
headers: {
connection: 'upgrade',
'Sec-WebSocket-Key': wskey,
'Sec-WebSocket-Version': '13'
}
};
const req = request(options);
req.end();
req.on('upgrade', (res, socket, head): void => {
// @types/ws does not contain the signature for completeUpgrade
// https://github.com/websockets/ws/blob/0a612364e69fc07624b8010c6873f7766743a8e3/lib/websocket-server.js#L269
(WS_SERVER as any).completeUpgrade(wskey, undefined, res, socket, head, (websocket): void => {
this.wsSocket = websocket;
this.connected = true;
});
});

return new Promise<void>((resolve, reject): void => {
req.on('close', resolve);
req.on('error', reject);
});
}

/**
* Set the handler for `'data'` and `'message'` events received on the socket.
*/
public setOnMessageHandler(handler: (x: any) => void): void {
this.wsSocket.on('data', handler);
this.wsSocket.on('message', handler);
}

/**
* Close the socket.
* @remarks
* Optionally pass in a status code and string explaining why the connection is closing.
* @param code
* @param data
*/
public close(code?: number, data?: string): void {
this.connected = false;

return this.wsSocket.close(code, data);
}

/**
* Set the callback to call when encountering socket closures.
*/
public setOnCloseHandler(handler: (x: any) => void): void {
this.wsSocket.on('close', handler);
}

/**
* Set the callback to call when encountering errors.
*/
public setOnErrorHandler(handler: (x: any) => void): void {
this.wsSocket.on('error', (error): void => { if (error) { handler(error); } });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe('NodeSocket', () => {

it('starts out connected', () => {
const ns = new NodeWebSocket(new FauxSock);
expect(ns.isConnected()).to.be.true;
expect(ns.isConnected).to.be.true;
});

it('writes to the socket', () => {
Expand Down
2 changes: 1 addition & 1 deletion libraries/botframework-streaming/tests/WebSocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ describe('Streaming Extensions WebSocket Library Tests', () => {
it('knows its connected', () => {
let bs = new ws.BrowserWebSocket( new FauxSock());
bs.connect('fakeUrl');
expect(bs.isConnected()).to.be.true;
expect(bs.isConnected).to.be.true;
});

it('writes to the socket', () => {
Expand Down
39 changes: 36 additions & 3 deletions libraries/botframework-streaming/tests/helpers/fauxSock.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,32 @@ class FauxSock {
this.onmessage = undefined;
this.onerror = undefined;
this.onclose = undefined;

// `ws` specific check in WebSocketServer.completeUpgrade
this.readable = true;
this.writable = true;
}

/* Start of `ws` specific methods. */
removeListener(event, handler) {
switch (event) {
case 'error':
return;
default:
console.error(`FauxSock.removeListener(): Reached default case: ${event}`);
}
}

setTimeout(value) {
this.timeoutValue = value;
return;
}

setNoDelay() {
}
/* End of `ws` specific methods. */

isConnected() {
get isConnected() {
return this.connected;
}

Expand Down Expand Up @@ -75,14 +98,24 @@ class FauxSock {
if (action === 'close') {
this.closeHandler = handler;
}
if (action === 'end') {
this.endHandler = handler;
}
// Required for `watershed` WebSockets
if (action === 'text') {
this.textHandler = handler;
}
// Required for `watershed` WebSockets
if (action === 'binary') {
this.binaryHandler = handler;
}
if (action === 'end') {
this.endHandler = handler;
// Required for `ws` WebSockets
if (action === 'data') {
this.dataHandler = handler;
}
// Required for `ws` WebSockets
if (action === 'message') {
this._messageHandler = handler;
}
};

Expand Down
6 changes: 6 additions & 0 deletions libraries/botframework-streaming/tests/helpers/testRequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ class TestRequest {
let headers = [];
}

setMethod(verb) {
this.method = 'GET';
}

isUpgradeRequest() {
return this.upgradeRequestVal;
}

setIsUpgradeRequest(value) {
// `ws` specific check
this.method = 'GET';
this.upgradeRequestVal = value;
}

Expand Down
Loading

0 comments on commit a0db677

Please sign in to comment.