Skip to content

Commit

Permalink
feat(*): support for expressjs/compression, add flushAfterWrite option
Browse files Browse the repository at this point in the history
  • Loading branch information
toverux committed Feb 11, 2018
1 parent 0bd6cd2 commit 6ba8c46
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 34 deletions.
41 changes: 16 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ interface ISseMiddlewareOptions {
* @default 5000
*/
keepAliveInterval?: number;
/**
* If you are using expressjs/compression, you MUST set this option to true.
* It will call res.flush() after each SSE messages so the partial content is compressed and reaches the client.
* Read {@link https://github.com/expressjs/compression#server-sent-events} for more.
*
* @default false
*/
flushAfterWrite: boolean;
}
```

Expand Down Expand Up @@ -231,34 +240,16 @@ app.get('/events', sse({ serializer: data => data.toString() }), yourMiddleware)

### Using Compression

If you are using a HTTP compression middleware, like [expressjs/compression](https://github.com/expressjs/compression), _expresse_ won't likely work out of the box.
If you are using a dynamic HTTP compression middleware, like [expressjs/compression](https://github.com/expressjs/compression), _expresse_ won't likely work out of the box.

This is due to the nature of compression and how compression middlewares work. For example, express' compression middleware will patch res.write and hold the content written in it until `res.end()` or an equivalent is called. Then the body compression can happen and the compressed content can be sent.

However `res.write()` must not be buffered with SSEs.
You have two main options:

<details>
<summary>Disable compression on SSE endpoints (preferred)</summary>

There are chances your SSE messages are short and won't benefit from compression. Moreover, compression is not efficient on short messages.

You have various ways to disable compression (conditional middleware, per-route compression, etc), here's an example for expressjs/compression, that allows to filter the routes that will benefit from compression:
Therefore, `res.write()` must not be buffered with SSEs. That's why ExpreSSE offers expressjs/compression support through the `flushAfterWrite` option. It **must** be set when using the compression middleware:

```typescript
app.use(compression({
level: 7,
filter(req, res) {
return !req.originalUrl.includes('/your-sse-path');
}
}));
```
</details>

<details>
<summary>Call res.flush() to send compressed events (expressjs/compression -specific)</summary>
app.use(compression());
If you want to keep compression, that's still possible, expressjs/compression provides a [`res.flush()`](https://expressjs.com/en/resources/middleware/compression.html#resflush) function that will compress and send the (partial) response content immediately (an event), instead of buffering everything that comes until the response end.

After each `res.sse.*()` call, also call `res.flush()`. That's it. Oh and add a comment that links to this paragraph.
</details>
app.get('/events', sse({ flushAfterWrite: true }), (req, res: ISseResponse) => {
res.sse.comment('Welcome! This is a compressed SSE stream.');
});
```
36 changes: 31 additions & 5 deletions src/sse_handler_middleware.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Handler } from 'express';
import { Handler, Response } from 'express';
import * as fmt from './sse_formatter';

export interface ISseMiddlewareOptions {
Expand All @@ -16,10 +16,25 @@ export interface ISseMiddlewareOptions {
* @default 5000
*/
keepAliveInterval: number;

/**
* If you are using expressjs/compression, you MUST set this option to true.
* It will call res.flush() after each SSE messages so the partial content is compressed and reaches the client.
* Read {@link https://github.com/expressjs/compression#server-sent-events} for more.
*
* @default false
*/
flushAfterWrite: boolean;
}

export const sseWrite = Symbol('@toverux/expresse#sseWrite');

export interface ISseHandlerResponse extends Response {
[sseWrite]: (chunk: any) => void;
}

export function sseHandler(options: Partial<ISseMiddlewareOptions> = {}): Handler {
const { keepAliveInterval = 5000 } = options;
const { keepAliveInterval = 5000, flushAfterWrite = false } = options;

return (req, res, next) => {
//=> Basic headers for an SSE session
Expand All @@ -32,16 +47,27 @@ export function sseHandler(options: Partial<ISseMiddlewareOptions> = {}): Handle
//=> Write immediately on the socket.
// This has the advantage to 'test' the connection: if the client can't access this resource because of
// CORS restrictions, the connection will fail instantly.
res.write(': sse-start\n');
write(': sse-start\n');

//=> Regularly send keep-alive SSE comments, clear interval on socket close
const keepAliveTimer = setInterval(() => res.write(': sse-keep-alive\n'), keepAliveInterval);
const keepAliveTimer = setInterval(() => write(': sse-keep-alive\n'), keepAliveInterval);

//=> When the connection gets closed (close=client, finish=server), stop the keep-alive timer
res.once('close', () => clearInterval(keepAliveTimer));
res.once('finish', () => clearInterval(keepAliveTimer));

//=> Done
//=> Attach the res.write wrapper function to the response for internal use
(res as ISseHandlerResponse)[sseWrite] = write;

//=> Done.
next();

/**
* An internal function to write on the response socket with respect to compression settings.
*/
function write(chunk: any) {
res.write(chunk);
flushAfterWrite && (res as any).flush();
}
};
}
10 changes: 6 additions & 4 deletions src/sse_middleware.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { compose } from 'compose-middleware';
import { Handler, NextFunction, Request, Response } from 'express';
import * as fmt from './sse_formatter';
import { ISseMiddlewareOptions, sseHandler } from './sse_handler_middleware';
import { ISseHandlerResponse, ISseMiddlewareOptions, sseHandler, sseWrite } from './sse_handler_middleware';

export interface ISseFunctions {
/**
Expand Down Expand Up @@ -57,16 +57,18 @@ export function sse(options: Partial<ISseMiddlewareOptions> = {}): Handler {
const { serializer } = options;

function middleware(req: Request, res: Response, next: NextFunction): void {
const write = (res as ISseHandlerResponse)[sseWrite];

//=> Install the sse*() functions on Express' Response
(res as ISseResponse).sse = {
data(data: fmt.SseValue, id?: string) {
res.write(fmt.message(null, data, id, serializer));
write(fmt.message(null, data, id, serializer));
},
event(event: string, data: fmt.SseValue, id?: string) {
res.write(fmt.message(event, data, id, serializer));
write(fmt.message(event, data, id, serializer));
},
comment(comment: string) {
res.write(fmt.comment(comment));
write(fmt.comment(comment));
}
};

Expand Down

0 comments on commit 6ba8c46

Please sign in to comment.