Skip to content

Commit 123b68c

Browse files
feat: add support for WebTransport
Reference: https://developer.mozilla.org/en-US/docs/Web/API/WebTransport
1 parent 3144d27 commit 123b68c

File tree

11 files changed

+1853
-35
lines changed

11 files changed

+1853
-35
lines changed

.github/workflows/ci.yml

-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ jobs:
1717
strategy:
1818
matrix:
1919
node-version:
20-
- 10
2120
- 18
2221

2322
steps:

lib/server.ts

+105-3
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ import type {
1515
import type { CookieSerializeOptions } from "cookie";
1616
import type { CorsOptions, CorsOptionsDelegate } from "cors";
1717
import type { Duplex } from "stream";
18+
import { WebTransport } from "./transports/webtransport";
1819

1920
const debug = debugModule("engine");
2021

2122
const kResponseHeaders = Symbol("responseHeaders");
23+
const TEXT_DECODER = new TextDecoder();
2224

2325
type Transport = "polling" | "websocket";
2426

@@ -78,7 +80,13 @@ export interface ServerOptions {
7880
fn: (err: string | null | undefined, success: boolean) => void
7981
) => void;
8082
/**
81-
* the low-level transports that are enabled
83+
* The low-level transports that are enabled. WebTransport is disabled by default and must be manually enabled:
84+
*
85+
* @example
86+
* new Server({
87+
* transports: ["polling", "websocket", "webtransport"]
88+
* });
89+
*
8290
* @default ["polling", "websocket"]
8391
*/
8492
transports?: Transport[];
@@ -140,6 +148,17 @@ type Middleware = (
140148
next: (err?: any) => void
141149
) => void;
142150

151+
function parseSessionId(handshake: string) {
152+
if (handshake.startsWith("0{")) {
153+
try {
154+
const parsed = JSON.parse(handshake.substring(1));
155+
if (typeof parsed.sid === "string") {
156+
return parsed.sid;
157+
}
158+
} catch (e) {}
159+
}
160+
}
161+
143162
export abstract class BaseServer extends EventEmitter {
144163
public opts: ServerOptions;
145164

@@ -166,7 +185,7 @@ export abstract class BaseServer extends EventEmitter {
166185
pingInterval: 25000,
167186
upgradeTimeout: 10000,
168187
maxHttpBufferSize: 1e6,
169-
transports: Object.keys(transports),
188+
transports: ["polling", "websocket"], // WebTransport is disabled by default
170189
allowUpgrades: true,
171190
httpCompression: {
172191
threshold: 1024,
@@ -245,7 +264,11 @@ export abstract class BaseServer extends EventEmitter {
245264
protected verify(req, upgrade, fn) {
246265
// transport check
247266
const transport = req._query.transport;
248-
if (!~this.opts.transports.indexOf(transport)) {
267+
// WebTransport does not go through the verify() method, see the onWebTransportSession() method
268+
if (
269+
!~this.opts.transports.indexOf(transport) ||
270+
transport === "webtransport"
271+
) {
249272
debug('unknown transport "%s"', transport);
250273
return fn(Server.errors.UNKNOWN_TRANSPORT, { transport });
251274
}
@@ -495,6 +518,85 @@ export abstract class BaseServer extends EventEmitter {
495518
return transport;
496519
}
497520

521+
public async onWebTransportSession(session: any) {
522+
const timeout = setTimeout(() => {
523+
debug(
524+
"the client failed to establish a bidirectional stream in the given period"
525+
);
526+
session.close();
527+
}, this.opts.upgradeTimeout);
528+
529+
const streamReader = session.incomingBidirectionalStreams.getReader();
530+
const result = await streamReader.read();
531+
532+
if (result.done) {
533+
debug("session is closed");
534+
return;
535+
}
536+
537+
const stream = result.value;
538+
const reader = stream.readable.getReader();
539+
540+
// reading the first packet of the stream
541+
const { value, done } = await reader.read();
542+
if (done) {
543+
debug("stream is closed");
544+
return;
545+
}
546+
547+
clearTimeout(timeout);
548+
const handshake = TEXT_DECODER.decode(value);
549+
550+
// handshake is either
551+
// "0" => new session
552+
// '0{"sid":"xxxx"}' => upgrade
553+
if (handshake === "0") {
554+
const transport = new WebTransport(session, stream, reader);
555+
556+
// note: we cannot use "this.generateId()", because there is no "req" argument
557+
const id = base64id.generateId();
558+
debug('handshaking client "%s" (WebTransport)', id);
559+
560+
const socket = new Socket(id, this, transport, null, 4);
561+
562+
this.clients[id] = socket;
563+
this.clientsCount++;
564+
565+
socket.once("close", () => {
566+
delete this.clients[id];
567+
this.clientsCount--;
568+
});
569+
570+
this.emit("connection", socket);
571+
return;
572+
}
573+
574+
const sid = parseSessionId(handshake);
575+
576+
if (!sid) {
577+
debug("invalid WebTransport handshake");
578+
return session.close();
579+
}
580+
581+
const client = this.clients[sid];
582+
583+
if (!client) {
584+
debug("upgrade attempt for closed client");
585+
session.close();
586+
} else if (client.upgrading) {
587+
debug("transport has already been trying to upgrade");
588+
session.close();
589+
} else if (client.upgraded) {
590+
debug("transport had already been upgraded");
591+
session.close();
592+
} else {
593+
debug("upgrading existing transport");
594+
595+
const transport = new WebTransport(session, stream, reader);
596+
client.maybeUpgrade(transport);
597+
}
598+
}
599+
498600
protected abstract createTransport(transportName, req);
499601

500602
/**

lib/socket.ts

+8-3
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,15 @@ export class Socket extends EventEmitter {
7070
this.protocol = protocol;
7171

7272
// Cache IP since it might not be in the req later
73-
if (req.websocket && req.websocket._socket) {
74-
this.remoteAddress = req.websocket._socket.remoteAddress;
73+
if (req) {
74+
if (req.websocket && req.websocket._socket) {
75+
this.remoteAddress = req.websocket._socket.remoteAddress;
76+
} else {
77+
this.remoteAddress = req.connection.remoteAddress;
78+
}
7579
} else {
76-
this.remoteAddress = req.connection.remoteAddress;
80+
// TODO there is currently no way to get the IP address of the client when it connects with WebTransport
81+
// see https://github.com/fails-components/webtransport/issues/114
7782
}
7883

7984
this.checkIntervalTimer = null;

lib/transport.ts

+18
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,26 @@ export abstract class Transport extends EventEmitter {
136136
this.emit("close");
137137
}
138138

139+
/**
140+
* Advertise framing support.
141+
*/
139142
abstract get supportsFraming();
143+
144+
/**
145+
* The name of the transport.
146+
*/
140147
abstract get name();
148+
149+
/**
150+
* Sends an array of packets.
151+
*
152+
* @param {Array} packets
153+
* @package
154+
*/
141155
abstract send(packets);
156+
157+
/**
158+
* Closes the transport.
159+
*/
142160
abstract doClose(fn?);
143161
}

lib/transports/index.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import { Polling as XHR } from "./polling";
22
import { JSONP } from "./polling-jsonp";
33
import { WebSocket } from "./websocket";
4+
import { WebTransport } from "./webtransport";
45

56
export default {
67
polling: polling,
78
websocket: WebSocket,
9+
webtransport: WebTransport,
810
};
911

1012
/**
@@ -21,4 +23,4 @@ function polling(req) {
2123
}
2224
}
2325

24-
polling.upgradesTo = ["websocket"];
26+
polling.upgradesTo = ["websocket", "webtransport"];

lib/transports/webtransport.ts

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { Transport } from "../transport";
2+
import debugModule from "debug";
3+
4+
const debug = debugModule("engine:webtransport");
5+
6+
const BINARY_HEADER = Buffer.of(54);
7+
8+
function shouldIncludeBinaryHeader(packet, encoded) {
9+
// 48 === "0".charCodeAt(0) (OPEN packet type)
10+
// 54 === "6".charCodeAt(0) (NOOP packet type)
11+
return (
12+
packet.type === "message" &&
13+
typeof packet.data !== "string" &&
14+
encoded[0] >= 48 &&
15+
encoded[0] <= 54
16+
);
17+
}
18+
19+
/**
20+
* Reference: https://developer.mozilla.org/en-US/docs/Web/API/WebTransport_API
21+
*/
22+
export class WebTransport extends Transport {
23+
private readonly writer;
24+
25+
constructor(private readonly session, stream, reader) {
26+
super({ _query: { EIO: "4" } });
27+
this.writer = stream.writable.getWriter();
28+
(async () => {
29+
let binaryFlag = false;
30+
while (true) {
31+
const { value, done } = await reader.read();
32+
if (done) {
33+
debug("session is closed");
34+
break;
35+
}
36+
debug("received chunk: %o", value);
37+
if (!binaryFlag && value.byteLength === 1 && value[0] === 54) {
38+
binaryFlag = true;
39+
continue;
40+
}
41+
this.onPacket(
42+
this.parser.decodePacketFromBinary(value, binaryFlag, "nodebuffer")
43+
);
44+
binaryFlag = false;
45+
}
46+
})();
47+
48+
session.closed.then(() => this.onClose());
49+
50+
this.writable = true;
51+
}
52+
53+
get name() {
54+
return "webtransport";
55+
}
56+
57+
get supportsFraming() {
58+
return true;
59+
}
60+
61+
send(packets) {
62+
this.writable = false;
63+
64+
for (let i = 0; i < packets.length; i++) {
65+
const packet = packets[i];
66+
const isLast = i + 1 === packets.length;
67+
68+
this.parser.encodePacketToBinary(packet, (data) => {
69+
if (shouldIncludeBinaryHeader(packet, data)) {
70+
debug("writing binary header");
71+
this.writer.write(BINARY_HEADER);
72+
}
73+
debug("writing chunk: %o", data);
74+
this.writer.write(data);
75+
if (isLast) {
76+
this.writable = true;
77+
this.emit("drain");
78+
}
79+
});
80+
}
81+
}
82+
83+
doClose(fn) {
84+
debug("closing WebTransport session");
85+
this.session.close();
86+
fn && fn();
87+
}
88+
}

0 commit comments

Comments
 (0)