From d9f4d56a1d70b266d42cc35661d67d9b523be706 Mon Sep 17 00:00:00 2001 From: melike2d Date: Thu, 12 Aug 2021 08:18:44 -0700 Subject: [PATCH] :sparkles: remove old std websocket --- deps.ts | 1 - mod.ts | 4 +- src/cluster.ts | 5 +- src/clusternode.ts | 2 +- src/{ => connection}/connection.ts | 26 ++- src/connection/socket.ts | 261 +++++++++++++++++++++++++++++ src/node.ts | 5 +- 7 files changed, 281 insertions(+), 23 deletions(-) rename src/{ => connection}/connection.ts (95%) create mode 100644 src/connection/socket.ts diff --git a/deps.ts b/deps.ts index c9720c7..2da1073 100644 --- a/deps.ts +++ b/deps.ts @@ -1,3 +1,2 @@ export * as Lavalink from "https://deno.land/x/lavalink_types@2.0.6/mod.ts"; export * from "https://deno.land/x/event@2.0.0/mod.ts"; -export * from "https://deno.land/std@0.68.0/ws/mod.ts"; diff --git a/mod.ts b/mod.ts index 8625bde..6ff4c41 100644 --- a/mod.ts +++ b/mod.ts @@ -1,11 +1,13 @@ export * from "./src/cluster.ts"; export * from "./src/clusternode.ts"; -export * from "./src/connection.ts"; export * from "./src/player.ts"; export * from "./src/node.ts"; export * from "./src/rest.ts"; export * from "./src/routeplanner.ts"; +export * from "./src/connection/connection.ts"; +export * from "./src/connection/socket.ts"; + export * from "./src/util/functions.ts"; export * from "./src/util/nodestate.ts"; export * from "./src/util/backoff.ts"; diff --git a/src/cluster.ts b/src/cluster.ts index 74c778d..bf11ca4 100644 --- a/src/cluster.ts +++ b/src/cluster.ts @@ -1,13 +1,14 @@ -import { EventEmitter, Lavalink, WebSocketCloseEvent } from "../deps.ts"; +import { EventEmitter, Lavalink } from "../deps.ts"; import { ClusterNode } from "./clusternode.ts"; import constants from "./util/constants.ts"; import { fromSnowflake } from "./util/functions.ts"; import type { SendGatewayPayload, Snowflake } from "./node.ts"; -import type { ConnectionInfo } from "./connection.ts"; +import type { ConnectionInfo } from "./connection/connection.ts"; import type { DiscordVoiceServer, DiscordVoiceState, Player } from "./player.ts"; import type { REST } from "./rest.ts"; +import { WebSocketCloseEvent } from "https://deno.land/std@0.104.0/ws/mod.ts"; export class Cluster extends EventEmitter { readonly nodes: Map; diff --git a/src/clusternode.ts b/src/clusternode.ts index e78fd1d..34eef20 100644 --- a/src/clusternode.ts +++ b/src/clusternode.ts @@ -1,7 +1,7 @@ import { Node } from "./node.ts"; import type { Cluster } from "./cluster.ts"; -import type { ConnectionInfo } from "./connection.ts"; +import type { ConnectionInfo } from "./connection/connection.ts"; export class ClusterNode extends Node { readonly id: string; diff --git a/src/connection.ts b/src/connection/connection.ts similarity index 95% rename from src/connection.ts rename to src/connection/connection.ts index a0327c1..e00621d 100644 --- a/src/connection.ts +++ b/src/connection/connection.ts @@ -1,19 +1,13 @@ -import { NodeState } from "./util/nodestate.ts"; -import { Backoff, BackoffOptions } from "./util/backoff.ts"; -import constants from "./util/constants.ts"; -import { sleep } from "./util/functions.ts"; - -import { - WebSocket, - connectWebSocket, - WebSocketCloseEvent, - Lavalink, - isWebSocketCloseEvent, - isWebSocketPingEvent, - isWebSocketPongEvent, -} from "../deps.ts"; - -import type { Node } from "./node.ts"; +import { NodeState } from "../util/nodestate.ts"; +import { Backoff, BackoffOptions } from "../util/backoff.ts"; +import constants from "../util/constants.ts"; +import { sleep } from "../util/functions.ts"; + +import { Lavalink } from "../../deps.ts"; +import { connectWebSocket } from "./socket.ts" +import { WebSocket, WebSocketCloseEvent, isWebSocketCloseEvent, isWebSocketPingEvent, isWebSocketPongEvent } from "https://deno.land/std@0.104.0/ws/mod.ts" + +import type { Node } from "../node.ts"; export class Connection { readonly node: N; diff --git a/src/connection/socket.ts b/src/connection/socket.ts new file mode 100644 index 0000000..92c585d --- /dev/null +++ b/src/connection/socket.ts @@ -0,0 +1,261 @@ +import { deferred, Deferred } from "https://deno.land/std@0.104.0/async/deferred.ts"; +import { BufReader, BufWriter } from "https://deno.land/std@0.104.0/io/bufio.ts"; +import { OpCode, readFrame, unmask, WebSocketEvent, WebSocketFrame, WebSocketMessage, WebSocketPingEvent, WebSocketPongEvent, writeFrame, WebSocket, handshake } from "https://deno.land/std@0.104.0/ws/mod.ts"; + +export const encoder = new TextEncoder(); + +export function encode(input?: string): Uint8Array { + return encoder.encode(input); +} + +export const decoder = new TextDecoder(); + +export function decode(input?: Uint8Array): string { + return decoder.decode(input); +} + +/* i wear a mask for hours at a time */ +function createMask(): Uint8Array { + return crypto.getRandomValues(new Uint8Array(4)); +} + +/** + * Connect to given websocket endpoint url. + * Endpoint must be acceptable for URL. + */ +export async function connectWebSocket( + endpoint: string, + headers: Headers = new Headers(), +): Promise { + const url = new URL(endpoint); + const { hostname } = url; + + let conn: Deno.Conn; + if (url.protocol === "http:" || url.protocol === "ws:") { + const port = parseInt(url.port || "80"); + conn = await Deno.connect({ hostname, port }); + } else if (url.protocol === "https:" || url.protocol === "wss:") { + const port = parseInt(url.port || "443"); + conn = await Deno.connectTls({ hostname, port }); + } else { + throw new Error("ws: unsupported protocol: " + url.protocol); + } + + const bufWriter = new BufWriter(conn); + const bufReader = new BufReader(conn); + + try { + await handshake(url, headers, bufReader, bufWriter); + } catch (err) { + conn.close(); + throw err; + } + + return new WebSocketImpl({ + conn, + bufWriter, + bufReader, + mask: createMask(), + }); +} + +export class WebSocketImpl implements WebSocket { + readonly conn: Deno.Conn; + + private sendQueue: Array = []; + private _isClosed = false; + + private readonly mask?: Uint8Array; + private readonly bufReader: BufReader; + private readonly bufWriter: BufWriter; + + constructor({ + conn, + bufReader, + bufWriter, + mask, + }: { + conn: Deno.Conn; + bufReader?: BufReader; + bufWriter?: BufWriter; + mask?: Uint8Array; + }) { + this.conn = conn; + this.mask = mask; + this.bufReader = bufReader || new BufReader(conn); + this.bufWriter = bufWriter || new BufWriter(conn); + } + + get isClosed(): boolean { + return this._isClosed; + } + + async *[Symbol.asyncIterator](): AsyncIterableIterator { + let frames: WebSocketFrame[] = []; + let payloadsLength = 0; + while (!this._isClosed) { + let frame: WebSocketFrame; + try { + frame = await readFrame(this.bufReader); + } catch (_e) { + this.ensureSocketClosed(); + break; + } + + unmask(frame.payload, frame.mask); + switch (frame.opcode) { + case OpCode.TextFrame: + case OpCode.BinaryFrame: + case OpCode.Continue: + frames.push(frame); + payloadsLength += frame.payload.length; + if (frame.isLastFrame) { + const concat = new Uint8Array(payloadsLength); + let offs = 0; + for (const frame of frames) { + concat.set(frame.payload, offs); + offs += frame.payload.length; + } + if (frames[0].opcode === OpCode.TextFrame) { + // text + yield decode(concat); + } else { + // binary + yield concat; + } + frames = []; + payloadsLength = 0; + } + break; + case OpCode.Close: { + // [0x12, 0x34] -> 0x1234 + const code = (frame.payload[0] << 8) | frame.payload[1]; + const reason = decode( + frame.payload.subarray(2, frame.payload.length), + ); + await this.close(code, reason); + yield { code, reason }; + return; + } + case OpCode.Ping: + await this.enqueue({ + opcode: OpCode.Pong, + payload: frame.payload, + isLastFrame: true, + }); + yield ["ping", frame.payload] as WebSocketPingEvent; + break; + case OpCode.Pong: + yield ["pong", frame.payload] as WebSocketPongEvent; + break; + default: + } + } + } + + send(data: WebSocketMessage): Promise { + const opcode = typeof data === "string" + ? OpCode.TextFrame + : OpCode.BinaryFrame; + const payload = typeof data === "string" ? encode(data) : data; + const isLastFrame = true; + const frame = { + isLastFrame, + opcode, + payload, + mask: this.mask, + }; + return this.enqueue(frame); + } + + ping(data: WebSocketMessage = ""): Promise { + const payload = typeof data === "string" ? encode(data) : data; + const frame = { + isLastFrame: true, + opcode: OpCode.Ping, + mask: this.mask, + payload, + }; + return this.enqueue(frame); + } + + async close(code = 1000, reason?: string): Promise { + try { + const header = [code >>> 8, code & 0x00ff]; + + let payload: Uint8Array; + if (reason) { + const reasonBytes = encode(reason); + payload = new Uint8Array(2 + reasonBytes.byteLength); + payload.set(header); + payload.set(reasonBytes, 2); + } else { + payload = new Uint8Array(header); + } + + await this.enqueue({ + isLastFrame: true, + opcode: OpCode.Close, + mask: this.mask, + payload, + }); + } catch (e) { + throw e; + } finally { + this.ensureSocketClosed(); + } + } + + closeForce(): void { + this.ensureSocketClosed(); + } + + private dequeue(): void { + const [entry] = this.sendQueue; + if (!entry) return; + if (this._isClosed) return; + + const { d, frame } = entry; + writeFrame(frame, this.bufWriter) + .then(() => d.resolve()) + .catch((e) => d.reject(e)) + .finally(() => { + this.sendQueue.shift(); + this.dequeue(); + }); + } + + private enqueue(frame: WebSocketFrame): Promise { + if (this._isClosed) { + throw new Deno.errors.ConnectionReset("Socket has already been closed"); + } + const d = deferred(); + this.sendQueue.push({ d, frame }); + if (this.sendQueue.length === 1) { + this.dequeue(); + } + return d; + } + + private ensureSocketClosed(): void { + if (this.isClosed) { + return; + } + + try { + this.conn.close(); + } catch (e) { + console.error(e); + } finally { + this._isClosed = true; + const rest = this.sendQueue; + this.sendQueue = []; + rest.forEach(e => e.d.reject(new Deno.errors.ConnectionReset("Socket has already been closed"))); + } + } +} + +export interface Queued { + frame: WebSocketFrame; + d: Deferred; +} diff --git a/src/node.ts b/src/node.ts index 5d49fbd..00d453e 100644 --- a/src/node.ts +++ b/src/node.ts @@ -1,7 +1,8 @@ // deno-lint-ignore-file camelcase -import { EventEmitter, Lavalink, WebSocketCloseEvent } from "../deps.ts"; -import { Connection, ConnectionInfo } from "./connection.ts"; +import { WebSocketCloseEvent } from "https://deno.land/std@0.104.0/ws/mod.ts"; +import { EventEmitter, Lavalink } from "../deps.ts"; +import { Connection, ConnectionInfo } from "./connection/connection.ts"; import { DiscordVoiceServer, DiscordVoiceState, Player } from "./player.ts"; import { REST } from "./rest.ts";