Skip to content

Commit

Permalink
feat: DiscordWSService
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Smart committed Mar 14, 2022
1 parent c16402c commit 208203b
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 42 deletions.
78 changes: 78 additions & 0 deletions DiscordWS/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import * as T from "@effect-ts/core/Effect"
import * as S from "@effect-ts/core/Effect/Experimental/Stream"
import * as SC from "@effect-ts/core/Effect/Schedule"
import { pipe } from "@effect-ts/core/Function"
import { tag } from "@effect-ts/core/Has"
import { _A } from "@effect-ts/core/Utils"
import { RawData } from "ws"
import { log } from "../Log"
import { GatewayPayload } from "../types"
import * as WS from "../WS"

export type Message = GatewayPayload | WS.Reconnect

export interface OpenOpts {
url?: string
version?: number
encoding?: Encoding
outgoing: WS.OutboundStream<Message>
}

export interface Encoding {
type: "json" | "etf"
encode: (p: GatewayPayload) => string | Buffer | ArrayBuffer
decode: (p: RawData) => GatewayPayload
}

export const jsonEncoding: Encoding = {
type: "json",
encode: (p) => JSON.stringify(p),
decode: (p) => JSON.parse(p.toString("utf8")),
}

const makeOutgoing = (s: S.UIO<Message>, e: Encoding): WS.OutboundStream =>
pipe(
s,
S.map((data) => {
if (data === WS.Reconnect) {
return data
}

return e.encode(data)
})
)

const openImpl = ({
url = "wss://gateway.discord.gg/",
version = 9,
encoding = jsonEncoding,
outgoing,
}: OpenOpts) =>
pipe(
WS.open(
`${url}?v=${version}&encoding=${encoding.type}`,
makeOutgoing(outgoing, encoding)
),
S.unwrap,
S.onError((e) =>
e._tag === "Fail" ? log(serviceTag, "error", e.value) : T.unit
),
S.retry(SC.exponential(10)),
S.map(encoding.decode)
)

export type Connection = ReturnType<typeof openImpl>

// Service definition
const serviceTag = "DiscordWSService" as const
const makeDiscordWS = T.succeed({
_tag: serviceTag,
open: openImpl,
} as const)
export interface DiscordWS extends _A<typeof makeDiscordWS> {}
export const DiscordWS = tag<DiscordWS>()
export const LiveDiscordWS = T.toLayer(DiscordWS)(makeDiscordWS)

// Helpers
export const open = (opts: OpenOpts) =>
T.accessService(DiscordWS)(({ open }) => open(opts))
18 changes: 18 additions & 0 deletions Log/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import * as T from "@effect-ts/core/Effect"
import { tag } from "@effect-ts/core/Has"
import { _A } from "@effect-ts/core/Utils"

const service = {
_tag: "LogService",
log: (...args: any[]) =>
T.succeedWith(() => {
console.error(...args)
}),
} as const

export type Log = typeof service
export const Log = tag<Log>()
export const LiveLog = T.toLayer(Log)(T.succeed(service))

export const log = (...args: any[]) =>
T.accessServiceM(Log)(({ log }) => log(...args))
45 changes: 18 additions & 27 deletions WS/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as T from "@effect-ts/core/Effect"
import * as S from "@effect-ts/core/Effect/Experimental/Stream"
import * as M from "@effect-ts/core/Effect/Managed"
import * as Q from "@effect-ts/core/Effect/Queue"
import * as SC from "@effect-ts/core/Effect/Schedule"
import { pipe } from "@effect-ts/core/Function"
import { tag } from "@effect-ts/core/Has"
Expand All @@ -14,17 +13,14 @@ export type WsError =
| { _tag: "error"; cause: unknown }
| { _tag: "write"; cause: unknown }

type WebSocketStream = S.Stream<HasClock, WsError, Ws.RawData>
export type WebSocketStream<T = Ws.RawData> = S.Stream<HasClock, WsError, T>

export const Reconnect = Symbol()
export type Message = Ws.RawData | typeof Reconnect
export type Reconnect = typeof Reconnect
export type Message = string | Buffer | ArrayBuffer | Reconnect
export type OutboundStream<T = Message> = S.UIO<T>

export interface WebSocketConnection {
read: WebSocketStream
write: Q.Queue<Message>
}

const open = (url: string, options?: Ws.ClientOptions) =>
const openSocket = (url: string, options?: Ws.ClientOptions) =>
pipe(
T.succeedWith(() => new Ws.WebSocket(url, options)),
M.makeExit((ws) =>
Expand Down Expand Up @@ -53,7 +49,7 @@ const recv = (ws: Ws.WebSocket): WebSocketStream =>
)
})

