Skip to content

Commit

Permalink
feat: add support for Express middlewares
Browse files Browse the repository at this point in the history
This commit implements middlewares at the Engine.IO level, because
Socket.IO middlewares are meant for namespace authorization and are not
executed during a classic HTTP request/response cycle.

A workaround was possible by using the allowRequest option and the
"headers" event, but this feels way cleaner and works with upgrade
requests too.

Syntax:

```js
engine.use((req, res, next) => {
  // do something

  next();
});

// with express-session
import session from "express-session";

engine.use(session({
  secret: "keyboard cat",
  resave: false,
  saveUninitialized: true,
  cookie: { secure: true }
});

// with helmet
import helmet from "helmet";

engine.use(helmet());
```

Related:

- #668
- #651
- socketio/socket.io#4609
- socketio/socket.io#3933
- a lot of other issues asking for compatibility with express-session
  • Loading branch information
darrachequesne committed Feb 6, 2023
1 parent 4d6f454 commit 24786e7
Show file tree
Hide file tree
Showing 5 changed files with 723 additions and 122 deletions.
179 changes: 147 additions & 32 deletions lib/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@ import { Socket } from "./socket";
import debugModule from "debug";
import { serialize } from "cookie";
import { Server as DEFAULT_WS_ENGINE } from "ws";
import { IncomingMessage, Server as HttpServer } from "http";
import { CookieSerializeOptions } from "cookie";
import { CorsOptions, CorsOptionsDelegate } from "cors";
import type {
IncomingMessage,
Server as HttpServer,
ServerResponse,
} from "http";
import type { CookieSerializeOptions } from "cookie";
import type { CorsOptions, CorsOptionsDelegate } from "cors";
import type { Duplex } from "stream";

const debug = debugModule("engine");

const kResponseHeaders = Symbol("responseHeaders");

type Transport = "polling" | "websocket";

