Skip to content
This repository has been archived by the owner on Sep 14, 2023. It is now read-only.

Commit

Permalink
feat: add listener container to proxyProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
kratico committed Jan 4, 2023
1 parent 0a3b239 commit 044b8a5
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 114 deletions.
51 changes: 51 additions & 0 deletions rpc/provider/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,54 @@ export function nextIdFactory() {
let i = 0
return () => i++
}

export class ListenersContainer<
DiscoveryValue,
SendErrorData,
HandlerErrorData,
> {
#listeners = new Map<
DiscoveryValue,
Map<
ProviderListener<SendErrorData, HandlerErrorData>,
ProviderListener<SendErrorData, HandlerErrorData>
>
>()

set(discoveryValue: DiscoveryValue, listener: ProviderListener<SendErrorData, HandlerErrorData>) {
let map = this.#listeners.get(discoveryValue)
if (!map) {
map = new Map()
this.#listeners.set(discoveryValue, map)
}
if (map.has(listener)) return
map.set(
listener,
listener.bind({
stop: () => map!.delete(listener),
}),
)
}

delete(
discoveryValue: DiscoveryValue,
listener: ProviderListener<SendErrorData, HandlerErrorData>,
) {
this.#listeners.get(discoveryValue)?.delete(listener)
}

count(discoveryValue: DiscoveryValue) {
return this.#listeners.get(discoveryValue)?.size ?? 0
}

forEachListener(
discoveryValue: DiscoveryValue,
message: Parameters<ProviderListener<SendErrorData, HandlerErrorData>>[0],
) {
const map = this.#listeners.get(discoveryValue)
if (!map) return
for (const listener of map.values()) {
listener(message)
}
}
}
7 changes: 5 additions & 2 deletions rpc/provider/proxy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ import { setup } from "./test_util.ts"