const send = (out: Q.Queue<Message>) => (ws: Ws.WebSocket) =>
const send = (out: OutboundStream) => (ws: Ws.WebSocket) =>
pipe(
T.effectAsync<unknown, WsError, void>((cb) => {
if (ws.readyState & ws.OPEN) {
Expand All @@ -64,7 +60,7 @@ const send = (out: Q.Queue<Message>) => (ws: Ws.WebSocket) =>
})
}
}),
T.map(() => S.fromQueue()(out)),
T.map(() => out),
S.unwrap,
S.tap((data) =>
T.effectAsync<unknown, WsError, void>((cb) => {
Expand All @@ -85,33 +81,21 @@ const send = (out: Q.Queue<Message>) => (ws: Ws.WebSocket) =>
S.drain
)

const duplex = (out: Q.Queue<Message>) => (ws: Ws.WebSocket) =>
const duplex = (out: OutboundStream) => (ws: Ws.WebSocket) =>
pipe(recv(ws), S.mergeTerminateLeft(send(out)(ws)))

const openDuplexWithQueue = (
const openDuplex = (
url: string,
out: Q.Queue<Message>,
out: OutboundStream,
options?: Ws.ClientOptions
): WebSocketStream =>
pipe(
open(url, options),
openSocket(url, options),
M.map(duplex(out)),
S.unwrapManaged,
S.retry(SC.recurWhile((e) => e._tag === "close" && e.code === 1012))
)

const openDuplex = (
url: string,
options?: Ws.ClientOptions
): T.UIO<WebSocketConnection> =>
pipe(
Q.makeUnbounded<Message>(),
T.map((write) => ({
read: openDuplexWithQueue(url, write, options),
write,
}))
)

const makeWS = T.succeed({
_tag: "WSService",
open: openDuplex,
Expand All @@ -120,3 +104,10 @@ const makeWS = T.succeed({
export interface WS extends _A<typeof makeWS> {}
export const WS = tag<WS>()
export const LiveWS = T.toLayer(WS)(makeWS)

// Helpers
export const open = (
url: string,
out: OutboundStream,
options?: Ws.ClientOptions
) => T.accessService(WS)(({ open }) => open(url, out, options))
26 changes: 12 additions & 14 deletions mod.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
import { Effect as T, pipe } from "@effect-ts/core"
import * as S from "@effect-ts/core/Effect/Experimental/Stream"
import { exponential } from "@effect-ts/core/Effect/Schedule"
import * as Q from "@effect-ts/core/Effect/Queue"
import * as R from "@effect-ts/node/Runtime"
import * as DiscordWS from "./DiscordWS"
import { LiveLog, log } from "./Log"
import * as WS from "./WS"

pipe(
T.accessServiceM(WS.WS)(({ open }) =>
open("wss://gateway.discord.gg/?v=9&encoding=json")
),
T.chain(({ read, write }) =>
pipe(
read,
S.retry(exponential(10)),
S.forEach((data) =>
T.succeedWith(() => {
console.error(data.toString("utf8"))
})
)
)
Q.makeUnbounded<DiscordWS.Message>(),
T.map(S.fromQueue()),
T.chain((outgoing) =>
T.accessService(DiscordWS.DiscordWS)(({ open }) => open({ outgoing }))
),
T.chain(S.forEach((payload) => log(payload))),

T.provideSomeLayer(LiveLog),
T.provideSomeLayer(WS.LiveWS),
T.provideSomeLayer(DiscordWS.LiveDiscordWS),

R.runMain
)
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"scripts": {
"prepare": "ttsc",
"types": "discord-api-codegen ./discord-api-docs -l typescript -o 'imports=Response|./DiscordREST/types' 'endpointReturnType=Response' > types.ts && prettier -w types.ts",
"clean": "git clean -fxd -e node_modules/ -e .env"
"clean": "git clean -fxd -e node_modules/ -e .env",
"dev": "ts-node -C ttypescript mod.ts"
},
"files": [
"**/*.ts",
Expand All @@ -21,6 +22,7 @@
"@tim-smart/discord-api-docs-parser": "^0.3.0",
"@types/ws": "^8.5.3",
"prettier": "^2.5.1",
"ts-node": "^10.7.0",
"ttypescript": "^1.5.13",
"typescript": "^4.6.2"
},
Expand Down
Loading

0 comments on commit 208203b

Please sign in to comment.