-
Notifications
You must be signed in to change notification settings - Fork 281
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Streaming, Preview] rewrite useWebSocket for low-level usage, remove from processActivity #1433
Changes from all commits
02977f4
5d01690
06cc3e4
b08c9fa
50a59e6
ab961af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,12 +6,12 @@ | |
* Licensed under the MIT License. | ||
*/ | ||
|
||
import { IncomingMessage } from 'http'; | ||
import { STATUS_CODES } from 'http'; | ||
import * as os from 'os'; | ||
|
||
import { Activity, ActivityTypes, BotAdapter, BotCallbackHandlerKey, ChannelAccount, ConversationAccount, ConversationParameters, ConversationReference, ConversationsResult, IUserTokenProvider, ResourceResponse, TokenResponse, TurnContext } from 'botbuilder-core'; | ||
import { AuthenticationConstants, ChannelValidation, ConnectorClient, EmulatorApiClient, GovernmentConstants, GovernmentChannelValidation, JwtTokenValidation, MicrosoftAppCredentials, SimpleCredentialProvider, TokenApiClient, TokenStatus, TokenApiModels } from 'botframework-connector'; | ||
import { IReceiveRequest, ISocket, IStreamingTransportServer, NamedPipeServer, NodeWebSocketFactory, NodeWebSocketFactoryBase, RequestHandler, StreamingResponse, WebSocketServer } from 'botframework-streaming'; | ||
import { INodeBuffer, INodeSocket, IReceiveRequest, ISocket, IStreamingTransportServer, NamedPipeServer, NodeWebSocketFactory, NodeWebSocketFactoryBase, RequestHandler, StreamingResponse, WebSocketServer } from 'botframework-streaming'; | ||
|
||
import { StreamingHttpClient, TokenResolver } from './streaming'; | ||
|
||
|
@@ -135,12 +135,7 @@ export interface BotFrameworkAdapterSettings { | |
channelService?: string; | ||
|
||
/** | ||
* Optional. The option to determine if this adapter accepts WebSocket connections | ||
*/ | ||
enableWebSockets?: boolean; | ||
|
||
/** | ||
* Optional. Used to pass in a NodeWebSocketFactoryBase instance. Allows bot to accept WebSocket connections. | ||
* Optional. Used to pass in a NodeWebSocketFactoryBase instance. | ||
*/ | ||
webSocketFactory?: NodeWebSocketFactoryBase; | ||
} | ||
|
@@ -268,12 +263,7 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide | |
this.credentials.oAuthScope = GovernmentConstants.ToChannelFromBotOAuthScope; | ||
} | ||
|
||
// If the developer wants to use WebSockets, but didn't provide a WebSocketFactory, | ||
// create a NodeWebSocketFactory. | ||
if (this.settings.enableWebSockets && !this.settings.webSocketFactory) { | ||
this.webSocketFactory = new NodeWebSocketFactory(); | ||
} | ||
|
||
// If a NodeWebSocketFactoryBase was passed in, set it on the BotFrameworkAdapter. | ||
if (this.settings.webSocketFactory) { | ||
this.webSocketFactory = this.settings.webSocketFactory; | ||
} | ||
|
@@ -755,9 +745,6 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide | |
* ``` | ||
*/ | ||
public async processActivity(req: WebRequest, res: WebResponse, logic: (context: TurnContext) => Promise<any>): Promise<void> { | ||
if (this.settings.enableWebSockets && req.method === GET && (req.headers.Upgrade || req.headers.upgrade)) { | ||
return this.useWebSocket(req, res, logic); | ||
} | ||
|
||
let body: any; | ||
let status: number; | ||
|
@@ -1148,39 +1135,41 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide | |
/** | ||
* Process the initial request to establish a long lived connection via a streaming server. | ||
* @param req The connection request. | ||
* @param res The response sent on error or connection termination. | ||
* @param logic The logic that will handle incoming requests. | ||
* @param socket The raw socket connection between the bot (server) and channel/caller (client). | ||
* @param head The first packet of the upgraded stream. | ||
* @param logic The logic that handles incoming streaming requests for the lifetime of the WebSocket connection. | ||
*/ | ||
public async useWebSocket(req: WebRequest, res: WebResponse, logic: (context: TurnContext) => Promise<any>): Promise<void> { | ||
public async useWebSocket(req: WebRequest, socket: INodeSocket, head: INodeBuffer, logic: (context: TurnContext) => Promise<any>): Promise<void> { | ||
// Use the provided NodeWebSocketFactoryBase on BotFrameworkAdapter construction, | ||
// otherwise create a new NodeWebSocketFactory. | ||
const webSocketFactory = this.webSocketFactory || new NodeWebSocketFactory(); | ||
|
||
if (!logic) { | ||
throw new Error('Streaming logic needs to be provided to `useWebSocket`'); | ||
} | ||
|
||
if (!this.webSocketFactory || !this.webSocketFactory.createWebSocket) { | ||
throw new Error('BotFrameworkAdapter must have a WebSocketFactory in order to support streaming.'); | ||
} | ||
|
||
this.logic = logic; | ||
|
||
// Restify-specific check. | ||
if (typeof((res as any).claimUpgrade) !== 'function') { | ||
throw new Error('ClaimUpgrade is required for creating WebSocket connection.'); | ||
} | ||
|
||
try { | ||
await this.authenticateConnection(req, this.settings.channelService); | ||
} catch (err) { | ||
// Set the correct status code for the socket to send back to the channel. | ||
res.status(StatusCodes.UNAUTHORIZED); | ||
res.send(err.message); | ||
// If the authenticateConnection call fails, send back the correct error code and close | ||
// the connection. | ||
if (typeof(err.message) === 'string' && err.message.toLowerCase().startsWith('unauthorized')) { | ||
abortWebSocketUpgrade(socket, 401); | ||
} else if (typeof(err.message) === 'string' && err.message.toLowerCase().startsWith(`'authheader'`)) { | ||
stevengum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
abortWebSocketUpgrade(socket, 400); | ||
} else { | ||
abortWebSocketUpgrade(socket, 500); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why dont we send the error message in the response? Seems impossible to detect the error without some more info. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or are they usually cryptic? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree that this isn't ideal. Maybe we can use NODE_ENV to detect if running in prod or not? Also, this is a great example where proper o11y (logging/tracing) would be helpful |
||
} | ||
Comment on lines
+1156
to
+1164
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @carlosscastro I'm not happy with this code, but I'm not sure if there are better steps that can be taken here. Node.js does not have HTTP Status Code-specific errors, so our underlying auth layers throw I have a test that covers the 401 Unauthorized route, and tested the other two routes manually. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, that hurts, but I understand. Can we improve the error reporting in the auth layers instead of here? Maybe we could create an abstraction that processes error messages and creates well defined errors + codes, and then the auth layer uses that in order to throw richer errors. Sounds like something that may not be possible without breaking backward compat though. If it is not possible to move this logic to the auth layer, then I guess we are out of choices. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Filed #1445 for this. I've tentatively labeled it as a P2 for R7. Since this is internal, I don't want to block on this specific code. I'd like to get this in so that I can have the package available for the WebChat DLS bug bash tomorrow. |
||
|
||
// Re-throw the error so the developer will know what occurred. | ||
throw err; | ||
} | ||
|
||
const upgrade = (res as any).claimUpgrade(); | ||
const socket = await this.webSocketFactory.createWebSocket(req as IncomingMessage, upgrade.socket, upgrade.head); | ||
const nodeWebSocket = await webSocketFactory.createWebSocket(req, socket, head); | ||
|
||
await this.startWebSocket(socket); | ||
await this.startWebSocket(nodeWebSocket); | ||
} | ||
|
||
private async authenticateConnection(req: WebRequest, channelService?: string): Promise<void> { | ||
|
@@ -1303,4 +1292,13 @@ function delay(timeout: number): Promise<void> { | |
return new Promise((resolve) => { | ||
setTimeout(resolve, timeout); | ||
}); | ||
} | ||
|
||
function abortWebSocketUpgrade(socket: INodeSocket, code: number) { | ||
if (socket.writable) { | ||
const connectionHeader = `Connection: 'close'\r\n`; | ||
socket.write(`HTTP/1.1 ${code} ${STATUS_CODES[code]}\r\n${connectionHeader}\r\n`); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just checking - could we have any cross platform issues with the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should work on the different platforms, but I can check on a different one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about HTTP/2 calls to upgrade to WebSockets? Can we dynamically populate the HTTP version in this response? I'm not sure it matters, but it may lead to confusing debugging and AFAIK we don't specifically NOT support HTTP/2 anywhere. (The ABS channels are all HTTP/1.1, but there's nothing stopping an outside channel from using HTTP/2.) Not a ship stopper, but food for thought. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also it looks like we're never sending the actual error message back now, is that right? That will make debugging problematic, especially if the code is a 500. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
socket.destroy(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
string matching is error prone (P1) and expensive (P2)
One way to improve confidence is to use a string constant that's shared between the component throwing the error and this component, that way if the message changes, we're not exposed.
Ideally, we'd move to a status code based system which we can avoid doing message parsing in.