export interface AttachOptions {
Expand Down Expand Up @@ -119,12 +126,26 @@ export interface ServerOptions {
allowEIO3?: boolean;
}

/**
* An Express-compatible middleware.
*
* Middleware functions are functions that have access to the request object (req), the response object (res), and the
* next middleware function in the application’s request-response cycle.
*
* @see https://expressjs.com/en/guide/using-middleware.html
*/
type Middleware = (
req: IncomingMessage,
res: ServerResponse,
next: () => void
) => void;

export abstract class BaseServer extends EventEmitter {
public opts: ServerOptions;

protected clients: any;
private clientsCount: number;
protected corsMiddleware: Function;
protected middlewares: Middleware[] = [];

/**
* Server constructor.
Expand Down Expand Up @@ -170,7 +191,7 @@ export abstract class BaseServer extends EventEmitter {
}

if (this.opts.cors) {
this.corsMiddleware = require("cors")(this.opts.cors);
this.use(require("cors")(this.opts.cors));
}

if (opts.perMessageDeflate) {
Expand Down Expand Up @@ -289,6 +310,52 @@ export abstract class BaseServer extends EventEmitter {
fn();
}

/**
* Adds a new middleware.
*
* @example
* import helmet from "helmet";
*
* engine.use(helmet());
*
* @param fn
*/
public use(fn: Middleware) {
this.middlewares.push(fn);
}

/**
* Apply the middlewares to the request.
*
* @param req
* @param res
* @param callback
* @protected
*/
protected _applyMiddlewares(
req: IncomingMessage,
res: ServerResponse,
callback: () => void
) {
if (this.middlewares.length === 0) {
debug("no middleware to apply, skipping");
return callback();
}

const apply = (i) => {
debug("applying middleware n°%d", i + 1);
this.middlewares[i](req, res, () => {
if (i + 1 < this.middlewares.length) {
apply(i + 1);
} else {
callback();
}
});
};

apply(0);
}

/**
* Closes all clients.
*
Expand Down Expand Up @@ -449,6 +516,40 @@ export abstract class BaseServer extends EventEmitter {
};
}

/**
* Exposes a subset of the http.ServerResponse interface, in order to be able to apply the middlewares to an upgrade
* request.
*
* @see https://nodejs.org/api/http.html#class-httpserverresponse
*/
class WebSocketResponse {
constructor(readonly req, readonly socket: Duplex) {
// temporarily store the response headers on the req object (see the "headers" event)
req[kResponseHeaders] = {};
}

public setHeader(name: string, value: any) {
this.req[kResponseHeaders][name] = value;
}

public getHeader(name: string) {
return this.req[kResponseHeaders][name];
}

public removeHeader(name: string) {
delete this.req[kResponseHeaders][name];
}

public write() {}

public writeHead() {}

public end() {
// we could return a proper error code, but the WebSocket client will emit an "error" event anyway.
this.socket.destroy();
}
}

export class Server extends BaseServer {
public httpServer?: HttpServer;
private ws: any;
Expand All @@ -474,7 +575,8 @@ export class Server extends BaseServer {
this.ws.on("headers", (headersArray, req) => {
// note: 'ws' uses an array of headers, while Engine.IO uses an object (response.writeHead() accepts both formats)
// we could also try to parse the array and then sync the values, but that will be error-prone
const additionalHeaders = {};
const additionalHeaders = req[kResponseHeaders] || {};
delete req[kResponseHeaders];

const isInitialRequest = !req._query.sid;
if (isInitialRequest) {
Expand All @@ -483,6 +585,7 @@ export class Server extends BaseServer {

this.emit("headers", additionalHeaders, req);

debug("writing headers: %j", additionalHeaders);
Object.keys(additionalHeaders).forEach((key) => {
headersArray.push(`${key}: ${additionalHeaders[key]}`);
});
Expand Down Expand Up @@ -517,13 +620,14 @@ export class Server extends BaseServer {
/**
* Handles an Engine.IO HTTP request.
*
* @param {http.IncomingMessage} request
* @param {http.ServerResponse|http.OutgoingMessage} response
* @param {IncomingMessage} req
* @param {ServerResponse} res
* @api public
*/
public handleRequest(req, res) {
public handleRequest(req: IncomingMessage, res: ServerResponse) {
debug('handling "%s" http request "%s"', req.method, req.url);
this.prepare(req);
// @ts-ignore
req.res = res;

const callback = (errorCode, errorContext) => {
Expand All @@ -538,51 +642,62 @@ export class Server extends BaseServer {
return;
}

// @ts-ignore
if (req._query.sid) {
debug("setting new request for existing client");
// @ts-ignore
this.clients[req._query.sid].transport.onRequest(req);
} else {
const closeConnection = (errorCode, errorContext) =>
abortRequest(res, errorCode, errorContext);
// @ts-ignore
this.handshake(req._query.transport, req, closeConnection);
}
};

if (this.corsMiddleware) {
this.corsMiddleware.call(null, req, res, () => {
this.verify(req, false, callback);
});
} else {
this._applyMiddlewares(req, res, () => {
this.verify(req, false, callback);
}
});
}

/**
* Handles an Engine.IO HTTP Upgrade.
*
* @api public
*/
public handleUpgrade(req, socket, upgradeHead) {
public handleUpgrade(
req: IncomingMessage,
socket: Duplex,
upgradeHead: Buffer
) {
this.prepare(req);

this.verify(req, true, (errorCode, errorContext) => {
if (errorCode) {
this.emit("connection_error", {
req,
code: errorCode,
message: Server.errorMessages[errorCode],
context: errorContext,
});
abortUpgrade(socket, errorCode, errorContext);
return;
}
const res = new WebSocketResponse(req, socket);

const head = Buffer.from(upgradeHead);
upgradeHead = null;
this._applyMiddlewares(req, res as unknown as ServerResponse, () => {
this.verify(req, true, (errorCode, errorContext) => {
if (errorCode) {
this.emit("connection_error", {
req,
code: errorCode,
message: Server.errorMessages[errorCode],
context: errorContext,
});
abortUpgrade(socket, errorCode, errorContext);
return;
}

// delegate to ws
this.ws.handleUpgrade(req, socket, head, (websocket) => {
this.onWebSocket(req, socket, websocket);
const head = Buffer.from(upgradeHead);
upgradeHead = null;

// some middlewares (like express-session) wait for the writeHead() call to flush their headers
// see https://github.com/expressjs/session/blob/1010fadc2f071ddf2add94235d72224cf65159c6/index.js#L220-L244
res.writeHead();

// delegate to ws
this.ws.handleUpgrade(req, socket, head, (websocket) => {
this.onWebSocket(req, socket, websocket);
});
});
});
}
Expand Down
Loading

0 comments on commit 24786e7

Please sign in to comment.