diff --git a/libraries/botbuilder-core/src/transcriptLogger.ts b/libraries/botbuilder-core/src/transcriptLogger.ts index 80f0246b14..0b19995fa2 100644 --- a/libraries/botbuilder-core/src/transcriptLogger.ts +++ b/libraries/botbuilder-core/src/transcriptLogger.ts @@ -47,15 +47,14 @@ export class TranscriptLoggerMiddleware implements Middleware { // hook up onSend pipeline context.onSendActivities(async (ctx: TurnContext, activities: Partial[], next2: () => Promise) => { - // run full pipeline + // Run full pipeline. const responses: ResourceResponse[] = await next2(); activities.map((a: Partial, index: number) => { const clonedActivity = this.cloneActivity(a); - // If present, set the id of the cloned activity to the id received from the server. - if (index < responses.length) { - clonedActivity.id = responses[index].id; - } + clonedActivity.id = responses && responses[index] ? + responses[index].id : + clonedActivity.id; // For certain channels, a ResourceResponse with an id is not always sent to the bot. // This fix uses the timestamp on the activity to populate its id for logging the transcript. diff --git a/libraries/botbuilder-core/tests/transcriptMiddleware.test.js b/libraries/botbuilder-core/tests/transcriptMiddleware.test.js index 9c263b3603..88173710eb 100644 --- a/libraries/botbuilder-core/tests/transcriptMiddleware.test.js +++ b/libraries/botbuilder-core/tests/transcriptMiddleware.test.js @@ -136,6 +136,199 @@ describe(`TranscriptLoggerMiddleware`, function () { }) }); + it(`should not error for sent activities if no ResourceResponses are received`, async () => { + class NoResourceResponseAdapter extends TestAdapter { + constructor(logic) { + super(logic); + } + + // Send activities but don't pass the ResourceResponses to the TranscriptLoggerMiddleware + async sendActivities(context, activities) { + await super.sendActivities(context, activities); + } + } + + let conversationId = null; + const transcriptStore = new MemoryTranscriptStore(); + const adapter = new NoResourceResponseAdapter(async (context) => { + conversationId = context.activity.conversation.id; + const typingActivity = { + type: ActivityTypes.Typing, + relatesTo: context.activity.relatesTo + }; + + await context.sendActivity(typingActivity); + await context.sendActivity(`echo:${context.activity.text}`); + + }).use(new TranscriptLoggerMiddleware(transcriptStore)); + + await adapter.send('foo') + .assertReply(activity => assert.equal(activity.type, ActivityTypes.Typing)) + .assertReply('echo:foo') + .send('bar') + .assertReply(activity => assert.equal(activity.type, ActivityTypes.Typing)) + .assertReply('echo:bar'); + + const pagedResult = await transcriptStore.getTranscriptActivities('test', conversationId); + assert.equal(pagedResult.items.length, 6); + assert.equal(pagedResult.items[0].text, 'foo'); + assert.equal(pagedResult.items[1].type, ActivityTypes.Typing); + assert.equal(pagedResult.items[2].text, 'echo:foo'); + assert.equal(pagedResult.items[3].text, 'bar'); + assert.equal(pagedResult.items[4].type, ActivityTypes.Typing); + assert.equal(pagedResult.items[5].text, 'echo:bar'); + pagedResult.items.forEach(a => { + assert(a.timestamp); + }); + }); + + it(`should not error for sent activities if another handler does not return next()`, async () => { + class NoResourceResponseMiddleware { + async onTurn(context, next) { + context.onSendActivities(async (context, activities, next) => { + // In SendActivitiesHandlers developers are supposed to call: + // return next(); + // If this is not returned, then the next handler will not have the ResourceResponses[]. + const responses = await next(); + }); + + // Run the bot's application logic. + await next(); + } + } + + let conversationId = null; + const transcriptStore = new MemoryTranscriptStore(); + const adapter = new TestAdapter(async (context) => { + conversationId = context.activity.conversation.id; + const typingActivity = { + type: ActivityTypes.Typing, + relatesTo: context.activity.relatesTo + }; + + await context.sendActivity(typingActivity); + await context.sendActivity(`echo:${context.activity.text}`); + + }); + + // Register both middleware + adapter.use(new TranscriptLoggerMiddleware(transcriptStore)); + adapter.use(new NoResourceResponseMiddleware()); + + await adapter.send('foo') + .assertReply(activity => assert.equal(activity.type, ActivityTypes.Typing)) + .assertReply('echo:foo') + .send('bar') + .assertReply(activity => assert.equal(activity.type, ActivityTypes.Typing)) + .assertReply('echo:bar'); + + const pagedResult = await transcriptStore.getTranscriptActivities('test', conversationId); + assert.equal(pagedResult.items.length, 6); + assert.equal(pagedResult.items[0].text, 'foo'); + assert.equal(pagedResult.items[1].type, ActivityTypes.Typing); + assert.equal(pagedResult.items[2].text, 'echo:foo'); + assert.equal(pagedResult.items[3].text, 'bar'); + assert.equal(pagedResult.items[4].type, ActivityTypes.Typing); + assert.equal(pagedResult.items[5].text, 'echo:bar'); + pagedResult.items.forEach(a => { + assert(a.timestamp); + }); + }); + + it(`should not error for sent activities if another handler does not return an array`, async () => { + class NoResourceResponseMiddleware { + async onTurn(context, next) { + context.onSendActivities(async (context, activities, next) => { + // In SendActivitiesHandlers developers are supposed to call: + // return next(); + // If this is not returned, then the next handler will not have the ResourceResponses[]. + const responses = await next(); + + return {}; + }); + + // Run the bot's application logic. + await next(); + } + } + + let conversationId = null; + const transcriptStore = new MemoryTranscriptStore(); + const adapter = new TestAdapter(async (context) => { + conversationId = context.activity.conversation.id; + await context.sendActivity(`echo:${context.activity.text}`); + }); + + // Register both middleware + adapter.use(new TranscriptLoggerMiddleware(transcriptStore)); + adapter.use(new NoResourceResponseMiddleware()); + + await adapter.send('foo') + .assertReply('echo:foo'); + + const pagedResult = await transcriptStore.getTranscriptActivities('test', conversationId); + assert.equal(pagedResult.items.length, 2); + assert.equal(pagedResult.items[0].text, 'foo'); + assert.equal(pagedResult.items[1].text, 'echo:foo'); + pagedResult.items.forEach(a => { + assert(a.id); + assert(a.timestamp); + }); + }); + + it(`should not error when logging sent activities and return the actual value from next()`, async () => { + // This middleware should receive 1 from `next()` + class AssertionMiddleware { + async onTurn(context, next) { + context.onSendActivities(async (context, activities, next) => { + const notResourceResponses = await next(); + assert.strictEqual(notResourceResponses, 1); + }); + + await next(); + } + } + // This middleware returns the value 1 from its registered SendActivitiesHandler. + // The TranscriptLoggerMiddleware should return this value to the next registered SendActivitiesHandler. + class Returns1Middleware { + async onTurn(context, next) { + context.onSendActivities(async (context, activities, next) => { + // In SendActivitiesHandlers developers are supposed to call: + // return next(); + // If this is not returned, then the next handler will not have the ResourceResponses[]. + const responses = await next(); + + return 1; + }); + + await next(); + } + } + + let conversationId = null; + const transcriptStore = new MemoryTranscriptStore(); + const adapter = new TestAdapter(async (context) => { + conversationId = context.activity.conversation.id; + await context.sendActivity(`echo:${context.activity.text}`); + }); + + adapter.use(new AssertionMiddleware()); + adapter.use(new TranscriptLoggerMiddleware(transcriptStore)); + adapter.use(new Returns1Middleware()); + + await adapter.send('foo') + .assertReply('echo:foo'); + + const pagedResult = await transcriptStore.getTranscriptActivities('test', conversationId); + assert.equal(pagedResult.items.length, 2); + assert.equal(pagedResult.items[0].text, 'foo'); + assert.equal(pagedResult.items[1].text, 'echo:foo'); + pagedResult.items.forEach(a => { + assert(a.id); + assert(a.timestamp); + }); + }); + describe('\'s error handling', function () { const originalConsoleError = console.error; @@ -241,10 +434,11 @@ describe(`TranscriptLoggerMiddleware`, function () { }) done(); }); - }); + }) + .catch(err => done(err)); }); - it(`should use outgoing activity's timestamp for activity id when activity id and resourceResponse is empty`, function(done) { + it(`should use outgoing activity's timestamp for activity id when activity id and resourceResponse is empty`, async () => { let conversationId, timestamp; const transcriptStore = new MemoryTranscriptStore(); @@ -265,37 +459,31 @@ describe(`TranscriptLoggerMiddleware`, function () { text: 'foo' }; - adapter - .send(fooActivity) + await adapter.send(fooActivity) // sent activities do not contain the id returned from the service, so it should be undefined here .assertReply(activity => { assert.equal(activity.id, undefined); assert.equal(activity.text, 'echo:foo'); assert.equal(activity.timestamp, timestamp); - }) - .then(() => { - transcriptStore.getTranscriptActivities('test', conversationId).then(pagedResult => { - assert.equal(pagedResult.items.length, 2); - assert.equal(pagedResult.items[0].text, 'foo'); - // Transcript activities should have the id present on the activity when received - assert.equal(pagedResult.items[0].id, 'testFooId'); - - assert.equal(pagedResult.items[1].text, 'echo:foo'); - // Sent Activities in the transcript store should use the timestamp on the bot's outgoing activities - // to log the activity when the following cases are true: - // 1. The outgoing Activity.id is falsey - // 2. The ResourceResponse.id is falsey (some channels may not emit a ResourceResponse with an id value) - // 3. The outgoing Activity.timestamp exists - // Activity.Id ends with Activity.Timestamp and generated id starts with 'g_' - assert.ok(pagedResult.items[1].id.endsWith(timestamp.getTime().toString())); - assert.ok(pagedResult.items[1].id.startsWith('g_')); - //assert.equal(pagedResult.items[1].id, timestamp.getTime().toString()); - pagedResult.items.forEach(a => { - assert(a.timestamp); - }); - done(); - }); }); + + const pagedResult = await transcriptStore.getTranscriptActivities('test', conversationId); + assert.equal(pagedResult.items.length, 2); + assert.equal(pagedResult.items[0].text, 'foo'); + // Transcript activities should have the id present on the activity when received + assert.equal(pagedResult.items[0].id, 'testFooId'); + + assert.equal(pagedResult.items[1].text, 'echo:foo'); + // Sent Activities in the transcript store should use the timestamp on the bot's outgoing activities + // to log the activity when the following cases are true: + // 1. The outgoing Activity.id is falsey + // 2. The ResourceResponse.id is falsey (some channels may not emit a ResourceResponse with an id value) + // 3. The outgoing Activity.timestamp exists + assert(pagedResult.items[1].id.indexOf(timestamp.getTime().toString())); + assert(pagedResult.items[1].id.startsWith('g_')); + pagedResult.items.forEach(a => { + assert(a.timestamp); + }); }); it(`should use current server time for activity id when activity and resourceResponse id is empty and no activity timestamp exists`, function(done) { diff --git a/libraries/botbuilder-dialogs/tests/confirmPrompt.test.js b/libraries/botbuilder-dialogs/tests/confirmPrompt.test.js index a24efe0b3d..18b0b214d6 100644 --- a/libraries/botbuilder-dialogs/tests/confirmPrompt.test.js +++ b/libraries/botbuilder-dialogs/tests/confirmPrompt.test.js @@ -521,7 +521,7 @@ describe('ConfirmPrompt', function () { .assertReply(`The result found is 'true'.`); }); - it('should recogize valid number and default to en if locale is null.', async function () { + it('should recognize valid number and default to en if locale is null.', async function () { const adapter = new TestAdapter(async (turnContext) => { turnContext.activity.locale = null; diff --git a/libraries/botbuilder/package.json b/libraries/botbuilder/package.json index 3ba74d9fa7..037f3b5981 100644 --- a/libraries/botbuilder/package.json +++ b/libraries/botbuilder/package.json @@ -24,6 +24,7 @@ "@types/node": "^10.12.18", "botbuilder-core": "4.1.6", "botframework-connector": "4.1.6", + "botframework-streaming": "4.1.6", "filenamify": "^4.1.0", "fs-extra": "^7.0.1" }, @@ -40,7 +41,7 @@ "uuid": "^3.3.2" }, "scripts": { - "test": "tsc && nyc mocha tests/", + "test": "tsc && nyc mocha --recursive \"tests/**/*.test.js\"", "build": "tsc", "build-docs": "typedoc --theme markdown --entryPoint botbuilder --excludePrivate --includeDeclarations --ignoreCompilerErrors --module amd --out ..\\..\\doc\\botbuilder .\\lib\\index.d.ts ..\\botbuilder-core\\lib\\index.d.ts ..\\botframework-schema\\lib\\index.d.ts --hideGenerator --name \"Bot Builder SDK\" --readme none", "clean": "erase /q /s .\\lib", diff --git a/libraries/botbuilder/src/botFrameworkAdapter.ts b/libraries/botbuilder/src/botFrameworkAdapter.ts index cd94439f04..2fe5b14d2b 100644 --- a/libraries/botbuilder/src/botFrameworkAdapter.ts +++ b/libraries/botbuilder/src/botFrameworkAdapter.ts @@ -6,9 +6,14 @@ * Licensed under the MIT License. */ +import { STATUS_CODES } from 'http'; +import * as os from 'os'; + import { Activity, ActivityTypes, BotAdapter, BotCallbackHandlerKey, ChannelAccount, ConversationAccount, ConversationParameters, ConversationReference, ConversationsResult, IUserTokenProvider, ResourceResponse, TokenResponse, TurnContext } from 'botbuilder-core'; import { AuthenticationConstants, ChannelValidation, ConnectorClient, EmulatorApiClient, GovernmentConstants, GovernmentChannelValidation, JwtTokenValidation, MicrosoftAppCredentials, AppCredentials, CertificateAppCredentials, SimpleCredentialProvider, TokenApiClient, TokenStatus, TokenApiModels } from 'botframework-connector'; -import * as os from 'os'; +import { INodeBuffer, INodeSocket, IReceiveRequest, ISocket, IStreamingTransportServer, NamedPipeServer, NodeWebSocketFactory, NodeWebSocketFactoryBase, RequestHandler, StreamingResponse, WebSocketServer } from 'botframework-streaming'; + +import { StreamingHttpClient, TokenResolver } from './streaming'; export enum StatusCodes { OK = 200, @@ -129,6 +134,11 @@ export interface BotFrameworkAdapterSettings { */ channelService?: string; + /** + * Optional. Used to pass in a NodeWebSocketFactoryBase instance. + */ + webSocketFactory?: NodeWebSocketFactoryBase; + /** * Optional. Certificate thumbprint to authenticate the appId against AAD. */ @@ -170,6 +180,13 @@ export const USER_AGENT: string = `Microsoft-BotFramework/3.1 BotBuilder/${ pjso const OAUTH_ENDPOINT = 'https://api.botframework.com'; const US_GOV_OAUTH_ENDPOINT = 'https://api.botframework.azure.us'; +// Streaming-specific constants +const defaultPipeName = 'bfv4.pipes'; +const VERSION_PATH: string = '/api/version'; +const MESSAGES_PATH: string = '/api/messages'; +const GET: string = 'GET'; +const POST: string = 'POST'; + // This key is exported internally so that the TeamsActivityHandler will not overwrite any already set InvokeResponses. export const INVOKE_RESPONSE_KEY: symbol = Symbol('invokeResponse'); @@ -207,13 +224,18 @@ export const INVOKE_RESPONSE_KEY: symbol = Symbol('invokeResponse'); * }; * ``` */ -export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvider { +export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvider, RequestHandler { protected readonly credentials: AppCredentials; protected readonly credentialsProvider: SimpleCredentialProvider; protected readonly settings: BotFrameworkAdapterSettings; private isEmulatingOAuthCards: boolean; + // Streaming-specific properties: + private logic: (context: TurnContext) => Promise; + private streamingServer: IStreamingTransportServer; + private webSocketFactory: NodeWebSocketFactoryBase; + /** * Creates a new instance of the [BotFrameworkAdapter](xref:botbuilder.BotFrameworkAdapter) class. * @@ -260,6 +282,11 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide this.credentials.oAuthScope = GovernmentConstants.ToChannelFromBotOAuthScope; } + // If a NodeWebSocketFactoryBase was passed in, set it on the BotFrameworkAdapter. + if (this.settings.webSocketFactory) { + this.webSocketFactory = this.settings.webSocketFactory; + } + // Relocate the tenantId field used by MS Teams to a new location (from channelData to conversation) // This will only occur on activities from teams that include tenant info in channelData but NOT in conversation, // thus should be future friendly. However, once the the transition is complete. we can remove this. @@ -737,6 +764,7 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide * ``` */ public async processActivity(req: WebRequest, res: WebResponse, logic: (context: TurnContext) => Promise): Promise { + let body: any; let status: number; let processError: Error; @@ -872,6 +900,9 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide if (!activity.conversation || !activity.conversation.id) { throw new Error(`BotFrameworkAdapter.sendActivity(): missing conversation id.`); } + if (activity && BotFrameworkAdapter.isStreamingServiceUrl(activity.serviceUrl)) { + TokenResolver.checkForOAuthCards(this, context, activity as Activity); + } const client: ConnectorClient = this.createConnectorClient(activity.serviceUrl); if (activity.type === 'trace' && activity.channelId !== 'emulator') { // Just eat activity @@ -930,6 +961,23 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide * Override this in a derived class to create a mock connector client for unit testing. */ public createConnectorClient(serviceUrl: string): ConnectorClient { + if (BotFrameworkAdapter.isStreamingServiceUrl(serviceUrl)) { + + // Check if we have a streaming server. Otherwise, requesting a connector client + // for a non-existent streaming connection results in an error + if (!this.streamingServer) { + throw new Error(`Cannot create streaming connector client for serviceUrl ${serviceUrl} without a streaming connection. Call 'useWebSocket' or 'useNamedPipe' to start a streaming connection.`) + } + + return new ConnectorClient( + this.credentials, + { + baseUri: serviceUrl, + userAgent: USER_AGENT, + httpClient: new StreamingHttpClient(this.streamingServer) + }); + } + const client: ConnectorClient = new ConnectorClient(this.credentials, { baseUri: serviceUrl, userAgent: USER_AGENT} ); return client; } @@ -1007,6 +1055,217 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide protected createContext(request: Partial): TurnContext { return new TurnContext(this as any, request); } + + /** + * Checks the validity of the request and attempts to map it the correct virtual endpoint, + * then generates and returns a response if appropriate. + * @param request A ReceiveRequest from the connected channel. + * @returns A response created by the BotAdapter to be sent to the client that originated the request. + */ + public async processRequest(request: IReceiveRequest): Promise { + let response = new StreamingResponse(); + + if (!request) { + response.statusCode = StatusCodes.BAD_REQUEST; + response.setBody(`No request provided.`); + return response; + } + + if (!request.verb || !request.path) { + response.statusCode = StatusCodes.BAD_REQUEST; + response.setBody(`Request missing verb and/or path. Verb: ${ request.verb }. Path: ${ request.path }`); + return response; + } + + if (request.verb.toLocaleUpperCase() !== POST && request.verb.toLocaleUpperCase() !== GET) { + response.statusCode = StatusCodes.METHOD_NOT_ALLOWED; + response.setBody(`Invalid verb received. Only GET and POST are accepted. Verb: ${ request.verb }`); + } + + if (request.path.toLocaleLowerCase() === VERSION_PATH) { + return await this.handleVersionRequest(request, response); + } + + // Convert the StreamingRequest into an activity the Adapter can understand. + let body: Activity; + try { + body = await this.readRequestBodyAsString(request); + + } catch (error) { + response.statusCode = StatusCodes.BAD_REQUEST; + response.setBody(`Request body missing or malformed: ${ error }`); + return response; + } + + if (request.path.toLocaleLowerCase() !== MESSAGES_PATH) { + response.statusCode = StatusCodes.NOT_FOUND; + response.setBody(`Path ${ request.path.toLocaleLowerCase() } not not found. Expected ${ MESSAGES_PATH }}.`); + return response; + } + + if (request.verb.toLocaleUpperCase() !== POST) { + response.statusCode = StatusCodes.METHOD_NOT_ALLOWED; + response.setBody(`Invalid verb received for ${ request.verb.toLocaleLowerCase() }. Only GET and POST are accepted. Verb: ${ request.verb }`); + return response; + } + + try { + let context = new TurnContext(this, body); + await this.runMiddleware(context, this.logic); + + if (body.type === ActivityTypes.Invoke) { + let invokeResponse: any = context.turnState.get(INVOKE_RESPONSE_KEY); + + if (invokeResponse && invokeResponse.value) { + const value: InvokeResponse = invokeResponse.value; + response.statusCode = value.status; + response.setBody(value.body); + } else { + response.statusCode = StatusCodes.NOT_IMPLEMENTED; + } + } else { + response.statusCode = StatusCodes.OK; + } + } catch (error) { + response.statusCode = StatusCodes.INTERNAL_SERVER_ERROR; + response.setBody(error); + return response; + } + + return response; + } + + /** + * Connects the handler to a Named Pipe server and begins listening for incoming requests. + * @param pipeName The name of the named pipe to use when creating the server. + * @param logic The logic that will handle incoming requests. + */ + public async useNamedPipe(logic: (context: TurnContext) => Promise, pipeName: string = defaultPipeName): Promise { + if (!logic) { + throw new Error('Bot logic needs to be provided to `useNamedPipe`'); + } + + this.logic = logic; + + this.streamingServer = new NamedPipeServer(pipeName, this); + await this.streamingServer.start(); + } + + /** + * Process the initial request to establish a long lived connection via a streaming server. + * @param req The connection request. + * @param socket The raw socket connection between the bot (server) and channel/caller (client). + * @param head The first packet of the upgraded stream. + * @param logic The logic that handles incoming streaming requests for the lifetime of the WebSocket connection. + */ + public async useWebSocket(req: WebRequest, socket: INodeSocket, head: INodeBuffer, logic: (context: TurnContext) => Promise): Promise { + // Use the provided NodeWebSocketFactoryBase on BotFrameworkAdapter construction, + // otherwise create a new NodeWebSocketFactory. + const webSocketFactory = this.webSocketFactory || new NodeWebSocketFactory(); + + if (!logic) { + throw new Error('Streaming logic needs to be provided to `useWebSocket`'); + } + + this.logic = logic; + + try { + await this.authenticateConnection(req, this.settings.channelService); + } catch (err) { + // If the authenticateConnection call fails, send back the correct error code and close + // the connection. + if (typeof(err.message) === 'string' && err.message.toLowerCase().startsWith('unauthorized')) { + abortWebSocketUpgrade(socket, 401); + } else if (typeof(err.message) === 'string' && err.message.toLowerCase().startsWith(`'authheader'`)) { + abortWebSocketUpgrade(socket, 400); + } else { + abortWebSocketUpgrade(socket, 500); + } + + // Re-throw the error so the developer will know what occurred. + throw err; + } + + const nodeWebSocket = await webSocketFactory.createWebSocket(req, socket, head); + + await this.startWebSocket(nodeWebSocket); + } + + private async authenticateConnection(req: WebRequest, channelService?: string): Promise { + if (!this.credentials.appId) { + // auth is disabled + return; + } + + const authHeader: string = req.headers.authorization || req.headers.Authorization || ''; + const channelIdHeader: string = req.headers.channelid || req.headers.ChannelId || req.headers.ChannelID || ''; + // Validate the received Upgrade request from the channel. + const claims = await JwtTokenValidation.validateAuthHeader(authHeader, this.credentialsProvider, channelService, channelIdHeader); + + // Add serviceUrl from claim to static cache to trigger token refreshes. + const serviceUrl = claims.getClaimValue(AuthenticationConstants.ServiceUrlClaim); + MicrosoftAppCredentials.trustServiceUrl(serviceUrl); + + if (!claims.isAuthenticated) { throw new Error('Unauthorized Access. Request is not authorized'); } + } + + /** + * Connects the handler to a WebSocket server and begins listening for incoming requests. + * @param socket The socket to use when creating the server. + */ + private async startWebSocket(socket: ISocket): Promise{ + this.streamingServer = new WebSocketServer(socket, this); + await this.streamingServer.start(); + } + + private async readRequestBodyAsString(request: IReceiveRequest): Promise { + const contentStream = request.streams[0]; + return await contentStream.readAsJson(); + } + + private async handleVersionRequest(request: IReceiveRequest, response: StreamingResponse): Promise { + if (request.verb.toLocaleUpperCase() === GET) { + response.statusCode = StatusCodes.OK; + + if (!this.credentials.appId) { + response.setBody({ UserAgent: USER_AGENT }); + return response; + } + + let token = ''; + try { + token = await this.credentials.getToken(); + + } catch (err) { + /** + * In reality a missing BotToken will cause the channel to close the connection, + * but we still send the response and allow the channel to make that decision + * instead of proactively disconnecting. This allows the channel to know why + * the connection has been closed and make the choice not to make endless reconnection + * attempts that will end up right back here. + */ + console.error(err.message); + } + response.setBody({ UserAgent: USER_AGENT, BotToken: token }); + + } else { + response.statusCode = StatusCodes.METHOD_NOT_ALLOWED; + response.setBody(`Invalid verb received for path: ${ request.path }. Only GET is accepted. Verb: ${ request.verb }`); + } + + return response; + } + + /** + * Determine if the serviceUrl was sent via an Http/Https connection or Streaming + * This can be determined by looking at the ServiceUrl property: + * (1) All channels that send messages via http/https are not streaming + * (2) Channels that send messages via streaming have a ServiceUrl that does not begin with http/https. + * @param serviceUrl the serviceUrl provided in the resquest. + */ + private static isStreamingServiceUrl(serviceUrl: string): boolean { + return serviceUrl && !serviceUrl.toLowerCase().startsWith('http'); + } } /** @@ -1052,4 +1311,13 @@ function delay(timeout: number): Promise { return new Promise((resolve) => { setTimeout(resolve, timeout); }); +} + +function abortWebSocketUpgrade(socket: INodeSocket, code: number) { + if (socket.writable) { + const connectionHeader = `Connection: 'close'\r\n`; + socket.write(`HTTP/1.1 ${code} ${STATUS_CODES[code]}\r\n${connectionHeader}\r\n`); + } + + socket.destroy(); } \ No newline at end of file diff --git a/libraries/botbuilder/src/index.ts b/libraries/botbuilder/src/index.ts index 4c82ac3b65..1286d564f3 100644 --- a/libraries/botbuilder/src/index.ts +++ b/libraries/botbuilder/src/index.ts @@ -19,6 +19,7 @@ export { BotFrameworkHttpClient } from './botFrameworkHttpClient'; export { ChannelServiceHandler } from './channelServiceHandler'; export * from './fileTranscriptStore'; export * from './inspectionMiddleware'; +export * from './streaming'; export * from './teamsActivityHandler'; export * from './teamsActivityHelpers'; export * from './teamsInfo'; diff --git a/libraries/botframework-streaming/src/adapters/index.ts b/libraries/botbuilder/src/streaming/index.ts similarity index 69% rename from libraries/botframework-streaming/src/adapters/index.ts rename to libraries/botbuilder/src/streaming/index.ts index a1913a5c47..e13ad8a6a3 100644 --- a/libraries/botframework-streaming/src/adapters/index.ts +++ b/libraries/botbuilder/src/streaming/index.ts @@ -1,11 +1,10 @@ /** - * @module botframework-streaming + * @module botbuilder */ /** * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. */ -export * from './streamingAdapter'; export * from './streamingHttpClient'; export * from './tokenResolver'; diff --git a/libraries/botframework-streaming/src/adapters/streamingHttpClient.ts b/libraries/botbuilder/src/streaming/streamingHttpClient.ts similarity index 89% rename from libraries/botframework-streaming/src/adapters/streamingHttpClient.ts rename to libraries/botbuilder/src/streaming/streamingHttpClient.ts index 1f7847fd02..e289a7bfc3 100644 --- a/libraries/botframework-streaming/src/adapters/streamingHttpClient.ts +++ b/libraries/botbuilder/src/streaming/streamingHttpClient.ts @@ -1,15 +1,13 @@ /** - * @module botframework-streaming + * @module botbuilder */ /** * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. */ -import { WebResource, HttpOperationResponse, HttpClient } from '@azure/ms-rest-js'; - -import { IStreamingTransportServer } from '../interfaces'; -import { StreamingRequest } from '../streamingRequest'; +import { WebResource, HttpOperationResponse, HttpClient } from 'botframework-connector/node_modules/@azure/ms-rest-js'; +import { IStreamingTransportServer, StreamingRequest } from 'botframework-streaming'; export class StreamingHttpClient implements HttpClient { private readonly server: IStreamingTransportServer; diff --git a/libraries/botframework-streaming/src/adapters/tokenResolver.ts b/libraries/botbuilder/src/streaming/tokenResolver.ts similarity index 96% rename from libraries/botframework-streaming/src/adapters/tokenResolver.ts rename to libraries/botbuilder/src/streaming/tokenResolver.ts index 4f5546706f..9db3fce193 100644 --- a/libraries/botframework-streaming/src/adapters/tokenResolver.ts +++ b/libraries/botbuilder/src/streaming/tokenResolver.ts @@ -1,12 +1,12 @@ /** - * @module botframework-streaming + * @module botbuilder */ /** * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. */ -import { BotFrameworkAdapter } from 'botbuilder'; +import { BotFrameworkAdapter } from '../botFrameworkAdapter'; import { Activity, ActivityTypes, diff --git a/libraries/botbuilder/tests/streaming/botFrameworkAdapterStreaming.test.js b/libraries/botbuilder/tests/streaming/botFrameworkAdapterStreaming.test.js new file mode 100644 index 0000000000..a75afc2c7d --- /dev/null +++ b/libraries/botbuilder/tests/streaming/botFrameworkAdapterStreaming.test.js @@ -0,0 +1,318 @@ +const { Socket } = require('net'); + +const { expect } = require('chai'); +const { spy } = require('sinon'); +const { ActivityHandler, ActivityTypes } = require('botbuilder-core'); + +const { BotFrameworkAdapter, StatusCodes } = require('../../'); + +// Import Helper Classes +const { MockHttpRequest } = require('./mockHttpRequest'); +const { MockNetSocket } = require('./mockNetSocket'); +const { MockContentStream, MockStreamingRequest } = require('./mockStreamingRequest'); + +const createNetSocket = (readable = true, writable = true) => { + return new Socket({ readable, writable }); +}; + +class TestAdapterSettings { + constructor(appId, appPassword) { + this.appId = appId; + this.appPassword = appPassword; + } +} + +describe('BotFrameworkAdapter Streaming tests', () => { + + it('has the correct status codes', () => { + expect(StatusCodes.OK).to.equal(200); + expect(StatusCodes.BAD_REQUEST).to.equal(400); + expect(StatusCodes.UNAUTHORIZED).to.equal(401); + expect(StatusCodes.NOT_FOUND).to.equal(404); + expect(StatusCodes.METHOD_NOT_ALLOWED).to.equal(405); + expect(StatusCodes.UPGRADE_REQUIRED).to.equal(426); + expect(StatusCodes.INTERNAL_SERVER_ERROR).to.equal(500); + expect(StatusCodes.NOT_IMPLEMENTED).to.equal(501); + }); + + it('gets constructed properly', () => { + const adapter = new BotFrameworkAdapter(); + + expect(adapter).to.be.instanceOf(BotFrameworkAdapter); + }); + + it('starts and stops a namedpipe server', () => { + const adapter = new BotFrameworkAdapter(); + + adapter.useNamedPipe('PipeyMcPipeface', async (context) => { + await bot.run(context); + }); + expect(adapter.streamingServer.disconnect()).to.not.throw; + }); + + it('starts and stops a websocket server', async () => { + const bot = new ActivityHandler(); + const adapter = new BotFrameworkAdapter(new TestAdapterSettings()); + const request = new MockHttpRequest(); + const realSocket = createNetSocket(); + + await adapter.useWebSocket(request, realSocket, Buffer.from([]), async (context) => { + await bot.run(context); + }); + }); + + it('returns a connector client', async () => { + const bot = new ActivityHandler(); + const adapter = new BotFrameworkAdapter(new TestAdapterSettings()); + const request = new MockHttpRequest(); + const realSocket = createNetSocket(); + + await adapter.useWebSocket(request, realSocket, Buffer.from([]), async (context) => { + await bot.run(context); + }); + const cc = adapter.createConnectorClient('urn:test'); + expect(cc.baseUri).to.equal('urn:test'); + }); + + describe('useWebSocket()', () => { + it('connects', async () => { + const bot = new ActivityHandler(); + const adapter = new BotFrameworkAdapter(new TestAdapterSettings()); + const request = new MockHttpRequest(); + const realSocket = createNetSocket(); + + const writeSpy = spy(realSocket, 'write'); + await adapter.useWebSocket(request, realSocket, Buffer.from([]), async (context) => { + await bot.run(context); + }); + expect(writeSpy.called).to.be.true; + }); + + it('returns status code 401 when request is not authorized', async () => { + const bot = new ActivityHandler(); + const settings = new TestAdapterSettings('appId', 'password'); + const adapter = new BotFrameworkAdapter(settings); + const request = new MockHttpRequest(); + request.setHeader('authorization', 'donttustme'); + + const socket = new MockNetSocket(); + const writeSpy = spy(socket, 'write'); + const destroySpy = spy(socket, 'destroy'); + + await adapter.useWebSocket(request, socket, Buffer.from([]), async (context) => { + await bot.run(context); + throw new Error('useWebSocket should have thrown an error'); + }).catch(err => { + expect(err.message).to.equal('Unauthorized. No valid identity.'); + const socketResponse = MockNetSocket.createNonSuccessResponse(401); + expect(writeSpy.called).to.be.true; + expect(writeSpy.calledWithExactly(socketResponse)).to.be.true; + expect(destroySpy.calledOnceWithExactly()).to.be.true; + }); + }); + }); + + describe('processRequest()', () => { + it('returns a 400 when the request is missing verb', async () => { + const adapter = new BotFrameworkAdapter(); + const request = new MockStreamingRequest({ + verb: undefined + }); + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(400); + }); + + it('returns a 400 when the request is missing path', async () => { + const adapter = new BotFrameworkAdapter(); + const request = new MockStreamingRequest({ + path: undefined, + }); + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(400); + }); + + it('returns a 400 when the request body is missing', async () => { + const adapter = new BotFrameworkAdapter(); + const request = new MockStreamingRequest({ + streams: undefined + }); + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(400); + }); + + it('returns user agent information when a GET hits the version endpoint', async () => { + const adapter = new BotFrameworkAdapter(); + const request = new MockStreamingRequest({ + path: '/api/version', + verb: 'GET' + }); + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(200); + expect(response.streams[0].content).to.not.be.undefined; + }); + + it('returns user agent information from cache when a GET hits the version endpoint more than once', async () => { + const adapter = new BotFrameworkAdapter(); + const request = new MockStreamingRequest({ + path: '/api/version', + verb: 'GET' + }); + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(200); + expect(response.streams[0].content).to.not.be.undefined; + + const response2 = await adapter.processRequest(request); + expect(response2.statusCode).to.equal(200); + expect(response2.streams[0].content).to.not.be.undefined; + }); + + it('should return 405 for unsupported methods to valid paths', async () => { + const adapter = new BotFrameworkAdapter(); + const request = new MockStreamingRequest({ + path: '/api/version', + verb: 'UPDATE' + }); + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(405); + }); + + it('should return 404 for unsupported paths with valid methods', async () => { + const adapter = new BotFrameworkAdapter(); + const request = new MockStreamingRequest({ + path: '/api/supersecretbackdoor' + }); + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(404); + }); + + it('processes a well formed request when there is no middleware with a non-Invoke activity type', async () => { + const bot = new ActivityHandler(); + const adapter = new BotFrameworkAdapter(); + const mockStream = async () => ({ type: ActivityTypes.Message, serviceUrl: 'somewhere/', channelId: 'test' }); + const request = new MockStreamingRequest({ + streams: [new MockContentStream({ readAsJson: mockStream })] + }); + + adapter.logic = async (context) => { + await bot.run(context); + }; + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(200); + }); + + it('returns a 501 when activity type is invoke, but the activity is invalid', async () => { + const bot = new ActivityHandler(); + const adapter = new BotFrameworkAdapter(); + const request = new MockStreamingRequest(); + + adapter.logic = async (context) => { + await bot.run(context); + }; + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(501); + }); + + it('returns a 500 when BotFrameworkAdapter.logic is not callable', async () => { + const adapter = new BotFrameworkAdapter(); + const request = new MockStreamingRequest(); + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(500); + }); + + it('returns a 500 and calls middleware when BotFrameworkAdapter.logic is not callable', async () => { + let middlewareCalled = false; + const middleware = { + async onTurn(context, next) { + middlewareCalled = true; + await next(); + } + }; + + const adapter = new BotFrameworkAdapter(); + adapter.use(middleware); + const request = new MockStreamingRequest(); + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(500); + expect(middlewareCalled).to.be.true; + }); + + it('executes middleware', async () => { + const bot = new ActivityHandler(); + const adapter = new BotFrameworkAdapter(); + + let middlewareCalled = false; + const middleware = { + async onTurn(context, next) { + middlewareCalled = true; + return next(); + } + }; + + adapter.use(middleware); + + const runSpy = spy(bot, 'run'); + const request = new MockStreamingRequest(); + + adapter.logic = async (context) => { + await bot.run(context); + }; + + const response = await adapter.processRequest(request); + expect(response.statusCode).to.equal(501); + expect(runSpy.called).to.be.true; + expect(middlewareCalled).to.be.true; + }); + }); + + it('sends a request', async () => { + const bot = new ActivityHandler(); + const adapter = new BotFrameworkAdapter(new TestAdapterSettings()); + const request = new MockHttpRequest(); + const realSocket = createNetSocket(); + + const writeSpy = spy(realSocket, 'write'); + + await adapter.useWebSocket(request, realSocket, Buffer.from([]), async (context) => { + await bot.run(context); + }); + + const connection = adapter.createConnectorClient('fakeUrl'); + connection.sendRequest({ method: 'POST', url: 'testResultDotCom', body: 'Test body!' }); + expect(writeSpy.called).to.be.true; + }).timeout(2000); + + describe('private methods', () => { + it('should identify streaming connections', function () { + const serviceUrls = [ + 'urn:botframework:WebSocket:wss://beep.com', + 'URN:botframework:WebSocket:http://beep.com', + ]; + + serviceUrls.forEach(serviceUrl => { + expect(BotFrameworkAdapter.isStreamingServiceUrl(serviceUrl)).to.be.true; + }); + }); + + it('should identify http connections', function () { + const serviceUrls = [ + 'http://yayay.com', + 'HTTPS://yayay.com', + ]; + + serviceUrls.forEach(serviceUrl => { + expect(BotFrameworkAdapter.isStreamingServiceUrl(serviceUrl)).to.be.false; + }); + }); + }); +}); diff --git a/libraries/botbuilder/tests/streaming/mockHttpRequest.js b/libraries/botbuilder/tests/streaming/mockHttpRequest.js new file mode 100644 index 0000000000..c118b24591 --- /dev/null +++ b/libraries/botbuilder/tests/streaming/mockHttpRequest.js @@ -0,0 +1,32 @@ +const { randomBytes } = require('crypto'); + +class MockHttpRequest { + constructor(options = {}) { + const config = Object.assign({ + method: 'GET', + headers: { + 'upgrade': 'websocket', + 'sec-websocket-key': randomBytes(16).toString('base64'), + 'sec-websocket-version': '13', + 'sec-websocket-protocol': '' + } + }, options); + + this.method = config.method; + this.headers = config.headers; + } + + setHeader(key, value) { + this.headers[key] = value; + } + + streams(value) { + this.streamsVal = value; + } + + streams() { + return this.streamsVal; + } +} + +module.exports.MockHttpRequest = MockHttpRequest; diff --git a/libraries/botbuilder/tests/streaming/mockNetSocket.js b/libraries/botbuilder/tests/streaming/mockNetSocket.js new file mode 100644 index 0000000000..04379ff575 --- /dev/null +++ b/libraries/botbuilder/tests/streaming/mockNetSocket.js @@ -0,0 +1,18 @@ +const { STATUS_CODES } = require('http'); + +class MockNetSocket { + constructor(readable = true, writable = true) { + this.readable = readable; + this.writable = writable; + } + + write(response) { } + + destroy(err) { } +} + +MockNetSocket.createNonSuccessResponse = (code) => { + return `HTTP/1.1 ${code} ${STATUS_CODES[code]}\r\nConnection: 'close'\r\n\r\n`; +}; + +module.exports.MockNetSocket = MockNetSocket; diff --git a/libraries/botbuilder/tests/streaming/mockStreamingRequest.js b/libraries/botbuilder/tests/streaming/mockStreamingRequest.js new file mode 100644 index 0000000000..6cdbdad6e8 --- /dev/null +++ b/libraries/botbuilder/tests/streaming/mockStreamingRequest.js @@ -0,0 +1,30 @@ +const { ActivityTypes } = require('botbuilder-core'); + +// Mock of botframework-streaming/src/ContentStream +class MockContentStream { + constructor(options = {}) { + const config = Object.assign({ + readAsJson: async () => ({ type: ActivityTypes.Invoke, serviceUrl: 'somewhere/', channelId: 'test' }) + }, options); + + this.readAsJson = config.readAsJson; + } +} + +// Mock of botframework-streaming/src/interfaces/IReceiveRequest +class MockStreamingRequest { + constructor(options = {}) { + const config = Object.assign({ + verb: 'POST', + path: '/api/messages', + streams: [new MockContentStream()] + }, options); + + this.verb = config.verb; + this.path = config.path; + this.streams = config.streams; + } +} + +module.exports.MockContentStream = MockContentStream; +module.exports.MockStreamingRequest = MockStreamingRequest; diff --git a/libraries/botframework-streaming/tests/tokenResolver.test.js b/libraries/botbuilder/tests/streaming/tokenResolver.test.js similarity index 95% rename from libraries/botframework-streaming/tests/tokenResolver.test.js rename to libraries/botbuilder/tests/streaming/tokenResolver.test.js index a621c0158f..2f475cbfa3 100644 --- a/libraries/botframework-streaming/tests/tokenResolver.test.js +++ b/libraries/botbuilder/tests/streaming/tokenResolver.test.js @@ -1,266 +1,266 @@ -const assert = require('assert'); -const { TurnContext, CardFactory, BotCallbackHandlerKey } = require('botbuilder-core'); -const { StreamingAdapter, TokenResolver } = require('../'); - -class MockAdapter extends StreamingAdapter { - constructor(botLogic, getUserTokenCallback) { - super(undefined); - - this.botLogic = async (ctx) => { botLogic(ctx); }; - this.getUserTokenCallback = getUserTokenCallback; - } - - createTurnContext(activity) { - const context = new TurnContext(this, activity); - context.turnState.set(BotCallbackHandlerKey, this.botLogic); - return context; - } - - getUserToken(context, connectionName, magicCode) { - return Promise.resolve(this.getUserTokenCallback()); - } -} - -function createOAuthCardActivity() { - let activity = { - activityId: '1234', - channelId: 'test', - serviceUrl: 'urn:botframework.com:websocket:wss://channel.com/blah', - user: { id: 'user', name: 'User Name' }, - bot: { id: 'bot', name: 'Bot Name' }, - conversation: { - id: 'convo1', - properties: { - 'foo': 'bar' - } - }, - attachments: [], - }; - activity.attachments.push(CardFactory.oauthCard('foo', 'sign-in')); - return activity; -} - -describe(`TokenResolver`, function () { - this.timeout(50000000); - - it(`should throw on empty connectionName`, async function () { - const returnTokenResponse = () => { return { token: '1234', connectionName: 'foo' }; }; - const botLogic= (ctx) => { - if (ctx.activity.type === 'event' && ctx.activity.value.token) { - gotToken = true; - } - }; - const adapter = new MockAdapter(botLogic, returnTokenResponse); - const activity = createOAuthCardActivity(); - activity.attachments[0].content.connectionName = undefined; - const context = adapter.createTurnContext(activity); - - try - { - TokenResolver.checkForOAuthCards(adapter, context, activity); - assert(false, 'did not throw when should have'); - } - catch(e) - { - assert(e.message === 'The OAuthPrompt\'s ConnectionName property is missing a value.', 'did not receive token'); - } - }); - - it(`no attachements is a no-op`, async function () { - let fail = false; - const returnTokenResponse = () => { fail = true; return { token: '1234', connectionName: 'foo' }; }; - const botLogic= (ctx) => { - fail = true; - }; - const adapter = new MockAdapter(botLogic, returnTokenResponse); - const activity = createOAuthCardActivity(); - activity.attachments = []; - const context = adapter.createTurnContext(activity); - const log = []; - - TokenResolver.checkForOAuthCards(adapter, context, activity, log); - - assert(!fail, 'called bot methods'); - assert(log.length === 0, 'logged actions, should be zero'); - }); - - it(`should get the token`, async function () { - let gotToken = false; - const returnTokenResponse = () => { return { token: '1234', connectionName: 'foo' }; }; - let doneResolve, doneReject; - let done = new Promise((resolve, reject) => { - doneResolve = resolve; - doneReject = reject; - }); - const botLogic= (ctx) => { - if (ctx.activity.type === 'event' && ctx.activity.value.token) { - gotToken = true; - doneResolve('done'); - } else { - doneReject('error'); - } - }; - const adapter = new MockAdapter(botLogic, returnTokenResponse); - const activity = createOAuthCardActivity(); - const context = adapter.createTurnContext(activity); - const log = []; - - TokenResolver.checkForOAuthCards(adapter, context, activity, log); - - await done; - - assert(gotToken, 'did not receive token'); - }); - - it(`should call onTurnError with process throw Error`, async function () { - let calledOnTurnError = false; - const returnTokenResponse = () => { return { token: '1234', connectionName: 'foo' }; }; - let doneResolve, doneReject; - let done = new Promise((resolve, reject) => { - doneResolve = resolve; - doneReject = reject; - }); - const botLogic= (ctx) => { - if (ctx.activity.type === 'event' && ctx.activity.value.token) { - throw 'this is the error'; - } else { - doneReject('error'); - } - }; - const adapter = new MockAdapter(botLogic, returnTokenResponse); - adapter.onTurnError = async (context, error) => { - calledOnTurnError = true; - doneResolve('done'); - }; - const activity = createOAuthCardActivity(); - const context = adapter.createTurnContext(activity); - const log = []; - - TokenResolver.checkForOAuthCards(adapter, context, activity, log); - - await done; - - assert(calledOnTurnError, 'did not receive error'); - }); - - it(`should call onTurnError with process throw other`, async function () { - let calledOnTurnError = false; - const returnTokenResponse = () => { return { token: '1234', connectionName: 'foo' }; }; - let doneResolve, doneReject; - let done = new Promise((resolve, reject) => { - doneResolve = resolve; - doneReject = reject; - }); - const botLogic= (ctx) => { - if (ctx.activity.type === 'event' && ctx.activity.value.token) { - throw new Error('this is the error'); - } else { - doneReject('error'); - } - }; - const adapter = new MockAdapter(botLogic, returnTokenResponse); - adapter.onTurnError = async (context, error) => { - calledOnTurnError = true; - doneResolve('done'); - }; - const activity = createOAuthCardActivity(); - const context = adapter.createTurnContext(activity); - const log = []; - - TokenResolver.checkForOAuthCards(adapter, context, activity, log); - - await done; - - assert(calledOnTurnError, 'did not receive error'); - }); - - it(`should get the token on the second try`, async function () { - let gotToken = false; - let i = 0; - const returnTokenResponse = () => - { - i++; - if ( i < 2 ) - return undefined; - return { token: '1234', connectionName: 'foo' }; - }; - let doneResolve, doneReject; - let done = new Promise((resolve, reject) => { - doneResolve = resolve; - doneReject = reject; - }); - const botLogic= (ctx) => { - if (ctx.activity.type === 'event' && ctx.activity.value.token) { - gotToken = true; - doneResolve('done'); - } else { - doneReject('error'); - } - }; - const adapter = new MockAdapter(botLogic, returnTokenResponse); - const activity = createOAuthCardActivity(); - const context = adapter.createTurnContext(activity); - - TokenResolver.checkForOAuthCards(adapter, context, activity); - - await done; - - assert(gotToken, 'did not receive token'); - }); - - it(`should end polling`, async function () { - let doneResolve, doneReject; - let done = new Promise((resolve, reject) => { - doneResolve = resolve; - doneReject = reject; - }); - const returnTokenResponse = () => - { - doneResolve('done'); - return { properties: { tokenPollingSettings: { timeout: 0 } } }; - }; - const botLogic= (ctx) => { - }; - const adapter = new MockAdapter(botLogic, returnTokenResponse); - const activity = createOAuthCardActivity(); - const context = adapter.createTurnContext(activity); - const log = []; - - TokenResolver.checkForOAuthCards(adapter, context, activity, log); - - await done; - - assert(log.indexOf('End polling') !== -1, 'did not end polling'); - }); - - it(`should change interval polling`, async function () { - let doneResolve, doneReject; - let done = new Promise((resolve, reject) => { - doneResolve = resolve; - doneReject = reject; - }); - let i = 0; - const returnTokenResponse = () => - { - i++; - if (i < 2) { - return { properties: { tokenPollingSettings: { interval: 100 } } }; - } else { - doneResolve('done'); - return { properties: { tokenPollingSettings: { timeout: 0 } } }; - } - }; - const botLogic= (ctx) => { - }; - const adapter = new MockAdapter(botLogic, returnTokenResponse); - const activity = createOAuthCardActivity(); - const context = adapter.createTurnContext(activity); - const log = []; - - TokenResolver.checkForOAuthCards(adapter, context, activity, log); - - await done; - - assert(log.indexOf('Changing polling interval to 100') !== -1, 'did not end polling'); - }); -}); +const assert = require('assert'); +const { BotCallbackHandlerKey, CardFactory, TurnContext } = require('botbuilder-core'); +const { BotFrameworkAdapter, TokenResolver } = require('../../'); + +class MockAdapter extends BotFrameworkAdapter { + constructor(botLogic, getUserTokenCallback) { + super(undefined); + + this.botLogic = async (ctx) => { botLogic(ctx); }; + this.getUserTokenCallback = getUserTokenCallback; + } + + createTurnContext(activity) { + const context = new TurnContext(this, activity); + context.turnState.set(BotCallbackHandlerKey, this.botLogic); + return context; + } + + getUserToken(context, connectionName, magicCode) { + return Promise.resolve(this.getUserTokenCallback()); + } +} + +function createOAuthCardActivity() { + let activity = { + activityId: '1234', + channelId: 'test', + serviceUrl: 'urn:botframework.com:websocket:wss://channel.com/blah', + user: { id: 'user', name: 'User Name' }, + bot: { id: 'bot', name: 'Bot Name' }, + conversation: { + id: 'convo1', + properties: { + 'foo': 'bar' + } + }, + attachments: [], + }; + activity.attachments.push(CardFactory.oauthCard('foo', 'sign-in')); + return activity; +} + +describe(`TokenResolver`, function () { + this.timeout(50000000); + + it(`should throw on empty connectionName`, async function () { + const returnTokenResponse = () => { return { token: '1234', connectionName: 'foo' }; }; + const botLogic= (ctx) => { + if (ctx.activity.type === 'event' && ctx.activity.value.token) { + gotToken = true; + } + }; + const adapter = new MockAdapter(botLogic, returnTokenResponse); + const activity = createOAuthCardActivity(); + activity.attachments[0].content.connectionName = undefined; + const context = adapter.createTurnContext(activity); + + try + { + TokenResolver.checkForOAuthCards(adapter, context, activity); + assert(false, 'did not throw when should have'); + } + catch(e) + { + assert(e.message === 'The OAuthPrompt\'s ConnectionName property is missing a value.', 'did not receive token'); + } + }); + + it(`no attachements is a no-op`, async function () { + let fail = false; + const returnTokenResponse = () => { fail = true; return { token: '1234', connectionName: 'foo' }; }; + const botLogic= (ctx) => { + fail = true; + }; + const adapter = new MockAdapter(botLogic, returnTokenResponse); + const activity = createOAuthCardActivity(); + activity.attachments = []; + const context = adapter.createTurnContext(activity); + const log = []; + + TokenResolver.checkForOAuthCards(adapter, context, activity, log); + + assert(!fail, 'called bot methods'); + assert(log.length === 0, 'logged actions, should be zero'); + }); + + it(`should get the token`, async function () { + let gotToken = false; + const returnTokenResponse = () => { return { token: '1234', connectionName: 'foo' }; }; + let doneResolve, doneReject; + let done = new Promise((resolve, reject) => { + doneResolve = resolve; + doneReject = reject; + }); + const botLogic= (ctx) => { + if (ctx.activity.type === 'event' && ctx.activity.value.token) { + gotToken = true; + doneResolve('done'); + } else { + doneReject('error'); + } + }; + const adapter = new MockAdapter(botLogic, returnTokenResponse); + const activity = createOAuthCardActivity(); + const context = adapter.createTurnContext(activity); + const log = []; + + TokenResolver.checkForOAuthCards(adapter, context, activity, log); + + await done; + + assert(gotToken, 'did not receive token'); + }); + + it(`should call onTurnError with process throw Error`, async function () { + let calledOnTurnError = false; + const returnTokenResponse = () => { return { token: '1234', connectionName: 'foo' }; }; + let doneResolve, doneReject; + let done = new Promise((resolve, reject) => { + doneResolve = resolve; + doneReject = reject; + }); + const botLogic= (ctx) => { + if (ctx.activity.type === 'event' && ctx.activity.value.token) { + throw 'this is the error'; + } else { + doneReject('error'); + } + }; + const adapter = new MockAdapter(botLogic, returnTokenResponse); + adapter.onTurnError = async (context, error) => { + calledOnTurnError = true; + doneResolve('done'); + }; + const activity = createOAuthCardActivity(); + const context = adapter.createTurnContext(activity); + const log = []; + + TokenResolver.checkForOAuthCards(adapter, context, activity, log); + + await done; + + assert(calledOnTurnError, 'did not receive error'); + }); + + it(`should call onTurnError with process throw other`, async function () { + let calledOnTurnError = false; + const returnTokenResponse = () => { return { token: '1234', connectionName: 'foo' }; }; + let doneResolve, doneReject; + let done = new Promise((resolve, reject) => { + doneResolve = resolve; + doneReject = reject; + }); + const botLogic= (ctx) => { + if (ctx.activity.type === 'event' && ctx.activity.value.token) { + throw new Error('this is the error'); + } else { + doneReject('error'); + } + }; + const adapter = new MockAdapter(botLogic, returnTokenResponse); + adapter.onTurnError = async (context, error) => { + calledOnTurnError = true; + doneResolve('done'); + }; + const activity = createOAuthCardActivity(); + const context = adapter.createTurnContext(activity); + const log = []; + + TokenResolver.checkForOAuthCards(adapter, context, activity, log); + + await done; + + assert(calledOnTurnError, 'did not receive error'); + }); + + it(`should get the token on the second try`, async function () { + let gotToken = false; + let i = 0; + const returnTokenResponse = () => + { + i++; + if ( i < 2 ) + return undefined; + return { token: '1234', connectionName: 'foo' }; + }; + let doneResolve, doneReject; + let done = new Promise((resolve, reject) => { + doneResolve = resolve; + doneReject = reject; + }); + const botLogic= (ctx) => { + if (ctx.activity.type === 'event' && ctx.activity.value.token) { + gotToken = true; + doneResolve('done'); + } else { + doneReject('error'); + } + }; + const adapter = new MockAdapter(botLogic, returnTokenResponse); + const activity = createOAuthCardActivity(); + const context = adapter.createTurnContext(activity); + + TokenResolver.checkForOAuthCards(adapter, context, activity); + + await done; + + assert(gotToken, 'did not receive token'); + }); + + it(`should end polling`, async function () { + let doneResolve, doneReject; + let done = new Promise((resolve, reject) => { + doneResolve = resolve; + doneReject = reject; + }); + const returnTokenResponse = () => + { + doneResolve('done'); + return { properties: { tokenPollingSettings: { timeout: 0 } } }; + }; + const botLogic= (ctx) => { + }; + const adapter = new MockAdapter(botLogic, returnTokenResponse); + const activity = createOAuthCardActivity(); + const context = adapter.createTurnContext(activity); + const log = []; + + TokenResolver.checkForOAuthCards(adapter, context, activity, log); + + await done; + + assert(log.indexOf('End polling') !== -1, 'did not end polling'); + }); + + it(`should change interval polling`, async function () { + let doneResolve, doneReject; + let done = new Promise((resolve, reject) => { + doneResolve = resolve; + doneReject = reject; + }); + let i = 0; + const returnTokenResponse = () => + { + i++; + if (i < 2) { + return { properties: { tokenPollingSettings: { interval: 100 } } }; + } else { + doneResolve('done'); + return { properties: { tokenPollingSettings: { timeout: 0 } } }; + } + }; + const botLogic= (ctx) => { + }; + const adapter = new MockAdapter(botLogic, returnTokenResponse); + const activity = createOAuthCardActivity(); + const context = adapter.createTurnContext(activity); + const log = []; + + TokenResolver.checkForOAuthCards(adapter, context, activity, log); + + await done; + + assert(log.indexOf('Changing polling interval to 100') !== -1, 'did not end polling'); + }); +}); diff --git a/libraries/botbuilder/tests/teamsActivityHandler.test.js b/libraries/botbuilder/tests/teamsActivityHandler.test.js index 6d2c0b18cf..304643617a 100644 --- a/libraries/botbuilder/tests/teamsActivityHandler.test.js +++ b/libraries/botbuilder/tests/teamsActivityHandler.test.js @@ -848,4 +848,4 @@ describe('TeamsActivityHandler', () => { assert(handleTeamsSigninVerifyStateCalled, 'handleTeamsSigninVerifyState handler not called'); }); }); -}); +}); \ No newline at end of file diff --git a/libraries/botframework-streaming/.eslintrc.json b/libraries/botframework-streaming/.eslintrc.json new file mode 100644 index 0000000000..0fc96d5693 --- /dev/null +++ b/libraries/botframework-streaming/.eslintrc.json @@ -0,0 +1,33 @@ +{ + "parser": "@typescript-eslint/parser", + "plugins": [ + "@typescript-eslint", + "only-warn" + ], + "extends": ["plugin:@typescript-eslint/recommended"], + "parserOptions": { + "ecmaVersion": 9, + "sourceType": "module", + "ecmaFeatures": { + "impliedStrict": true + } + }, + "rules": { + "semi": ["error", "always"], + "no-return-await": 0, + "space-before-function-paren": ["error", { + "named": "never", + "anonymous": "never", + "asyncArrow": "always" + }], + "quotes": ["error", "single", {"allowTemplateLiterals": true}], + "template-curly-spacing": ["error", "always"], + "@typescript-eslint/indent": ["error", 4], + "@typescript-eslint/interface-name-prefix": 0, + "@typescript-eslint/no-explicit-any": 0, + "@typescript-eslint/no-object-literal-type-assertion": ["error", { + "allowAsParameter": true + }], + "@typescript-eslint/no-use-before-define": ["error", { "functions": false, "classes": true }] + } +} \ No newline at end of file diff --git a/libraries/botframework-streaming/package.json b/libraries/botframework-streaming/package.json index 8be71b270b..78d3a8e4d0 100644 --- a/libraries/botframework-streaming/package.json +++ b/libraries/botframework-streaming/package.json @@ -22,31 +22,31 @@ "main": "lib/index.js", "typings": "lib/index.d.js", "dependencies": { - "@azure/ms-rest-js": "1.2.6", - "botbuilder": "4.1.6", - "botbuilder-core": "4.1.6", "uuid": "^3.3.2", - "watershed": "^0.4.0", "ws": "^7.1.2" }, "devDependencies": { "@types/chai": "^4.1.7", "@types/node": "^10.12.18", "@types/ws": "^6.0.3", + "@typescript-eslint/eslint-plugin": "^1.10.2", + "@typescript-eslint/parser": "^1.10.2", "chai": "^4.2.0", + "eslint": "^5.16.0", + "eslint-plugin-only-warn": "^1.0.1", "mocha": "^6.2.0", "nyc": "^14.1.1", "sinon": "^7.4.1", "ts-node": "^4.1.0", - "tslint": "^5.16.0", - "tslint-microsoft-contrib": "^5.2.1", "typescript": "3.5.3" }, "scripts": { - "test": "tsc && nyc mocha tests/", "build": "tsc", "clean": "erase /q /s .\\lib", - "set-version": "npm version --allow-same-version ${Version}" + "eslint": "eslint ./src/*.ts ./src/**/*.ts", + "eslint-fix": "eslint ./src/*.ts ./src/**/*.ts --fix", + "set-version": "npm version --allow-same-version ${Version}", + "test": "tsc && nyc mocha tests/" }, "files": [ "/lib", diff --git a/libraries/botframework-streaming/src/adapters/streamingAdapter.ts b/libraries/botframework-streaming/src/adapters/streamingAdapter.ts deleted file mode 100644 index 3caf7b58ed..0000000000 --- a/libraries/botframework-streaming/src/adapters/streamingAdapter.ts +++ /dev/null @@ -1,656 +0,0 @@ -/** - * @module botframework-streaming - */ -/** - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - */ - -import { - BotFrameworkAdapter, - BotFrameworkAdapterSettings, - InvokeResponse, - INVOKE_RESPONSE_KEY, - StatusCodes, - WebRequest, - WebResponse -} from 'botbuilder'; -import { - Activity, ActivityTypes, BotCallbackHandlerKey, - IUserTokenProvider, ResourceResponse, TurnContext } from 'botbuilder-core'; -import { AuthenticationConstants, ChannelValidation, ConnectorClient, GovernmentConstants, GovernmentChannelValidation, JwtTokenValidation, MicrosoftAppCredentials, SimpleCredentialProvider } from 'botbuilder/node_modules/botframework-connector'; -import { IncomingMessage } from 'http'; -import * as os from 'os'; - -import { StreamingHttpClient } from './streamingHttpClient'; -import { TokenResolver } from './tokenResolver'; - -import { IReceiveRequest, ISocket, IStreamingTransportServer } from '../interfaces'; -import { NamedPipeServer } from '../namedPipe'; -import { StreamingResponse } from '../streamingResponse'; -import { NodeWebSocketFactory, NodeWebSocketFactoryBase, WebSocketServer } from '../webSocket'; - -// Retrieve additional information, i.e., host operating system, host OS release, architecture, Node.js version -const ARCHITECTURE: any = os.arch(); -const TYPE: any = os.type(); -const RELEASE: any = os.release(); -const NODE_VERSION: any = process.version; - -// tslint:disable-next-line:no-var-requires no-require-imports -const pjson: any = require('../../package.json'); -const USER_AGENT: string = `Microsoft-BotFramework/3.1 BotBuilder/${ pjson.version } ` + - `(Node.js,Version=${ NODE_VERSION }; ${ TYPE } ${ RELEASE }; ${ ARCHITECTURE })`; - -const defaultPipeName = 'bfv4.pipes'; -const VERSION_PATH: string = '/api/version'; -const MESSAGES_PATH: string = '/api/messages'; -const GET: string = 'GET'; -const POST: string = 'POST'; - - -export interface StreamingAdapterSettings extends BotFrameworkAdapterSettings { - /** - * Optional. The option to determine if this adapter accepts WebSocket connections - */ - enableWebSockets?: boolean; - - /** - * Optional. Used to pass in a NodeWebSocketFactoryBase instance. Allows bot to accept WebSocket connections. - */ - webSocketFactory?: NodeWebSocketFactoryBase; -} - -export class StreamingAdapter extends BotFrameworkAdapter implements IUserTokenProvider { - protected readonly credentials: MicrosoftAppCredentials; - protected readonly credentialsProvider: SimpleCredentialProvider; - protected readonly settings: StreamingAdapterSettings; - - private logic: (context: TurnContext) => Promise; - private streamingServer: IStreamingTransportServer; - private _isEmulatingOAuthCards: boolean; - private webSocketFactory: NodeWebSocketFactoryBase; - - /** - * Creates a new instance of the [BotFrameworkAdapter](xref:botbuilder.BotFrameworkAdapter) class. - * - * @param settings Optional. The settings to use for this adapter instance. - * - * @remarks - * If the `settings` parameter does not include - * [channelService](xref:botbuilder.BotFrameworkAdapterSettings.channelService) or - * [openIdMetadata](xref:botbuilder.BotFrameworkAdapterSettings.openIdMetadata) values, the - * constructor checks the process' environment variables for these values. These values may be - * set when a bot is provisioned on Azure and if so are required for the bot to work properly - * in the global cloud or in a national cloud. - * - * The [BotFrameworkAdapterSettings](xref:botbuilder.BotFrameworkAdapterSettings) class defines - * the available adapter settings. - */ - constructor(settings?: Partial) { - super(settings); - - this._isEmulatingOAuthCards = false; - - // If the developer wants to use WebSockets, but didn't provide a WebSocketFactory, - // create a NodeWebSocketFactory. - if (this.settings.enableWebSockets && !this.settings.webSocketFactory) { - this.webSocketFactory = new NodeWebSocketFactory(); - } - - if (this.settings.webSocketFactory) { - this.webSocketFactory = this.settings.webSocketFactory; - } - - // Relocate the tenantId field used by MS Teams to a new location (from channelData to conversation) - // This will only occur on activities from teams that include tenant info in channelData but NOT in conversation, - // thus should be future friendly. However, once the the transition is complete. we can remove this. - this.use(async(context, next) => { - if (context.activity.channelId === 'msteams' && context.activity && context.activity.conversation && !context.activity.conversation.tenantId && context.activity.channelData && context.activity.channelData.tenant) { - context.activity.conversation.tenantId = context.activity.channelData.tenant.id; - } - await next(); - }); - - } - - /** - * Asynchronously creates a turn context and runs the middleware pipeline for an incoming activity. - * - * @param req An Express or Restify style request object. - * @param res An Express or Restify style response object. - * @param logic The function to call at the end of the middleware pipeline. - * - * @remarks - * This is the main way a bot receives incoming messages and defines a turn in the conversation. This method: - * - * 1. Parses and authenticates an incoming request. - * - The activity is read from the body of the incoming request. An error will be returned - * if the activity can't be parsed. - * - The identity of the sender is authenticated as either the Emulator or a valid Microsoft - * server, using the bot's `appId` and `appPassword`. The request is rejected if the sender's - * identity is not verified. - * 1. Creates a [TurnContext](xref:botbuilder-core.TurnContext) object for the received activity. - * - This object is wrapped with a [revocable proxy](https://www.ecma-international.org/ecma-262/6.0/#sec-proxy.revocable). - * - When this method completes, the proxy is revoked. - * 1. Sends the turn context through the adapter's middleware pipeline. - * 1. Sends the turn context to the `logic` function. - * - The bot may perform additional routing or processing at this time. - * Returning a promise (or providing an `async` handler) will cause the adapter to wait for any asynchronous operations to complete. - * - After the `logic` function completes, the promise chain set up by the middleware is resolved. - * - * > [!TIP] - * > If you see the error `TypeError: Cannot perform 'set' on a proxy that has been revoked` - * > in your bot's console output, the likely cause is that an async function was used - * > without using the `await` keyword. Make sure all async functions use await! - * - * Middleware can _short circuit_ a turn. When this happens, subsequent middleware and the - * `logic` function is not called; however, all middleware prior to this point still run to completion. - * For more information about the middleware pipeline, see the - * [how bots work](https://docs.microsoft.com/azure/bot-service/bot-builder-basics) and - * [middleware](https://docs.microsoft.com/azure/bot-service/bot-builder-concept-middleware) articles. - * Use the adapter's [use](xref:botbuilder-core.BotAdapter.use) method to add middleware to the adapter. - * - * For example: - * ```JavaScript - * server.post('/api/messages', (req, res) => { - * // Route received request to adapter for processing - * adapter.processActivity(req, res, async (context) => { - * // Process any messages received - * if (context.activity.type === ActivityTypes.Message) { - * await context.sendActivity(`Hello World`); - * } - * }); - * }); - * ``` - */ - public async processActivity(req: WebRequest, res: WebResponse, logic: (context: TurnContext) => Promise): Promise { - if (this.settings.enableWebSockets && req.method === GET && (req.headers.Upgrade || req.headers.upgrade)) { - return this.useWebSocket(req, res, logic); - } - - let body: any; - let status: number; - let processError: Error; - try { - // Parse body of request - status = 400; - const request = await parseRequest(req); - - // Authenticate the incoming request - status = 401; - const authHeader: string = req.headers.authorization || req.headers.Authorization || ''; - await this.authenticateRequest(request, authHeader); - - // Process received activity - status = 500; - const context: TurnContext = this.createContext(request); - context.turnState.set(BotCallbackHandlerKey, logic); - await this.runMiddleware(context, logic); - - // Retrieve cached invoke response. - if (request.type === ActivityTypes.Invoke) { - const invokeResponse: any = context.turnState.get(INVOKE_RESPONSE_KEY); - if (invokeResponse && invokeResponse.value) { - const value: InvokeResponse = invokeResponse.value; - status = value.status; - body = value.body; - } else { - status = 501; - } - } else { - status = 200; - } - } catch (err) { - // Catch the error to try and throw the stacktrace out of processActivity() - processError = err; - body = err.toString(); - } - - // Return status - res.status(status); - if (body) { res.send(body); } - res.end(); - - // Check for an error - if (status >= 400) { - if (processError && (processError as Error).stack) { - throw new Error(`BotFrameworkAdapter.processActivity(): ${ status } ERROR\n ${ processError.stack }`); - } else { - throw new Error(`BotFrameworkAdapter.processActivity(): ${ status } ERROR`); - } - } - } - - /** - * Asynchronously creates a turn context and runs the middleware pipeline for an incoming activity. - * - * @param activity The activity to process. - * @param logic The function to call at the end of the middleware pipeline. - * - * @remarks - * This is the main way a bot receives incoming messages and defines a turn in the conversation. This method: - * - * 1. Creates a [TurnContext](xref:botbuilder-core.TurnContext) object for the received activity. - * - This object is wrapped with a [revocable proxy](https://www.ecma-international.org/ecma-262/6.0/#sec-proxy.revocable). - * - When this method completes, the proxy is revoked. - * 1. Sends the turn context through the adapter's middleware pipeline. - * 1. Sends the turn context to the `logic` function. - * - The bot may perform additional routing or processing at this time. - * Returning a promise (or providing an `async` handler) will cause the adapter to wait for any asynchronous operations to complete. - * - After the `logic` function completes, the promise chain set up by the middleware is resolved. - * - * Middleware can _short circuit_ a turn. When this happens, subsequent middleware and the - * `logic` function is not called; however, all middleware prior to this point still run to completion. - * For more information about the middleware pipeline, see the - * [how bots work](https://docs.microsoft.com/azure/bot-service/bot-builder-basics) and - * [middleware](https://docs.microsoft.com/azure/bot-service/bot-builder-concept-middleware) articles. - * Use the adapter's [use](xref:botbuilder-core.BotAdapter.use) method to add middleware to the adapter. - */ - public async processActivityDirect(activity: Activity, logic: (context: TurnContext) => Promise): Promise { - let processError: Error; - try { - // Process activity - const context: TurnContext = this.createContext(activity); - context.turnState.set(BotCallbackHandlerKey, logic); - await this.runMiddleware(context, logic); - } catch (err) { - // Catch the error to try and throw the stacktrace out of processActivity() - processError = err; - } - - if (processError) { - if (processError && (processError as Error).stack) { - throw new Error(`BotFrameworkAdapter.processActivity(): ${ status } ERROR\n ${ processError.stack }`); - } else { - throw new Error(`BotFrameworkAdapter.processActivity(): ${ status } ERROR`); - } - } - } - - /** - * Asynchronously sends a set of outgoing activities to a channel server. - * - * This method supports the framework and is not intended to be called directly for your code. - * Use the turn context's [sendActivity](xref:botbuilder-core.TurnContext.sendActivity) or - * [sendActivities](xref:botbuilder-core.TurnContext.sendActivities) method from your bot code. - * - * @param context The context object for the turn. - * @param activities The activities to send. - * - * @returns An array of [ResourceResponse](xref:) - * - * @remarks - * The activities will be sent one after another in the order in which they're received. A - * response object will be returned for each sent activity. For `message` activities this will - * contain the ID of the delivered message. - */ - public async sendActivities(context: TurnContext, activities: Partial[]): Promise { - const responses: ResourceResponse[] = []; - for (let i = 0; i < activities.length; i++) { - const activity: Partial = activities[i]; - switch (activity.type) { - case 'delay': - await delay(typeof activity.value === 'number' ? activity.value : 1000); - responses.push({} as ResourceResponse); - break; - case 'invokeResponse': - // Cache response to context object. This will be retrieved when turn completes. - context.turnState.set(INVOKE_RESPONSE_KEY, activity); - responses.push({} as ResourceResponse); - break; - default: - if (!activity.serviceUrl) { throw new Error(`BotFrameworkAdapter.sendActivity(): missing serviceUrl.`); } - if (!activity.conversation || !activity.conversation.id) { - throw new Error(`BotFrameworkAdapter.sendActivity(): missing conversation id.`); - } - if (StreamingAdapter.isFromStreamingConnection(activity as Activity)) { - TokenResolver.checkForOAuthCards(this, context, activity as Activity); - } - const client: ConnectorClient = this.createConnectorClient(activity.serviceUrl); - if (activity.type === 'trace' && activity.channelId !== 'emulator') { - // Just eat activity - responses.push({} as ResourceResponse); - } else if (activity.replyToId) { - responses.push(await client.conversations.replyToActivity( - activity.conversation.id, - activity.replyToId, - activity as Activity - )); - } else { - responses.push(await client.conversations.sendToConversation( - activity.conversation.id, - activity as Activity - )); - } - break; - } - } - return responses; - } - - /** - * Creates a connector client. - * - * @param serviceUrl The client's service URL. - * - * @remarks - * Override this in a derived class to create a mock connector client for unit testing. - */ - public createConnectorClient(serviceUrl: string): ConnectorClient { - - if (StreamingAdapter.isStreamingServiceUrl(serviceUrl)) { - - // Check if we have a streaming server. Otherwise, requesting a connector client - // for a non-existent streaming connection results in an error - if (!this.streamingServer) { - throw new Error(`Cannot create streaming connector client for serviceUrl ${serviceUrl} without a streaming connection. Call 'useWebSocket' or 'useNamedPipe' to start a streaming connection.`) - } - - return new ConnectorClient( - this.credentials, - { - baseUri: serviceUrl, - userAgent: USER_AGENT, - httpClient: new StreamingHttpClient(this.streamingServer) - }); - } - - const client: ConnectorClient = new ConnectorClient(this.credentials, { baseUri: serviceUrl, userAgent: USER_AGENT} ); - return client; - } - - /** - * Checks the validity of the request and attempts to map it the correct virtual endpoint, - * then generates and returns a response if appropriate. - * @param request A ReceiveRequest from the connected channel. - * @returns A response created by the BotAdapter to be sent to the client that originated the request. - */ - public async processRequest(request: IReceiveRequest): Promise { - let response = new StreamingResponse(); - - if (!request) { - response.statusCode = StatusCodes.BAD_REQUEST; - response.setBody(`No request provided.`); - return response; - } - - if (!request.verb || !request.path) { - response.statusCode = StatusCodes.BAD_REQUEST; - response.setBody(`Request missing verb and/or path. Verb: ${ request.verb }. Path: ${ request.path }`); - return response; - } - - if (request.verb.toLocaleUpperCase() !== POST && request.verb.toLocaleUpperCase() !== GET) { - response.statusCode = StatusCodes.METHOD_NOT_ALLOWED; - response.setBody(`Invalid verb received. Only GET and POST are accepted. Verb: ${ request.verb }`); - } - - if (request.path.toLocaleLowerCase() === VERSION_PATH) { - return await this.handleVersionRequest(request, response); - } - - // Convert the StreamingRequest into an activity the Adapter can understand. - let body: Activity; - try { - body = await this.readRequestBodyAsString(request); - - } catch (error) { - response.statusCode = StatusCodes.BAD_REQUEST; - response.setBody(`Request body missing or malformed: ${ error }`); - return response; - } - - if (request.path.toLocaleLowerCase() !== MESSAGES_PATH) { - response.statusCode = StatusCodes.NOT_FOUND; - response.setBody(`Path ${ request.path.toLocaleLowerCase() } not not found. Expected ${ MESSAGES_PATH }}.`); - return response; - } - - if (request.verb.toLocaleUpperCase() !== POST) { - response.statusCode = StatusCodes.METHOD_NOT_ALLOWED; - response.setBody(`Invalid verb received for ${ request.verb.toLocaleLowerCase() }. Only GET and POST are accepted. Verb: ${ request.verb }`); - return response; - } - - try { - let context = new TurnContext(this, body); - await this.runMiddleware(context, this.logic); - - if (body.type === ActivityTypes.Invoke) { - let invokeResponse: any = context.turnState.get(INVOKE_RESPONSE_KEY); - - if (invokeResponse && invokeResponse.value) { - const value: InvokeResponse = invokeResponse.value; - response.statusCode = value.status; - response.setBody(value.body); - } else { - response.statusCode = StatusCodes.NOT_IMPLEMENTED; - } - } else { - response.statusCode = StatusCodes.OK; - } - } catch (error) { - response.statusCode = StatusCodes.INTERNAL_SERVER_ERROR; - response.setBody(error); - return response; - } - - return response; - } - - private async handleVersionRequest(request: IReceiveRequest, response: StreamingResponse): Promise { - if (request.verb.toLocaleUpperCase() === GET) { - response.statusCode = StatusCodes.OK; - - if (!this.credentials.appId) { - response.setBody({ UserAgent: USER_AGENT }); - return response; - } - - let token = ''; - try { - token = await this.credentials.getToken(); - - } catch (err) { - /** - * In reality a missing BotToken will cause the channel to close the connection, - * but we still send the response and allow the channel to make that decision - * instead of proactively disconnecting. This allows the channel to know why - * the connection has been closed and make the choice not to make endless reconnection - * attempts that will end up right back here. - */ - console.error(err.message); - } - response.setBody({ UserAgent: USER_AGENT, BotToken: token }); - - } else { - response.statusCode = StatusCodes.METHOD_NOT_ALLOWED; - response.setBody(`Invalid verb received for path: ${ request.path }. Only GET is accepted. Verb: ${ request.verb }`); - } - - return response; - } - - /** - * Allows for the overriding of authentication in unit tests. - * @param request Received request. - * @param authHeader Received authentication header. - */ - protected async authenticateRequest(request: Partial, authHeader: string): Promise { - const claims = await JwtTokenValidation.authenticateRequest( - request as Activity, authHeader, - this.credentialsProvider, - this.settings.channelService - ); - if (!claims.isAuthenticated) { throw new Error('Unauthorized Access. Request is not authorized'); } - } - - /** - * Checks the environment and can set a flag to emulate OAuth cards. - * - * @param context The context object for the turn. - * - * @remarks - * Override this in a derived class to control how OAuth cards are emulated for unit testing. - */ - protected checkEmulatingOAuthCards(context: TurnContext): void { - if (!this._isEmulatingOAuthCards && - context.activity.channelId === 'emulator' && - (!this.credentials.appId)) { - this._isEmulatingOAuthCards = true; - } - } - - /** - * Determine if the Activity was sent via an Http/Https connection or Streaming - * This can be determined by looking at the ServiceUrl property: - * (1) All channels that send messages via http/https are not streaming - * (2) Channels that send messages via streaming have a ServiceUrl that does not begin with http/https. - * @param activity the activity. - */ - private static isFromStreamingConnection(activity: Activity): boolean { - return activity && this.isStreamingServiceUrl(activity.serviceUrl); - } - - /** - * Determine if the serviceUrl was sent via an Http/Https connection or Streaming - * This can be determined by looking at the ServiceUrl property: - * (1) All channels that send messages via http/https are not streaming - * (2) Channels that send messages via streaming have a ServiceUrl that does not begin with http/https. - * @param serviceUrl the serviceUrl provided in the resquest. - */ - private static isStreamingServiceUrl(serviceUrl: string): boolean { - return serviceUrl && !serviceUrl.toLowerCase().startsWith('http'); - } - - private async authenticateConnection(req: WebRequest, channelService?: string): Promise { - if (!this.credentials.appId) { - // auth is disabled - return; - } - - const authHeader: string = req.headers.authorization || req.headers.Authorization || ''; - const channelIdHeader: string = req.headers.channelid || req.headers.ChannelId || req.headers.ChannelID || ''; - // Validate the received Upgrade request from the channel. - const claims = await JwtTokenValidation.validateAuthHeader(authHeader, this.credentialsProvider, channelService, channelIdHeader); - - // Add serviceUrl from claim to static cache to trigger token refreshes. - const serviceUrl = claims.getClaimValue(AuthenticationConstants.ServiceUrlClaim); - MicrosoftAppCredentials.trustServiceUrl(serviceUrl); - - if (!claims.isAuthenticated) { throw new Error('Unauthorized Access. Request is not authorized'); } - } - - /** - * Connects the handler to a Named Pipe server and begins listening for incoming requests. - * @param pipeName The name of the named pipe to use when creating the server. - * @param logic The logic that will handle incoming requests. - */ - private async useNamedPipe(pipeName: string = defaultPipeName, logic: (context: TurnContext) => Promise): Promise{ - if (!logic) { - throw new Error('Bot logic needs to be provided to `useNamedPipe`'); - } - - this.logic = logic; - - this.streamingServer = new NamedPipeServer(pipeName, this); - await this.streamingServer.start(); - } - - /** - * Process the initial request to establish a long lived connection via a streaming server. - * @param req The connection request. - * @param res The response sent on error or connection termination. - * @param logic The logic that will handle incoming requests. - */ - private async useWebSocket(req: WebRequest, res: WebResponse, logic: (context: TurnContext) => Promise): Promise { - if (!logic) { - throw new Error('Streaming logic needs to be provided to `useWebSocket`'); - } - - if (!this.webSocketFactory || !this.webSocketFactory.createWebSocket) { - throw new Error('BotFrameworkAdapter must have a WebSocketFactory in order to support streaming.'); - } - - this.logic = logic; - - // Restify-specific check. - if (typeof((res as any).claimUpgrade) !== 'function') { - throw new Error('ClaimUpgrade is required for creating WebSocket connection.'); - } - - try { - await this.authenticateConnection(req, this.settings.channelService); - } catch (err) { - // Set the correct status code for the socket to send back to the channel. - res.status(StatusCodes.UNAUTHORIZED); - res.send(err.message); - // Re-throw the error so the developer will know what occurred. - throw err; - } - - const upgrade = (res as any).claimUpgrade(); - const socket = this.webSocketFactory.createWebSocket(req as IncomingMessage, upgrade.socket, upgrade.head); - - await this.startWebSocket(socket); - } - - /** - * Connects the handler to a WebSocket server and begins listening for incoming requests. - * @param socket The socket to use when creating the server. - */ - private async startWebSocket(socket: ISocket): Promise{ - this.streamingServer = new WebSocketServer(socket, this); - await this.streamingServer.start(); - } - - private async readRequestBodyAsString(request: IReceiveRequest): Promise { - const contentStream = request.streams[0]; - return await contentStream.readAsJson(); - } -} - -/** - * Handles incoming webhooks from the botframework - * @private - * @param req incoming web request - */ -function parseRequest(req: WebRequest): Promise { - return new Promise((resolve: any, reject: any): void => { - function returnActivity(activity: Activity): void { - if (typeof activity !== 'object') { throw new Error(`BotFrameworkAdapter.parseRequest(): invalid request body.`); } - if (typeof activity.type !== 'string') { throw new Error(`BotFrameworkAdapter.parseRequest(): missing activity type.`); } - if (typeof activity.timestamp === 'string') { activity.timestamp = new Date(activity.timestamp); } - if (typeof activity.localTimestamp === 'string') { activity.localTimestamp = new Date(activity.localTimestamp); } - if (typeof activity.expiration === 'string') { activity.expiration = new Date(activity.expiration); } - resolve(activity); - } - - if (req.body) { - try { - returnActivity(req.body); - } catch (err) { - reject(err); - } - } else { - let requestData = ''; - req.on('data', (chunk: string) => { - requestData += chunk; - }); - req.on('end', () => { - try { - req.body = JSON.parse(requestData); - returnActivity(req.body); - } catch (err) { - reject(err); - } - }); - } - }); -} - -function delay(timeout: number): Promise { - return new Promise((resolve) => { - setTimeout(resolve, timeout); - }); -} \ No newline at end of file diff --git a/libraries/botframework-streaming/src/index.ts b/libraries/botframework-streaming/src/index.ts index 11f2ca2e9a..b81e41f5b6 100644 --- a/libraries/botframework-streaming/src/index.ts +++ b/libraries/botframework-streaming/src/index.ts @@ -6,10 +6,18 @@ * Licensed under the MIT License. */ -export { StreamingAdapter, StreamingHttpClient, TokenResolver } from './adapters'; export { ContentStream } from './contentStream'; export { HttpContent } from './httpContentStream'; -export { IStreamingTransportServer, IStreamingTransportClient, ISocket, IReceiveRequest, IReceiveResponse } from './interfaces'; +export { + INodeBuffer, + INodeIncomingMessage, + INodeSocket, + IReceiveRequest, + IReceiveResponse, + ISocket, + IStreamingTransportClient, + IStreamingTransportServer, +} from './interfaces'; export { NamedPipeClient, NamedPipeServer } from './namedPipe'; export { RequestHandler } from './requestHandler'; export { StreamingRequest } from './streamingRequest'; @@ -21,7 +29,5 @@ export { NodeWebSocketFactory, NodeWebSocketFactoryBase, WebSocketClient, - WebSocketServer, - WsNodeWebSocket, - WsNodeWebSocketFactory, + WebSocketServer } from './webSocket'; diff --git a/libraries/botframework-streaming/src/interfaces/INodeBuffer.ts b/libraries/botframework-streaming/src/interfaces/INodeBuffer.ts new file mode 100644 index 0000000000..b600e0eff1 --- /dev/null +++ b/libraries/botframework-streaming/src/interfaces/INodeBuffer.ts @@ -0,0 +1,14 @@ +/** + * @module botframework-streaming + */ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +/** + * Represents a Buffer from the `buffer` module in Node.js. + * + * This interface supports the framework and is not intended to be called directly for your code. + */ +export interface INodeBuffer { } diff --git a/libraries/botframework-streaming/src/interfaces/INodeIncomingMessage.ts b/libraries/botframework-streaming/src/interfaces/INodeIncomingMessage.ts new file mode 100644 index 0000000000..af82eeaf6d --- /dev/null +++ b/libraries/botframework-streaming/src/interfaces/INodeIncomingMessage.ts @@ -0,0 +1,24 @@ +/** + * @module botframework-streaming + */ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +/** + * Represents a IncomingMessage from the `http` module in Node.js. + * + * This interface supports the framework and is not intended to be called directly for your code. + */ +export interface INodeIncomingMessage { + /*** + * Optional. The request headers. + */ + headers?: any; + + /*** + * Optional. The request method. + */ + method?: any; +} diff --git a/libraries/botframework-streaming/src/interfaces/INodeSocket.ts b/libraries/botframework-streaming/src/interfaces/INodeSocket.ts new file mode 100644 index 0000000000..21ae2b37ae --- /dev/null +++ b/libraries/botframework-streaming/src/interfaces/INodeSocket.ts @@ -0,0 +1,18 @@ +/** + * @module botframework-streaming + */ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +/** + * Represents a Socket from the `net` module in Node.js. + * + * This interface supports the framework and is not intended to be called directly for your code. + */ +export interface INodeSocket { + writable: boolean; + write(str: string, cb?: Function): boolean; + destroy(error?: Error): void; +} diff --git a/libraries/botframework-streaming/src/interfaces/index.ts b/libraries/botframework-streaming/src/interfaces/index.ts index df6b74c679..6da74891e6 100644 --- a/libraries/botframework-streaming/src/interfaces/index.ts +++ b/libraries/botframework-streaming/src/interfaces/index.ts @@ -5,6 +5,10 @@ * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. */ + +export * from './INodeBuffer'; +export * from './INodeIncomingMessage'; +export * from './INodeSocket'; export * from './IReceiveRequest'; export * from './IReceiveResponse'; export * from './ISocket'; diff --git a/libraries/botframework-streaming/src/payloadTransport/index.ts b/libraries/botframework-streaming/src/payloadTransport/index.ts index 461b0d0e0c..c37e2c7d66 100644 --- a/libraries/botframework-streaming/src/payloadTransport/index.ts +++ b/libraries/botframework-streaming/src/payloadTransport/index.ts @@ -10,5 +10,5 @@ export * from './payloadReceiver'; export * from './payloadSender'; export * from './payloadReceiver'; export * from './payloadSender'; -export * from './transportDisconnectedEventArgs'; +export * from './transportDisconnectedEvent'; export * from './transportDisconnectedEventHandler'; diff --git a/libraries/botframework-streaming/src/payloadTransport/payloadReceiver.ts b/libraries/botframework-streaming/src/payloadTransport/payloadReceiver.ts index 50eb5b28e1..75846e5c1f 100644 --- a/libraries/botframework-streaming/src/payloadTransport/payloadReceiver.ts +++ b/libraries/botframework-streaming/src/payloadTransport/payloadReceiver.ts @@ -10,7 +10,7 @@ import { PayloadTypes } from '../payloads/payloadTypes'; import { HeaderSerializer } from '../payloads/headerSerializer'; import { SubscribableStream } from '../subscribableStream'; import { PayloadConstants } from '../payloads/payloadConstants'; -import { TransportDisconnectedEventArgs } from './TransportDisconnectedEventArgs'; +import { TransportDisconnectedEvent } from './transportDisconnectedEvent'; import { ITransportReceiver } from '../interfaces/ITransportReceiver'; import { IHeader } from '../interfaces/IHeader'; @@ -57,7 +57,7 @@ export class PayloadReceiver { * * @param e Event arguments to include when broadcasting disconnection event. */ - public disconnect(e?: TransportDisconnectedEventArgs): void { + public disconnect(e?: TransportDisconnectedEvent): void { let didDisconnect; try { if (this.isConnected) { @@ -67,13 +67,13 @@ export class PayloadReceiver { } } catch (error) { this.isConnected = false; - this.disconnected(this, new TransportDisconnectedEventArgs(error.message)); + this.disconnected(this, new TransportDisconnectedEvent(error.message)); } this._receiver = null; this.isConnected = false; if (didDisconnect) { - this.disconnected(this, e || TransportDisconnectedEventArgs.Empty); + this.disconnected(this, e || TransportDisconnectedEvent.Empty); } } @@ -122,7 +122,7 @@ export class PayloadReceiver { } } catch (error) { isClosed = true; - this.disconnect(new TransportDisconnectedEventArgs(error.message)); + this.disconnect(new TransportDisconnectedEvent(error.message)); } } } diff --git a/libraries/botframework-streaming/src/payloadTransport/payloadSender.ts b/libraries/botframework-streaming/src/payloadTransport/payloadSender.ts index 68a1bc95f2..554192406e 100644 --- a/libraries/botframework-streaming/src/payloadTransport/payloadSender.ts +++ b/libraries/botframework-streaming/src/payloadTransport/payloadSender.ts @@ -9,7 +9,7 @@ import { HeaderSerializer } from '../payloads/headerSerializer'; import { SubscribableStream } from '../subscribableStream'; import { PayloadConstants } from '../payloads/payloadConstants'; -import { TransportDisconnectedEventArgs } from './transportDisconnectedEventArgs'; +import { TransportDisconnectedEvent } from './transportDisconnectedEvent'; import { TransportDisconnectedEventHandler } from './transportDisconnectedEventHandler'; import { ITransportSender } from '../interfaces/ITransportSender'; import { IHeader } from '../interfaces/IHeader'; @@ -57,13 +57,13 @@ export class PayloadSender { * * @param e The disconnected event arguments to include in the disconnected event broadcast. */ - public disconnect(e?: TransportDisconnectedEventArgs): void { + public disconnect(e?: TransportDisconnectedEvent): void { if (this.isConnected) { this.sender.close(); this.sender = null; if (this.disconnected) { - this.disconnected(this, e || TransportDisconnectedEventArgs.Empty); + this.disconnected(this, e || TransportDisconnectedEvent.Empty); } } } @@ -87,7 +87,7 @@ export class PayloadSender { } } } catch (e) { - this.disconnect(new TransportDisconnectedEventArgs(e.message)); + this.disconnect(new TransportDisconnectedEvent(e.message)); } } } diff --git a/libraries/botframework-streaming/src/payloadTransport/transportDisconnectedEvent.ts b/libraries/botframework-streaming/src/payloadTransport/transportDisconnectedEvent.ts new file mode 100644 index 0000000000..55f79a2998 --- /dev/null +++ b/libraries/botframework-streaming/src/payloadTransport/transportDisconnectedEvent.ts @@ -0,0 +1,29 @@ +/** + * @module botframework-streaming + */ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +export class TransportDisconnectedEvent { + /** + * A new and empty TransportDisconnectedEvent. + */ + public static Empty: TransportDisconnectedEvent = new TransportDisconnectedEvent(); + + /** + * The reason the disconnection event fired, in plain text. + */ + public reason: string; + + /** + * Indicates a transport disconnected with the reason provided via the constructor. + * This class is used for communicating disconnection events between the + * PayloadReceiver and PayloadSender. + * @param reason The reason the disconnection event fired, in plain text. + */ + public constructor(reason?: string) { + this.reason = reason; + } +} diff --git a/libraries/botframework-streaming/src/payloadTransport/transportDisconnectedEventArgs.ts b/libraries/botframework-streaming/src/payloadTransport/transportDisconnectedEventArgs.ts deleted file mode 100644 index 11487cb676..0000000000 --- a/libraries/botframework-streaming/src/payloadTransport/transportDisconnectedEventArgs.ts +++ /dev/null @@ -1,15 +0,0 @@ -/** - * @module botframework-streaming - */ -/** - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - */ -export class TransportDisconnectedEventArgs { - public static Empty: TransportDisconnectedEventArgs = new TransportDisconnectedEventArgs(); - public reason: string; - - public constructor(reason?: string) { - this.reason = reason; - } -} diff --git a/libraries/botframework-streaming/src/payloadTransport/transportDisconnectedEventHandler.ts b/libraries/botframework-streaming/src/payloadTransport/transportDisconnectedEventHandler.ts index 9d7ad02121..5012fbfad3 100644 --- a/libraries/botframework-streaming/src/payloadTransport/transportDisconnectedEventHandler.ts +++ b/libraries/botframework-streaming/src/payloadTransport/transportDisconnectedEventHandler.ts @@ -5,6 +5,7 @@ * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. */ -import { TransportDisconnectedEventArgs } from './transportDisconnectedEventArgs'; -export type TransportDisconnectedEventHandler = (sender: any, e: TransportDisconnectedEventArgs) => void; +import { TransportDisconnectedEvent } from './transportDisconnectedEvent'; + +export type TransportDisconnectedEventHandler = (sender: any, e: TransportDisconnectedEvent) => void; diff --git a/libraries/botframework-streaming/src/webSocket/factories/index.ts b/libraries/botframework-streaming/src/webSocket/factories/index.ts index dc920f3854..df0f5961b4 100644 --- a/libraries/botframework-streaming/src/webSocket/factories/index.ts +++ b/libraries/botframework-streaming/src/webSocket/factories/index.ts @@ -8,4 +8,3 @@ export * from './nodeWebSocketFactory'; export * from './nodeWebSocketFactoryBase'; -export * from './wsNodeWebSocketFactory'; diff --git a/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactory.ts b/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactory.ts index a9f62b8eb8..7ed3747503 100644 --- a/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactory.ts +++ b/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactory.ts @@ -1,32 +1,32 @@ -/** - * @module botframework-streaming - */ -/** - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - */ - -import { IncomingMessage } from 'http'; -import { Socket } from 'net'; - -import { NodeWebSocket } from '../nodeWebSocket'; -import { NodeWebSocketFactoryBase } from './nodeWebSocketFactoryBase'; - -export class NodeWebSocketFactory extends NodeWebSocketFactoryBase { - constructor() { - super(); - } - - /** - * Creates a NodeWebSocket instance. - * @param req - * @param socket - * @param head - */ - public createWebSocket(req: IncomingMessage, socket: Socket, head: Buffer): NodeWebSocket { - const s = new NodeWebSocket(); - s.create(req, socket, head); - - return s; - } -} +/** + * @module botframework-streaming + */ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +import { INodeIncomingMessage, INodeBuffer, INodeSocket } from '../../interfaces'; +import { NodeWebSocket } from '../nodeWebSocket'; +import { NodeWebSocketFactoryBase } from './nodeWebSocketFactoryBase'; + +export class NodeWebSocketFactory extends NodeWebSocketFactoryBase { + constructor() { + super(); + } + + /** + * Creates a NodeWebSocket instance. + * @remarks + * The parameters for this method should be associated with an 'upgrade' event off of a Node.js HTTP Server. + * @param req An IncomingMessage from the 'http' module in Node.js. + * @param socket The Socket connecting the bot and the server, from the 'net' module in Node.js. + * @param head The first packet of the upgraded stream which may be empty per https://nodejs.org/api/http.html#http_event_upgrade_1. + */ + public async createWebSocket(req: INodeIncomingMessage, socket: INodeSocket, head: INodeBuffer): Promise { + const s = new NodeWebSocket(); + await s.create(req, socket, head); + + return s; + } +} diff --git a/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactoryBase.ts b/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactoryBase.ts index bd183546f7..a959e1fca1 100644 --- a/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactoryBase.ts +++ b/libraries/botframework-streaming/src/webSocket/factories/nodeWebSocketFactoryBase.ts @@ -6,10 +6,8 @@ * Licensed under the MIT License. */ -import { IncomingMessage } from 'http'; -import { Socket } from 'net'; -import { ISocket } from '../../interfaces'; +import { INodeIncomingMessage, INodeBuffer, INodeSocket, ISocket } from '../../interfaces'; export abstract class NodeWebSocketFactoryBase { - public abstract createWebSocket(req: IncomingMessage, socket: Socket, head: Buffer): ISocket; + public abstract createWebSocket(req: INodeIncomingMessage, socket: INodeSocket, head: INodeBuffer): Promise; } diff --git a/libraries/botframework-streaming/src/webSocket/factories/wsNodeWebSocketFactory.ts b/libraries/botframework-streaming/src/webSocket/factories/wsNodeWebSocketFactory.ts deleted file mode 100644 index e0bf708a46..0000000000 --- a/libraries/botframework-streaming/src/webSocket/factories/wsNodeWebSocketFactory.ts +++ /dev/null @@ -1,27 +0,0 @@ -/** - * @module botframework-streaming - */ -/** - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - */ - -import { IncomingMessage } from 'http'; -import { Socket } from 'net'; - -import { WsNodeWebSocket } from '../wsNodeWebSocket'; - -export class WsNodeWebSocketFactory { - /** - * Creates a WsNodeWebSocket instance. - * @param req - * @param socket - * @param head - */ - public async createWebSocket(req: IncomingMessage, socket: Socket, head: Buffer): Promise { - const s = new WsNodeWebSocket(); - await s.create(req, socket, head); - - return s; - } -} diff --git a/libraries/botframework-streaming/src/webSocket/index.ts b/libraries/botframework-streaming/src/webSocket/index.ts index 7685d49f2c..57043ad402 100644 --- a/libraries/botframework-streaming/src/webSocket/index.ts +++ b/libraries/botframework-streaming/src/webSocket/index.ts @@ -8,9 +8,7 @@ export * from './browserWebSocket'; export * from './factories'; -export * from '../interfaces/ISocket'; export * from './nodeWebSocket'; export * from './webSocketClient'; export * from './webSocketServer'; export * from './webSocketTransport'; -export * from './wsNodeWebSocket'; diff --git a/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts b/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts index b68571c388..ac1ab5c9a0 100644 --- a/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts +++ b/libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts @@ -1,119 +1,133 @@ -/** - * @module botframework-streaming - */ -/** - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - */ - -import { IncomingMessage, request } from 'http'; -import { Socket } from 'net'; -import { Watershed } from 'watershed'; -import { ISocket } from '../interfaces/ISocket'; - -export class NodeWebSocket implements ISocket { - private waterShedSocket: any; - private connected: boolean; - protected watershedShed: Watershed; - - /** - * Creates a new instance of the [NodeWebSocket](xref:botframework-streaming.NodeWebSocket) class. - * - * @param socket The WaterShed socket object to build this connection on. - */ - public constructor(waterShedSocket?) { - this.waterShedSocket = waterShedSocket; - this.connected = !!waterShedSocket; - this.watershedShed = new Watershed(); - } - - /** - * Create and set a WaterShed WebSocket with an HTTP Request, Socket and Buffer. - * @param req IncomingMessage - * @param socket Socket - * @param head Buffer - */ - public create(req: IncomingMessage, socket: Socket, head: Buffer): void { - this.waterShedSocket = this.watershedShed.accept(req, socket, head); - this.connected = true; - } - - /** - * True if the socket is currently connected. - */ - public get isConnected(): boolean { - return this.connected; - } - - /** - * Writes a buffer to the socket and sends it. - * - * @param buffer The buffer of data to send across the connection. - */ - public write(buffer: Buffer): void { - this.waterShedSocket.send(buffer); - } - - /** - * Connects to the supporting socket using WebSocket protocol. - * - * @param serverAddress The address the server is listening on. - * @param port The port the server is listening on, defaults to 8082. - */ - public async connect(serverAddress, port = 8082): Promise { - // Following template from https://github.com/joyent/node-watershed#readme - const wskey = this.watershedShed.generateKey(); - const options = { - port: port, - hostname: serverAddress, - headers: { - connection: 'upgrade', - 'Sec-WebSocket-Key': wskey, - 'Sec-WebSocket-Version': '13' - } - }; - const req = request(options); - req.end(); - req.on('upgrade', function(res, socket, head): void { - this.watershedShed.connect(res, socket, head, wskey); - }); - - this.connected = true; - - return new Promise((resolve, reject): void => { - req.on('close', resolve); - req.on('error', reject); - }); - } - - /** - * Set the handler for text and binary messages received on the socket. - */ - public setOnMessageHandler(handler: (x: any) => void): void { - this.waterShedSocket.on('text', handler); - this.waterShedSocket.on('binary', handler); - } - - /** - * Close the socket. - */ - public close(): any { - this.connected = false; - - return this.waterShedSocket.end(); - } - - /** - * Set the callback to call when encountering socket closures. - */ - public setOnCloseHandler(handler: (x: any) => void): void { - this.waterShedSocket.on('end', handler); - } - - /** - * Set the callback to call when encountering errors. - */ - public setOnErrorHandler(handler: (x: any) => void): void { - this.waterShedSocket.on('error', (error): void => { if (error) { handler(error); } }); - } -} +/** + * @module botframework-streaming + */ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +import * as crypto from 'crypto'; +import { IncomingMessage, request } from 'http'; +import { Socket } from 'net'; +import * as WebSocket from 'ws'; + +import { INodeIncomingMessage, INodeBuffer, INodeSocket, ISocket } from '../interfaces'; +const NONCE_LENGTH = 16; + +export class NodeWebSocket implements ISocket { + private wsSocket: WebSocket; + protected wsServer: WebSocket.Server; + + /** + * Creates a new instance of the [NodeWebSocket](xref:botframework-streaming.NodeWebSocket) class. + * + * @param socket The `ws` WebSocket instance to build this connection on. + */ + public constructor(wsSocket?: WebSocket) { + this.wsSocket = wsSocket; + } + + /** + * Create and set a `ws` WebSocket with an HTTP Request, Socket and Buffer. + * @param req INodeIncomingMessage + * @param socket INodeSocket + * @param head INodeBuffer + */ + public async create(req: INodeIncomingMessage, socket: INodeSocket, head: INodeBuffer): Promise { + this.wsServer = new WebSocket.Server({ noServer: true }); + return new Promise((resolve, reject) => { + try { + this.wsServer.handleUpgrade(req as IncomingMessage, socket as Socket, head as Buffer, (websocket) => { + this.wsSocket = websocket; + resolve(); + }); + } catch (err) { + reject(err); + } + }); + } + + /** + * True if the 'ws' WebSocket is currently connected. + */ + public get isConnected(): boolean { + return this.wsSocket && this.wsSocket.readyState === WebSocket.OPEN; + } + + /** + * Writes a buffer to the socket and sends it. + * + * @param buffer The buffer of data to send across the connection. + */ + public write(buffer: INodeBuffer): void { + this.wsSocket.send(buffer); + } + + /** + * Connects to the supporting socket using WebSocket protocol. + * + * @param serverAddress The address the server is listening on. + * @param port The port the server is listening on, defaults to 8082. + */ + public async connect(serverAddress, port = 8082): Promise { + this.wsServer = new WebSocket.Server({ noServer: true }); + // Key generation per https://tools.ietf.org/html/rfc6455#section-1.3 (pg. 7) + const wskey = crypto.randomBytes(NONCE_LENGTH).toString('base64'); + const options = { + port: port, + hostname: serverAddress, + headers: { + connection: 'upgrade', + 'Sec-WebSocket-Key': wskey, + 'Sec-WebSocket-Version': '13' + } + }; + const req = request(options); + req.end(); + req.on('upgrade', (res, socket, head): void => { + // @types/ws does not contain the signature for completeUpgrade + // https://github.com/websockets/ws/blob/0a612364e69fc07624b8010c6873f7766743a8e3/lib/websocket-server.js#L269 + (this.wsServer as any).completeUpgrade(wskey, undefined, res, socket, head, (websocket): void => { + this.wsSocket = websocket; + }); + }); + + return new Promise((resolve, reject): void => { + req.on('close', resolve); + req.on('error', reject); + }); + } + + /** + * Set the handler for `'data'` and `'message'` events received on the socket. + */ + public setOnMessageHandler(handler: (x: any) => void): void { + this.wsSocket.on('data', handler); + this.wsSocket.on('message', handler); + } + + /** + * Close the socket. + * @remarks + * Optionally pass in a status code and string explaining why the connection is closing. + * @param code + * @param data + */ + public close(code?: number, data?: string): void { + return this.wsSocket.close(code, data); + } + + /** + * Set the callback to call when encountering socket closures. + */ + public setOnCloseHandler(handler: (x: any) => void): void { + this.wsSocket.on('close', handler); + } + + /** + * Set the callback to call when encountering errors. + */ + public setOnErrorHandler(handler: (x: any) => void): void { + this.wsSocket.on('error', (error): void => { if (error) { handler(error); } }); + } +} \ No newline at end of file diff --git a/libraries/botframework-streaming/src/webSocket/webSocketClient.ts b/libraries/botframework-streaming/src/webSocket/webSocketClient.ts index aa8b34f349..0f7c076bd9 100644 --- a/libraries/botframework-streaming/src/webSocket/webSocketClient.ts +++ b/libraries/botframework-streaming/src/webSocket/webSocketClient.ts @@ -12,11 +12,11 @@ import { RequestManager } from '../payloads'; import { PayloadReceiver, PayloadSender, - TransportDisconnectedEventArgs + TransportDisconnectedEvent } from '../payloadTransport'; -import { BrowserWebSocket } from './BrowserWebSocket'; -import { NodeWebSocket } from './NodeWebSocket'; -import { WebSocketTransport } from './WebSocketTransport'; +import { BrowserWebSocket } from './browserWebSocket'; +import { NodeWebSocket } from './nodeWebSocket'; +import { WebSocketTransport } from './webSocketTransport'; import { IStreamingTransportClient, IReceiveResponse } from '../interfaces'; /** @@ -82,8 +82,8 @@ export class WebSocketClient implements IStreamingTransportClient { * Stop this client from listening. */ public disconnect(): void { - this._sender.disconnect(new TransportDisconnectedEventArgs('Disconnect was called.')); - this._receiver.disconnect(new TransportDisconnectedEventArgs('Disconnect was called.')); + this._sender.disconnect(new TransportDisconnectedEvent('Disconnect was called.')); + this._receiver.disconnect(new TransportDisconnectedEvent('Disconnect was called.')); } /** @@ -98,7 +98,7 @@ export class WebSocketClient implements IStreamingTransportClient { private onConnectionDisconnected(sender: object, args: any): void { if (this._disconnectionHandler != null) { - this._disconnectionHandler("Disconnected"); + this._disconnectionHandler('Disconnected'); return; } diff --git a/libraries/botframework-streaming/src/webSocket/webSocketServer.ts b/libraries/botframework-streaming/src/webSocket/webSocketServer.ts index b8e83c87ff..f6fb5a0ddf 100644 --- a/libraries/botframework-streaming/src/webSocket/webSocketServer.ts +++ b/libraries/botframework-streaming/src/webSocket/webSocketServer.ts @@ -12,7 +12,7 @@ import { RequestManager } from '../payloads'; import { PayloadReceiver, PayloadSender, - TransportDisconnectedEventArgs + TransportDisconnectedEvent } from '../payloadTransport'; import { ISocket } from '../interfaces/ISocket'; import { WebSocketTransport } from './WebSocketTransport'; @@ -79,11 +79,15 @@ export class WebSocketServer implements IStreamingTransportServer { * Stop this server. */ public disconnect(): void { - this._sender.disconnect(new TransportDisconnectedEventArgs('Disconnect was called.')); - this._receiver.disconnect(new TransportDisconnectedEventArgs('Disconnect was called.')); + this._sender.disconnect(new TransportDisconnectedEvent('Disconnect was called.')); + this._receiver.disconnect(new TransportDisconnectedEvent('Disconnect was called.')); } - private onConnectionDisconnected(sender: PayloadReceiver | PayloadSender, e?: TransportDisconnectedEventArgs): void { + /** + * @param sender The PayloadReceiver or PayloadSender that triggered or received a disconnect. + * @param e TransportDisconnectedEvent + */ + private onConnectionDisconnected(sender: PayloadReceiver | PayloadSender, e?: TransportDisconnectedEvent): void { if (this._closedSignal) { this._closedSignal('close'); this._closedSignal = null; diff --git a/libraries/botframework-streaming/src/webSocket/wsNodeWebSocket.ts b/libraries/botframework-streaming/src/webSocket/wsNodeWebSocket.ts deleted file mode 100644 index 854cee833e..0000000000 --- a/libraries/botframework-streaming/src/webSocket/wsNodeWebSocket.ts +++ /dev/null @@ -1,139 +0,0 @@ -/** - * @module botframework-streaming - */ -/** - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - */ - -import { ISocket } from '../interfaces'; -import { IncomingMessage, request } from 'http'; -import { Socket } from 'net'; -import * as WebSocket from 'ws'; -import * as crypto from 'crypto'; - -// Taken from watershed, these needs to be investigated. -const NONCE_LENGTH = 16; - -export class WsNodeWebSocket implements ISocket { - private wsSocket: WebSocket; - private connected: boolean; - protected wsServer: WebSocket.Server; - - /** - * Creates a new instance of the [WsNodeWebSocket](xref:botframework-streaming.WsNodeWebSocket) class. - * - * @param socket The ws socket object to build this connection on. - */ - public constructor(wsSocket?: WebSocket) { - this.wsSocket = wsSocket; - this.connected = !!wsSocket; - this.wsServer = new WebSocket.Server({ noServer: true }); - } - - /** - * Create and set a `ws` WebSocket with an HTTP Request, Socket and Buffer. - * @param req IncomingMessage - * @param socket Socket - * @param head Buffer - */ - public async create(req: IncomingMessage, socket: Socket, head: Buffer): Promise { - return new Promise((resolve, reject) => { - try { - this.wsServer.handleUpgrade(req, socket, head, (websocket) => { - this.wsSocket = websocket; - this.connected = true; - resolve(); - }); - } catch (err) { - reject(err); - } - }); - } - - /** - * True if the socket is currently connected. - */ - public get isConnected(): boolean { - return this.connected; - } - - /** - * Writes a buffer to the socket and sends it. - * - * @param buffer The buffer of data to send across the connection. - */ - public write(buffer: Buffer): void { - this.wsSocket.send(buffer); - } - - /** - * Connects to the supporting socket using WebSocket protocol. - * - * @param serverAddress The address the server is listening on. - * @param port The port the server is listening on, defaults to 8082. - */ - public async connect(serverAddress, port = 8082): Promise { - // Taken from WaterShed, this needs to be investigated. - const wskey = crypto.randomBytes(NONCE_LENGTH).toString('base64'); - const options = { - port: port, - hostname: serverAddress, - headers: { - connection: 'upgrade', - 'Sec-WebSocket-Key': wskey, - 'Sec-WebSocket-Version': '13' - } - }; - const req = request(options); - req.end(); - req.on('upgrade', (res, socket, head): void => { - // @types/ws does not contain the signature for completeUpgrade - // https://github.com/websockets/ws/blob/0a612364e69fc07624b8010c6873f7766743a8e3/lib/websocket-server.js#L269 - (this.wsServer as any).completeUpgrade(wskey, undefined, res, socket, head, (websocket): void => { - this.wsSocket = websocket; - this.connected = true; - }); - }); - - return new Promise((resolve, reject): void => { - req.on('close', resolve); - req.on('error', reject); - }); - } - - /** - * Set the handler for `'data'` and `'message'` events received on the socket. - */ - public setOnMessageHandler(handler: (x: any) => void): void { - this.wsSocket.on('data', handler); - this.wsSocket.on('message', handler); - } - - /** - * Close the socket. - * @remarks - * Optionally pass in a status code and string explaining why the connection is closing. - * @param code - * @param data - */ - public close(code?: number, data?: string): void { - this.connected = false; - - return this.wsSocket.close(code, data); - } - - /** - * Set the callback to call when encountering socket closures. - */ - public setOnCloseHandler(handler: (x: any) => void): void { - this.wsSocket.on('close', handler); - } - - /** - * Set the callback to call when encountering errors. - */ - public setOnErrorHandler(handler: (x: any) => void): void { - this.wsSocket.on('error', (error): void => { if (error) { handler(error); } }); - } -} \ No newline at end of file diff --git a/libraries/botframework-streaming/tests/NodeWebSocket.test.js b/libraries/botframework-streaming/tests/NodeWebSocket.test.js index 47b4884fbb..7b0b68c1d9 100644 --- a/libraries/botframework-streaming/tests/NodeWebSocket.test.js +++ b/libraries/botframework-streaming/tests/NodeWebSocket.test.js @@ -1,77 +1,82 @@ const { NodeWebSocket } = require('../'); const { expect } = require('chai'); const { FauxSock, TestRequest } = require('./helpers'); +const { randomBytes } = require('crypto'); -describe('NodeSocket', () => { - it('creates a new NodeSocket', () => { - const ns = new NodeWebSocket(new FauxSock); - expect(ns).to.be.instanceOf(NodeWebSocket); - expect(ns.close()).to.not.be.undefined; +describe('NodeWebSocket', () => { + it('creates a new NodeWebSocket', () => { + const socket = new NodeWebSocket(new FauxSock); + expect(socket).to.be.instanceOf(NodeWebSocket); + expect(socket.close()).to.not.throw; }); it('requires a valid URL', () => { try { - const ns = new NodeWebSocket(new FauxSock); + const socket = new NodeWebSocket(new FauxSock); } catch (error) { expect(error.message).to.equal('Invalid URL: fakeURL'); } }); it('starts out connected', () => { - const ns = new NodeWebSocket(new FauxSock); - expect(ns.isConnected).to.be.true; + const socket = new NodeWebSocket(new FauxSock); + expect(socket.isConnected).to.be.true; }); it('writes to the socket', () => { - const ns = new NodeWebSocket(new FauxSock); + const socket = new NodeWebSocket(new FauxSock); const buff = Buffer.from('hello'); - expect(ns.write(buff)).to.not.throw; + expect(socket.write(buff)).to.not.throw; }); it('attempts to open a connection', () => { - const ns = new NodeWebSocket(new FauxSock); - expect(ns.connect().catch((error) => { + const socket = new NodeWebSocket(new FauxSock); + expect(socket.connect().catch((error) => { expect(error.message).to.equal('connect ECONNREFUSED 127.0.0.1:8082'); })); }); it('can set message handlers on the socket', () => { const sock = new FauxSock(); - const ns = new NodeWebSocket(sock); - expect(sock.textHandler).to.be.undefined; - expect(sock.binaryHandler).to.be.undefined; - expect(ns.setOnMessageHandler(() => { })).to.not.throw; - expect(sock.textHandler).to.not.be.undefined; - expect(sock.binaryHandler).to.not.be.undefined; + const socket = new NodeWebSocket(sock); + expect(sock.dataHandler).to.be.undefined; + expect(sock._messageHandler).to.be.undefined; + expect(socket.setOnMessageHandler(() => { })).to.not.throw; + expect(sock.dataHandler).to.not.be.undefined; + expect(sock._messageHandler).to.not.be.undefined; }); it('can set error handler on the socket', () => { const sock = new FauxSock(); - const ns = new NodeWebSocket(sock); + const socket = new NodeWebSocket(sock); expect(sock.errorHandler).to.be.undefined; - expect(ns.setOnErrorHandler(() => { })).to.not.throw; + expect(socket.setOnErrorHandler(() => { })).to.not.throw; expect(sock.errorHandler).to.not.be.undefined; }); it('can set end handler on the socket', () => { const sock = new FauxSock(); - const ns = new NodeWebSocket(sock); - expect(sock.endHandler).to.be.undefined; - expect(ns.setOnCloseHandler(() => { })).to.not.throw; - expect(sock.endHandler).to.not.be.undefined; + const socket = new NodeWebSocket(sock); + expect(sock.closeHandler).to.be.undefined; + expect(socket.setOnCloseHandler(() => { })).to.not.throw; + expect(sock.closeHandler).to.not.be.undefined; }); - it('create() should be successful and set a WebSocket', () => { + it('create() should be successful and set a WebSocket', async () => { const sock = new FauxSock(); const nodeSocket = new NodeWebSocket(); const request = new TestRequest(); + + // Configure a proper upgrade request for `ws`. request.setIsUpgradeRequest(true); - request.headers = []; - request.headers['upgrade'] = 'websocket'; - request.headers['sec-websocket-key'] = 'BFlat'; + request.headers = { upgrade: 'websocket' }; + // Use Node.js `crypto` module to calculate a valid 'sec-websocket-key' value. + // The key must pass this RegExp: + // https://github.com/websockets/ws/blob/0a612364e69fc07624b8010c6873f7766743a8e3/lib/websocket-server.js#L12 + request.headers['sec-websocket-key'] = randomBytes(16).toString('base64'); request.headers['sec-websocket-version'] = '13'; request.headers['sec-websocket-protocol'] = ''; - nodeSocket.create(request, sock, Buffer.from([])); - nodeSocket.waterShedSocket.destroy(); + + await nodeSocket.create(request, sock, Buffer.from([])); }); }); diff --git a/libraries/botframework-streaming/tests/NodeWebSocketFactory.test.js b/libraries/botframework-streaming/tests/NodeWebSocketFactory.test.js index aa7391ce84..47819e316e 100644 --- a/libraries/botframework-streaming/tests/NodeWebSocketFactory.test.js +++ b/libraries/botframework-streaming/tests/NodeWebSocketFactory.test.js @@ -1,22 +1,22 @@ const { NodeWebSocket, NodeWebSocketFactory } = require('../'); const { FauxSock, TestRequest } = require('./helpers'); const { expect } = require('chai'); +const { randomBytes } = require('crypto'); describe('NodeWebSocketFactory', () => { - it('createWebSocket() should create a new NodeWebSocket', () => { + it('createWebSocket() should create a new NodeWebSocket', async () => { const factory = new NodeWebSocketFactory(); const sock = new FauxSock(); const request = new TestRequest(); - + request.method = 'GET' // TODO: Fix TestRequest class request.setIsUpgradeRequest(true); request.headers = []; request.headers['upgrade'] = 'websocket'; - request.headers['sec-websocket-key'] = 'BFlat'; + request.headers['sec-websocket-key'] = randomBytes(16).toString('base64'); request.headers['sec-websocket-version'] = '13'; request.headers['sec-websocket-protocol'] = ''; - const socket = factory.createWebSocket(request, sock, Buffer.from([])); + const socket = await factory.createWebSocket(request, sock, Buffer.from([])); expect(socket).to.be.instanceOf(NodeWebSocket); - socket.waterShedSocket.destroy(); }); }); diff --git a/libraries/botframework-streaming/tests/streamingAdatper.test.js b/libraries/botframework-streaming/tests/streamingAdatper.test.js deleted file mode 100644 index 2e42af2a51..0000000000 --- a/libraries/botframework-streaming/tests/streamingAdatper.test.js +++ /dev/null @@ -1,519 +0,0 @@ -const { StreamingAdapter } = require('../'); -const { StatusCodes } = require('botbuilder'); -const { ActivityHandler } = require('botbuilder-core'); -const chai = require('chai'); -var expect = chai.expect; - -class FauxSock { - constructor(contentString) { - if (contentString) { - this.contentString = contentString; - this.position = 0; - } - this.connected = true; - } - isConnected() { return this.connected; } - write(buffer) { return; } - connect(serverAddress) { return new Promise(); } - close() { this.connected = false; return; } - setOnMessageHandler(handler) { return; } //(x: any) => void); - setOnErrorHandler(handler) { return; } - setOnCloseHandler(handler) { return; } -} - -class TestRequest { - constructor() { - let headers = []; - } - - isUpgradeRequest() { - return this.upgradeRequestVal; - } - - setIsUpgradeRequest(value) { - this.upgradeRequestVal = value; - } - - status() { - return this.statusVal; - } - - status(value) { - this.statusVal = value; - } - - path(value) { - this.pathVal = value; - } - - path() { - return this.pathVal; - } - - verb(value) { - this.verbVal = value; - } - - verb() { - return this.verbVal; - } - - streams(value) { - this.streamsVal = value; - } - - streams() { - return this.streamsVal; - } - - setHeaders() { - return this.headersVal; - } - - setHeaders(value) { - this.headers = value; - } - -} - -class TestResponse { - send(value) { - this.sendVal = value; - return this.sendVal; - } - - status(value) { - this.statusVal = value; - return this.statusVal; - } - - setClaimUpgrade(value) { - this.claimUpgradeVal = value; - } - - claimUpgrade() { - return this.claimUpgradeVal; - } -} - -class TestAdapterSettings { - constructor(appId = undefined, appPassword = undefined, channelAuthTenant, oAuthEndpoint, openIdMetadata, channelServce) { - this.appId = appId; - this.appPassword = appPassword; - this.enableWebSockets = true; - } -} - -describe('BotFrameworkStreamingAdapter tests', () => { - - it('has the correct status codes', () => { - expect(StatusCodes.OK).to.equal(200); - expect(StatusCodes.BAD_REQUEST).to.equal(400); - expect(StatusCodes.UNAUTHORIZED).to.equal(401); - expect(StatusCodes.NOT_FOUND).to.equal(404); - expect(StatusCodes.METHOD_NOT_ALLOWED).to.equal(405); - expect(StatusCodes.UPGRADE_REQUIRED).to.equal(426); - expect(StatusCodes.INTERNAL_SERVER_ERROR).to.equal(500); - expect(StatusCodes.NOT_IMPLEMENTED).to.equal(501); - }); - - it('gets constructed properly', () => { - let handler = new StreamingAdapter(); - - expect(handler).to.be.instanceOf(StreamingAdapter); - }); - - it('starts and stops a namedpipe server', () => { - let handler = new StreamingAdapter(); - - handler.useNamedPipe('PipeyMcPipeface', async (context) => { - // Route to bot - await bot.run(context); - }); - expect(handler.streamingServer.disconnect()).to.not.throw; - }); - - it('starts and stops a websocket server', async () => { - const bot = new ActivityHandler(); - const handler = new StreamingAdapter(new TestAdapterSettings()); - const request = new TestRequest(); - request.setIsUpgradeRequest(true); - request.headers = []; - request.headers['upgrade'] = 'websocket'; - request.headers['sec-websocket-key'] = 'BFlat'; - request.headers['sec-websocket-version'] = '13'; - request.headers['sec-websocket-protocol'] = ''; - - const response = new TestResponse({ claimUpgrade: 'anything' }); - const fakeSocket = { - unshift: function () { return true; }, - write: function (value) { }, - on: function (value) { }, - read: function () { return new Buffer.from('data', 'utf8'); }, - end: function () { return; }, - }; - response.setClaimUpgrade({ socket: fakeSocket, head: 'websocket' }); - await handler.useWebSocket(request, response, async (context) => { - // Route to bot - await bot.run(context); - }); - }); - - it('returns a connector client', async () => { - let bot = new ActivityHandler(); - let handler = new StreamingAdapter(new TestAdapterSettings()); - let request = new TestRequest(); - request.setIsUpgradeRequest(true); - request.headers = []; - request.headers['upgrade'] = 'websocket'; - request.headers['sec-websocket-key'] = 'BFlat'; - request.headers['sec-websocket-version'] = '13'; - request.headers['sec-websocket-protocol'] = ''; - let response = new TestResponse(); - let fakeSocket = { - unshift: function () { return true; }, - write: function (value) { }, - on: function (value) { }, - read: function () { return new Buffer.from('data', 'utf8'); }, - end: function () { return; }, - }; - response.socket = fakeSocket; - response.setClaimUpgrade({ socket: fakeSocket, head: 'websocket' }); - - await handler.useWebSocket(request, response, async (context) => { - // Route to bot - await bot.run(context); - }); - const cc = handler.createConnectorClient('urn:test'); - expect(cc.baseUri).to.equal('urn:test'); - }); - - describe('useWebSocket()', () => { - it('connects', async () => { - let bot = new ActivityHandler(); - let handler = new StreamingAdapter(new TestAdapterSettings()); - let request = new TestRequest(); - request.setIsUpgradeRequest(true); - request.headers = []; - request.headers['upgrade'] = 'websocket'; - request.headers['sec-websocket-key'] = 'BFlat'; - request.headers['sec-websocket-version'] = '13'; - request.headers['sec-websocket-protocol'] = ''; - let response = new TestResponse(); - let fakeSocket = { - unshift: function () { return true; }, - write: function (value) { }, - on: function (value) { }, - read: function () { return new Buffer.from('data', 'utf8'); }, - end: function () { return; }, - }; - response.socket = fakeSocket; - response.setClaimUpgrade({ socket: fakeSocket, head: 'websocket' }); - - await handler.useWebSocket(request, response, async (context) => { - // Route to bot - await bot.run(context); - }); - }); - - it('returns status code 401 when request is not authorized', async () => { - let bot = new ActivityHandler(); - const settings = new TestAdapterSettings('appId', 'password'); - let handler = new StreamingAdapter(settings); - let request = new TestRequest(); - request.setIsUpgradeRequest(true); - request.setHeaders({ channelid: 'fakechannel', authorization: 'donttrustme' }); - let response = new TestResponse(); - - await handler.useWebSocket(request, response, async (context) => { - // Route to bot - await bot.run(context); - throw new Error('useWebSocket should have thrown an error'); - }).catch(err => { - expect(err.message).to.equal('Unauthorized. No valid identity.'); - }); - }); - }); - - describe('processRequest()', () => { - it('returns a 400 when the request is missing verb', async () => { - let handler = new StreamingAdapter(); - let request = new TestRequest(); - request.verb = undefined; - request.path = '/api/messages'; - let fakeStream = { - readAsJson: function () { return { type: 'Invoke', serviceUrl: 'somewhere/', channelId: 'test' }; }, - }; - request.streams[0] = fakeStream; - - const response = await handler.processRequest(request); - expect(response.statusCode).to.equal(400); - }); - - it('returns a 400 when the request is missing path', async () => { - let handler = new StreamingAdapter(); - let request = new TestRequest(); - request.verb = 'POST'; - request.path = undefined; - let fakeStream = { - readAsJson: function () { return { type: 'Invoke', serviceUrl: 'somewhere/', channelId: 'test' }; }, - }; - request.streams[0] = fakeStream; - - const response = await handler.processRequest(request); - expect(response.statusCode).to.equal(400); - }); - - it('returns a 400 when the request body is missing', async () => { - let handler = new StreamingAdapter(); - let request = new TestRequest('POST', '/api/messages'); - request.verb = 'POST'; - request.path = '/api/messages'; - request.streams = undefined; - - const response = await handler.processRequest(request); - expect(response.statusCode).to.equal(400); - }); - - it('returns user agent information when a GET hits the version endpoint', async () => { - let handler = new StreamingAdapter(); - let request = new TestRequest(); - request.verb = 'GET'; - request.path = '/api/version'; - let fakeStream = { - readAsJson: function () { return { type: 'Invoke', serviceUrl: 'somewhere/', channelId: 'test' }; }, - }; - request.streams[0] = fakeStream; - - const response = await handler.processRequest(request); - expect(response.statusCode).to.equal(200); - expect(response.streams[0].content).to.not.be.undefined; - }); - - it('returns user agent information from cache when a GET hits the version endpoint more than once', async () => { - let handler = new StreamingAdapter(); - let request = new TestRequest(); - request.verb = 'GET'; - request.path = '/api/version'; - let fakeStream = { - readAsJson: function () { return { type: 'Invoke', serviceUrl: 'somewhere/', channelId: 'test' }; }, - }; - request.streams[0] = fakeStream; - - const response = await handler.processRequest(request); - expect(response.statusCode).to.equal(200); - expect(response.streams[0].content).to.not.be.undefined; - - const response2 = await handler.processRequest(request); - expect(response2.statusCode).to.equal(200); - expect(response2.streams[0].content).to.not.be.undefined; - }); - - it('returns 405 for unsupported methods', async () => { - let handler = new StreamingAdapter(); - let request = new TestRequest(); - request.verb = 'UPDATE'; - request.path = '/api/version'; - let fakeStream = { - readAsJson: function () { return { type: 'Invoke', serviceUrl: 'somewhere/', channelId: 'test' }; }, - }; - request.streams[0] = fakeStream; - - const response = await handler.processRequest(request); - expect(response.statusCode).to.equal(405); - }); - - it('returns 404 for unsupported paths', async () => { - let handler = new StreamingAdapter(); - let request = new TestRequest(); - request.verb = 'POST'; - request.path = '/api/supersecretbackdoor'; - let fakeStream = { - readAsJson: function () { return { type: 'Invoke', serviceUrl: 'somewhere/', channelId: 'test' }; }, - }; - request.streams[0] = fakeStream; - - const response = await handler.processRequest(request); - expect(response.statusCode).to.equal(404); - }); - - it('processes a well formed request when there is no middleware with a non-Invoke activity type', async () => { - let bot = new ActivityHandler(); - let handler = new StreamingAdapter(); - let request = new TestRequest(); - request.verb = 'POST'; - request.path = '/api/messages'; - let fakeStream = { - readAsJson: function () { return { type: 'something', serviceUrl: 'somewhere/', channelId: 'test' }; }, - }; - request.streams[0] = fakeStream; - - handler.logic = async (context) => { - // Route to bot - await bot.run(context); - }; - - const response = await handler.processRequest(request); - expect(response.statusCode).to.equal(200); - }); - - it('returns a 501 when activity type is invoke, but the activity is invalid', async () => { - let bot = new ActivityHandler(); - let handler = new StreamingAdapter(); - let request = new TestRequest(); - request.verb = 'POST'; - request.path = '/api/messages'; - let fakeStream = { - readAsJson: function () { return { type: 'invoke', serviceUrl: 'somewhere/', channelId: 'test' }; }, - }; - request.streams[0] = fakeStream; - - handler.logic = async (context) => { - // Route to bot - await bot.run(context); - }; - - const response = await handler.processRequest(request); - expect(response.statusCode).to.equal(501); - }); - - it('returns a 500 when bot can not run', async () => { - const MiddleWare = require('botbuilder-core'); - let bot = {}; - let mw = { - async onTurn(context, next) { - console.log('Middleware executed!'); - await next(); - } - }; - let mwset = []; - mwset.push(mw); - let handler = new StreamingAdapter({ bot: bot, middleWare: mwset }); - let request = new TestRequest(); - request.verb = 'POST'; - request.path = '/api/messages'; - let fakeStream = { - readAsJson: function () { return { type: 'invoke', serviceUrl: 'somewhere/', channelId: 'test' }; }, - }; - request.streams[0] = fakeStream; - - const response = await handler.processRequest(request); - expect(response.statusCode).to.equal(500); - }); - - it('executes middleware', async () => { - var sinon = require('sinon'); - let bot = new ActivityHandler(); - bot.run = function (turnContext) { return Promise.resolve(); }; - - let handler = new StreamingAdapter(); - let middlewareCalled = false; - const middleware = { - async onTurn(context, next) { - middlewareCalled = true; - return next(); - } - } - - handler.use(middleware); - - const runSpy = sinon.spy(bot, 'run'); - let request = new TestRequest(); - request.verb = 'POST'; - request.path = '/api/messages'; - let fakeStream = { - readAsJson: function () { return { type: 'invoke', serviceUrl: 'somewhere/', channelId: 'test' }; }, - }; - request.streams[0] = fakeStream; - - handler.logic = async (context) => { - // Route to bot - await bot.run(context); - }; - - const response = await handler.processRequest(request); - expect(response.statusCode).to.equal(501); - expect(runSpy.called).to.be.true; - expect(middlewareCalled).to.be.true; - }); - }); - - it('sends a request', async () => { - let bot = new ActivityHandler(); - let handler = new StreamingAdapter(new TestAdapterSettings()); - let request = new TestRequest(); - request.setIsUpgradeRequest(true); - request.headers = []; - request.headers['upgrade'] = 'websocket'; - request.headers['sec-websocket-key'] = 'BFlat'; - request.headers['sec-websocket-version'] = '13'; - request.headers['sec-websocket-protocol'] = ''; - let response = new TestResponse(); - let fakeSocket = { - unshift: function () { return true; }, - write: function () { return Promise.resolve; }, - on: function () { return; }, - read: function () { return new Buffer.from('data', 'utf8'); }, - end: function () { return Promise.resolve; }, - }; - response.socket = fakeSocket; - const sinon = require('sinon'); - const spy = sinon.spy(fakeSocket, "write"); - response.setClaimUpgrade({ socket: fakeSocket, head: 'websocket' }); - - try { - await handler.useWebSocket(request, response, async (context) => { - // Route to bot - await bot.run(context); - }) - } catch (err) { - throw err; - } - - let connection = handler.createConnectorClient('fakeUrl'); - connection.sendRequest({ method: 'POST', url: 'testResultDotCom', body: 'Test body!' }); - expect(spy.called).to.be.true; - }).timeout(2000); - - describe('private methods', () => { - it('should identify streaming connections', function () { - let activity = { - type: 'message', - text: 'TestOAuth619 test activity', - recipient: { id: 'TestOAuth619' }, - }; - - const streaming = [ - 'urn:botframework:WebSocket:wss://beep.com', - 'urn:botframework:WebSocket:http://beep.com', - 'URN:botframework:WebSocket:wss://beep.com', - 'URN:botframework:WebSocket:http://beep.com', - ]; - - streaming.forEach(s => { - activity.serviceUrl = s; - expect(StreamingAdapter.isFromStreamingConnection(activity)).to.be.true; - }); - }); - - it('should identify http connections', function () { - let activity = { - type: 'message', - text: 'TestOAuth619 test activity', - recipient: { id: 'TestOAuth619' }, - }; - - const streaming = [ - 'http://yayay.com', - 'https://yayay.com', - 'HTTP://yayay.com', - 'HTTPS://yayay.com', - ]; - - streaming.forEach(s => { - activity.serviceUrl = s; - expect(StreamingAdapter.isFromStreamingConnection(activity)).to.be.false; - }); - }); - }); -}); diff --git a/libraries/botframework-streaming/tests/wsNodeWebSocket.test.js b/libraries/botframework-streaming/tests/wsNodeWebSocket.test.js deleted file mode 100644 index 755aa787e8..0000000000 --- a/libraries/botframework-streaming/tests/wsNodeWebSocket.test.js +++ /dev/null @@ -1,82 +0,0 @@ -const { WsNodeWebSocket } = require('../'); -const { expect } = require('chai'); -const { FauxSock, TestRequest } = require('./helpers'); -const { randomBytes } = require('crypto'); - -describe('WsNodeWebSocket', () => { - it('creates a new WsNodeWebSocket', () => { - const wsSocket = new WsNodeWebSocket(new FauxSock); - expect(wsSocket).to.be.instanceOf(WsNodeWebSocket); - expect(wsSocket.close()).to.not.throw; - }); - - it('requires a valid URL', () => { - try { - const wsSocket = new WsNodeWebSocket(new FauxSock); - } catch (error) { - expect(error.message).to.equal('Invalid URL: fakeURL'); - } - }); - - it('starts out connected', () => { - const wsSocket = new WsNodeWebSocket(new FauxSock); - expect(wsSocket.isConnected).to.be.true; - }); - - it('writes to the socket', () => { - const wsSocket = new WsNodeWebSocket(new FauxSock); - const buff = Buffer.from('hello'); - expect(wsSocket.write(buff)).to.not.throw; - }); - - it('attempts to open a connection', () => { - const wsSocket = new WsNodeWebSocket(new FauxSock); - expect(wsSocket.connect().catch((error) => { - expect(error.message).to.equal('connect ECONNREFUSED 127.0.0.1:8082'); - })); - }); - - it('can set message handlers on the socket', () => { - const sock = new FauxSock(); - const wsSocket = new WsNodeWebSocket(sock); - expect(sock.dataHandler).to.be.undefined; - expect(sock._messageHandler).to.be.undefined; - expect(wsSocket.setOnMessageHandler(() => { })).to.not.throw; - expect(sock.dataHandler).to.not.be.undefined; - expect(sock._messageHandler).to.not.be.undefined; - }); - - it('can set error handler on the socket', () => { - const sock = new FauxSock(); - const wsSocket = new WsNodeWebSocket(sock); - expect(sock.errorHandler).to.be.undefined; - expect(wsSocket.setOnErrorHandler(() => { })).to.not.throw; - expect(sock.errorHandler).to.not.be.undefined; - }); - - it('can set end handler on the socket', () => { - const sock = new FauxSock(); - const wsSocket = new WsNodeWebSocket(sock); - expect(sock.closeHandler).to.be.undefined; - expect(wsSocket.setOnCloseHandler(() => { })).to.not.throw; - expect(sock.closeHandler).to.not.be.undefined; - }); - - it('create() should be successful and set a WebSocket', async () => { - const sock = new FauxSock(); - const nodeSocket = new WsNodeWebSocket(); - const request = new TestRequest(); - - // Configure a proper upgrade request for `ws`. - request.setIsUpgradeRequest(true); - request.headers = { upgrade: 'websocket' }; - // Use Node.js `crypto` module to calculate a valid 'sec-websocket-key' value. - // The key must pass this RegExp: - // https://github.com/websockets/ws/blob/0a612364e69fc07624b8010c6873f7766743a8e3/lib/websocket-server.js#L12 - request.headers['sec-websocket-key'] = randomBytes(16).toString('base64'); - request.headers['sec-websocket-version'] = '13'; - request.headers['sec-websocket-protocol'] = ''; - - await nodeSocket.create(request, sock, Buffer.from([])); - }); -}); diff --git a/package.json b/package.json index 7accfe9f92..46dca98f88 100644 --- a/package.json +++ b/package.json @@ -7,9 +7,9 @@ "clean": "lerna run clean", "functional-test": "lerna run build && nyc mocha \"libraries/functional-tests/tests/*.test.js\"", "browser-functional-test": "cd libraries/browser-functional-tests && node nightwatch.js -e chrome", - "test": "lerna run build && nyc mocha \"libraries/bot*/tests/*.test.js\"", - "test:coveralls": "lerna run build && nyc mocha \"libraries/bot*/tests/*.test.js\" && nyc report --reporter=text-lcov | coveralls", - "test-coverage": "nyc mocha \"libraries/bot*/tests/*.test.js\" ", + "test": "lerna run build && nyc mocha \"libraries/bot*/tests/**/*.test.js\"", + "test:coveralls": "lerna run build && nyc mocha \"libraries/bot*/tests/**/*.test.js\" && nyc report --reporter=text-lcov | coveralls", + "test-coverage": "nyc mocha \"libraries/bot*/tests/**/*.test.js\"", "upload-coverage": "nyc report --reporter=text-lcov | coveralls", "build-docs": "lerna run build-docs", "eslint": "eslint ./libraries/*/src/*.ts ./libraries/*/src/**/*.ts",