-
Notifications
You must be signed in to change notification settings - Fork 175
/
index.ts
121 lines (113 loc) · 3.47 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import type { Server } from 'node:http'
import type { Http2SecureServer, Http2Server } from 'node:http2'
import type { Hono } from 'hono'
import type { UpgradeWebSocket, WSContext } from 'hono/ws'
import type { WebSocket } from 'ws'
import { WebSocketServer } from 'ws'
import type { IncomingMessage } from 'http'
import { CloseEvent } from './events'
export interface NodeWebSocket {
upgradeWebSocket: UpgradeWebSocket
injectWebSocket(server: Server | Http2Server | Http2SecureServer): void
}
export interface NodeWebSocketInit {
app: Hono
baseUrl?: string | URL
}
/**
* Create WebSockets for Node.js
* @param init Options
* @returns NodeWebSocket
*/
export const createNodeWebSocket = (init: NodeWebSocketInit): NodeWebSocket => {
const wss = new WebSocketServer({ noServer: true })
const waiter = new Map<IncomingMessage, (ws: WebSocket) => void>()
wss.on('connection', (ws, request) => {
const waiterFn = waiter.get(request)
if (waiterFn) {
waiterFn(ws)
waiter.delete(request)
}
})
const nodeUpgradeWebSocket = (request: IncomingMessage) => {
return new Promise<WebSocket>((resolve) => {
waiter.set(request, resolve)
})
}
return {
injectWebSocket(server) {
server.on('upgrade', async (request, socket, head) => {
const url = new URL(request.url ?? '/', init.baseUrl ?? 'http://localhost')
const headers = new Headers()
for (const key in request.headers) {
const value = request.headers[key]
if (!value) {
continue
}
headers.append(key, Array.isArray(value) ? value[0] : value)
}
await init.app.request(
url,
{ headers: headers },
{ incoming: request, outgoing: undefined }
)
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request)
})
})
},
upgradeWebSocket: (createEvents) =>
async function upgradeWebSocket(c, next) {
if (c.req.header('upgrade') !== 'websocket') {
// Not websocket
await next()
return
}
;(async () => {
const events = await createEvents(c)
const ws = await nodeUpgradeWebSocket(c.env.incoming)
const ctx: WSContext = {
binaryType: 'arraybuffer',
close(code, reason) {
ws.close(code, reason)
},
protocol: ws.protocol,
raw: ws,
get readyState() {
return ws.readyState
},
send(source, opts) {
ws.send(source, {
compress: opts?.compress,
})
},
url: new URL(c.req.url),
}
events.onOpen?.(new Event('open'), ctx)
ws.on('message', (data, isBinary) => {
const datas = Array.isArray(data) ? data : [data]
for (const data of datas) {
events.onMessage?.(
new MessageEvent('message', {
data: isBinary ? data : data.toString('utf-8'),
}),
ctx
)
}
})
ws.on('close', () => {
events.onClose?.(new CloseEvent('close'), ctx)
})
ws.on('error', (error) => {
events.onError?.(
new ErrorEvent('error', {
error: error,
}),
ctx
)
})
})()
return new Response()
},
}
}