Skip to content

Commit

Permalink
feat: add WS service
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Smart committed Dec 2, 2022
1 parent 1415b02 commit 85a7e8f
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 3 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
"@effect/io": "^0.0.28",
"@fp-ts/data": "^0.0.19",
"callbag-effect-ts": "1.0.0-alpha.8",
"isomorphic-fetch": "^3.0.0"
"isomorphic-fetch": "^3.0.0",
"ws": "^8.11.0"
},
"pnpm": {
"overrides": {
Expand Down
15 changes: 15 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions src/DiscordGateway/DiscordWS/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { RawData } from "ws"

export type Message = Discord.GatewayPayload | WS.Reconnect

export interface OpenOpts {
url?: string
version?: number
encoding?: Encoding
}

export interface Encoding {
type: "json" | "etf"
encode: (p: Discord.GatewayPayload) => string | Buffer | ArrayBuffer
decode: (p: RawData) => Discord.GatewayPayload
}

export const jsonEncoding: Encoding = {
type: "json",
encode: (p) => JSON.stringify(p),
decode: (p) => JSON.parse(p.toString("utf8")),
}

export const make = ({
url = "wss://gateway.discord.gg/",
version = 9,
encoding = jsonEncoding,
}: OpenOpts) => {
const ws = WS.make(`${url}?v=${version}&encoding=${encoding.type}`)

const source = ws.source
.tapError((e) => Log.info("DiscordWS", "ERROR", e))
.retry(Schedule.exponential(Duration.seconds(0.5)))
.map(encoding.decode)

const sink = ws.sink.map((msg: Message) =>
msg === WS.Reconnect ? msg : encoding.encode(msg),
)

return { source, sink }
}
Empty file.
86 changes: 86 additions & 0 deletions src/DiscordGateway/WS/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { asyncSink, unwrap, unwrapScope } from "callbag-effect-ts/Source"
import { ClientOptions, RawData, WebSocket } from "ws"

export const Reconnect = Symbol()
export type Reconnect = typeof Reconnect
export type Message = string | Buffer | ArrayBuffer | Reconnect

const socket = (url: string, options?: ClientOptions) =>
Effect.sync(() => new WebSocket(url, options)).acquireRelease((ws) =>
Effect.sync(() => {
ws.close()
ws.removeAllListeners()
}),
)

export class WebSocketError {
readonly _tag = "WebSocketError"
constructor(readonly reason: unknown) {}
}

export class WebSocketCloseError {
readonly _tag = "WebSocketCloseError"
constructor(readonly code: number, readonly reason: string) {}
}

const recv = (ws: WebSocket) =>
EffectSource.async<WebSocketError | WebSocketCloseError, RawData>((emit) => {
ws.on("message", (message) => emit.data(message))

ws.on("error", (cause) => {
emit.fail(new WebSocketError(cause))
})

ws.on("close", (code, reason) =>
emit.fail(new WebSocketCloseError(code, reason.toString("utf8"))),
)
})

export class WebSocketWriteError {
readonly _tag = "WebSocketWriteError"
constructor(readonly reason: Error) {}
}

const send = (ws: WebSocket, out: EffectSource<never, never, Message>) =>
pipe(
Effect.async<never, never, void>((resume) => {
if (ws.readyState & ws.OPEN) {
resume(Effect.unit())
} else {
ws.once("open", () => {
resume(Effect.unit())
})
}
}).map(() => out),
unwrap,
)
.tap((p) => Log.debug("WS", "send", p))
.tap((data) =>
Effect.async<never, WebSocketWriteError, void>((resume) => {
if (data === Reconnect) {
ws.close(1012, "reconnecting")
resume(Effect.unit())
} else {
ws.send(data, (err) => {
resume(
err ? Effect.fail(new WebSocketWriteError(err!)) : Effect.unit(),
)
})
}
}),
).drain

export const make = (url: string, options?: ClientOptions) => {
const [sink, outbound] = asyncSink<never, Message>()

const source = pipe(
socket(url, options).map((ws) => recv(ws).merge(send(ws, outbound))),
unwrapScope,
).retry(
Schedule.recurWhile(
(e) => e._tag === "WebSocketCloseError" && e.code === 1012,
),
)

return { source, sink }
}
4 changes: 2 additions & 2 deletions src/DiscordREST/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { BucketDetails } from "dfx/RateLimitStore/index"
import { millis } from "@fp-ts/data/Duration"
import { ResponseWithData, RestResponse } from "./types.js"
import { BucketDetails } from "dfx/RateLimitStore/index"
import { ResponseWithData } from "./types.js"
import { rateLimitFromHeaders, routeFromConfig } from "./utils.js"

const make = Do(($) => {
Expand Down
2 changes: 2 additions & 0 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ export type { Option as Maybe } from "@fp-ts/data/Option"
export type { Either } from "@fp-ts/data/Either"
export type { Exit } from "@effect/io/Exit"
export type { EffectSource } from "callbag-effect-ts/Source"
export type { EffectSink } from "callbag-effect-ts/Sink"

export * as Config from "dfx/DiscordConfig/index"
export * as Discord from "dfx/types"
export * as Http from "dfx/Http/index"
export * as Log from "dfx/Log/index"
export * as RateLimitStore from "dfx/RateLimitStore/index"
export * as Rest from "dfx/DiscordREST/index"
export * as WS from "dfx/DiscordGateway/WS/index"
3 changes: 3 additions & 0 deletions src/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import type {
Chunk,
Duration,
Exit,
Schedule,
EffectSource,
EffectSink,
Config,
Discord,
Http,
Log,
RateLimitStore,
Rest,
WS,
} from "dfx/common"

/**
Expand Down

0 comments on commit 85a7e8f

Please sign in to comment.