Deno.test({
name: "Proxy Provider",
sanitizeResources: false,
sanitizeOps: false,
async fn(t) {
await t.step({
name: "send/listen",
// TODO: await T.polkadot initializes a 2nd proxyProvider
sanitizeResources: false,
sanitizeOps: false,
async fn() {
const [ref, message] = await setup(proxyProvider, await T.polkadot.url, "system_health", [])
A.assertNotInstanceOf(message, Error)
Expand All @@ -18,7 +23,6 @@ Deno.test({

await t.step({
name: "create WebSocket error",
ignore: true,
async fn() {
const [ref, message] = await setup(
proxyProvider,
Expand All @@ -33,7 +37,6 @@ Deno.test({

await t.step({
name: "close WebSocket while listening",
ignore: true,
async fn() {
const server = createWebSocketServer(function() {
this.close()
Expand Down
184 changes: 72 additions & 112 deletions rpc/provider/proxy.ts
Original file line number Diff line number Diff line change
@@ -1,147 +1,107 @@
import { retry } from "../../deps/std/async.ts"
import { nextIdFactory, Provider, ProviderListener } from "./base.ts"
import * as msg from "../messages.ts"
import { ListenersContainer, nextIdFactory, Provider, ProviderListener } from "./base.ts"
import { ProviderCloseError, ProviderHandlerError, ProviderSendError } from "./errors.ts"

/** Global lookup of existing connections */
// const connections = new Map<string, ProxyProviderConnection>()
// type ProxyProviderConnection = ProviderConnection<WebSocket, Event, Event>

const nextId = nextIdFactory()

const listeners = new Map<
string,
Map<ProviderListener<Event, Event>, ProviderListener<Event, Event>>
>()

function setListener(url: string, listener: ProviderListener<Event, Event>) {
if (!listeners.has(url)) {
listeners.set(url, new Map())
}
const map = listeners.get(url)!
if (map.has(listener)) return
map.set(
listener,
listener.bind({
stop: () => deleteListener(url, listener),
}),
)
}

function deleteListener(url: string, listener: ProviderListener<Event, Event>) {
if (!listeners.has(url)) return
const map = listeners.get(url)!
map.delete(listener)
}

function callListener(url: string, message: Parameters<ProviderListener<Event, Event>>[0]) {
if (!listeners.has(url)) return
for (const listener of listeners.get(url)!.values()) {
listener(message)
}
}
const listenersContainer = new ListenersContainer<string, Event, Event>()
const activeWs = new Map<string, WebSocket>()
const connectingWs = new Map<string, Promise<WebSocket>>()
const CUSTOM_WS_CLOSE_CODE = 4000

export const proxyProvider: Provider<string, Event, Event, Event> = (url, listener) => {
setListener(url, listener)
let ws: WebSocket
listenersContainer.set(url, listener)
let ws: WebSocket | undefined
return {
nextId,
send: (message) => {
;(async () => {
try {
ws = await openWsWithRetry(url, listener)
ws = await openedWs(url, (e) => listenersContainer.forEachListener(url, e))
} catch (error) {
return callListener(url, new ProviderHandlerError(error as Event))
return listener(new ProviderHandlerError(error as Event))
}
try {
ws.send(JSON.stringify(message))
} catch (error) {
callListener(url, new ProviderSendError(error as Event, message))
listener(new ProviderSendError(error as Event, message))
}
})()
},
release: () => {
deleteListener(url, listener)
if (ws) {
listenersContainer.delete(url, listener)
if (!listenersContainer.count(url) && ws) {
return closeWs(ws)
}
return Promise.resolve(undefined)
},
}
}

const activeWs = new Map<string, WebSocket>()
const connectingWs = new Map<string, Promise<WebSocket>>()

function openWsWithRetry(
function openedWs(
url: string,
listener: ProviderListener<Event, Event>,
) {
return retry(() => openWs(url, listener), { maxAttempts: 10 })
}

function openWs(
url: string,
listener: ProviderListener<Event, Event>,
) {
if (activeWs.has(url)) {
return Promise.resolve(activeWs.get(url)!)
}
if (connectingWs.has(url)) {
return connectingWs.get(url)!
}

const openedWs = new Promise<WebSocket>((resolve, reject) => {
const connectingWsController = new AbortController()
const ws = new WebSocket(url)
ws.addEventListener(
"open",
() => {
connectingWsController.abort()
connectingWs.delete(url)
activeWs.set(url, ws)

const activeWsController = new AbortController()
ws.addEventListener(
"message",
(e) => listener(JSON.parse(e.data)),
activeWsController,
)
ws.addEventListener(
"error",
(e) => {
activeWs.delete(url)
listener(new ProviderHandlerError(e))
},
activeWsController,
)
ws.addEventListener(
"close",
(e) => {
activeWs.delete(url)
activeWsController.abort()
if (!e.wasClean) {
return retry(() => {
if (activeWs.has(url)) {
return Promise.resolve(activeWs.get(url)!)
}
if (connectingWs.has(url)) {
return connectingWs.get(url)!
}
const openedWs = new Promise<WebSocket>((resolve, reject) => {
const connectingWsController = new AbortController()
const ws = new WebSocket(url)
ws.addEventListener(
"open",
() => {
connectingWsController.abort()
connectingWs.delete(url)
activeWs.set(url, ws)
const activeWsController = new AbortController()
ws.addEventListener(
"message",
(e) => listener(msg.parse(e.data)),
activeWsController,
)
ws.addEventListener(
"error",
(e) => {
activeWs.delete(url)
listener(new ProviderHandlerError(e))
}
},
activeWsController,
)
resolve(ws)
},
connectingWsController,
)
ws.addEventListener(
"close",
(e) => {
connectingWsController.abort()
connectingWs.delete(url)
activeWs.delete(url)
reject(e)
},
connectingWsController,
)
},
activeWsController,
)
ws.addEventListener(
"close",
(e) => {
activeWs.delete(url)
activeWsController.abort()
if (e.code !== CUSTOM_WS_CLOSE_CODE) {
listener(new ProviderHandlerError(e))
}
},
activeWsController,
)
resolve(ws)
},
connectingWsController,
)
ws.addEventListener(
"close",
(e) => {
connectingWsController.abort()
connectingWs.delete(url)
activeWs.delete(url)
reject(e)
},
connectingWsController,
)
})
connectingWs.set(url, openedWs)
return openedWs
})
connectingWs.set(url, openedWs)
return openedWs
}

function closeWs(socket: WebSocket): Promise<undefined | ProviderCloseError<Event>> {
Expand All @@ -158,6 +118,6 @@ function closeWs(socket: WebSocket): Promise<undefined | ProviderCloseError<Even
controller.abort()
resolve(new ProviderCloseError(e))
}, controller)
socket.close()
socket.close(CUSTOM_WS_CLOSE_CODE, "Client normal closure")
})
}

0 comments on commit 044b8a5

Please sign in to comment.