Skip to content
This repository has been archived by the owner on Sep 14, 2023. It is now read-only.

feat: add connection retry to proxyProvider #492

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/std/async.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "https://deno.land/std@0.127.0/async/mod.ts"
export * from "https://deno.land/std@0.170.0/async/mod.ts"
15 changes: 12 additions & 3 deletions examples/derived.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import * as C from "http://localhost:5646/@local/mod.ts"
import * as U from "http://localhost:5646/@local/util/mod.ts"

const ids = C.entryRead(C.polkadot)("Paras", "Parachains", [])
.access("value")
.as<number[]>()
// TODO: uncomment these lines to run a single effect upon solving `count` in zones
// or effects/rpc.ts#discardCheck
// const ids = C.entryRead(C.polkadot)("Paras", "Parachains", [])
// .access("value")
// .as<number[]>()

const ids = U.throwIfError(
await C.entryRead(C.polkadot)("Paras", "Parachains", [])
.access("value")
.as<number[]>()
.run(),
)
Comment on lines +10 to +15
Copy link
Contributor Author

@kratico kratico Jan 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-wrote this because there is an issue with the client reference counting.
After

 const ids = C.entryRead(C.polkadot)("Paras", "Parachains", [])
   .access("value")
   .as<number[]>()

The client reference count reaches 1 and it's discarded in

capi/effects/rpc.ts

Lines 93 to 102 in 7edf6a2

async function discardCheck<CloseErrorData>(
client: rpc.Client<any, any, any, CloseErrorData>,
counter: Z.RcCounter,
) {
counter.i--
if (!counter.i) {
return await client.discard()
}
return
}

The above invokes the proxyProvider.release that removes the WebSocket listener.
As a result the next call C.Z.each(...) successfully creates a new WebSocket but there is not listener for the incoming messages.
Note: The listener is placed by the rpcClient effect.


const root = C.Z.each(ids, (id) => {
return C.entryRead(C.polkadot)("Paras", "Heads", [id])
Expand Down
51 changes: 51 additions & 0 deletions rpc/provider/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,54 @@ export function nextIdFactory() {
let i = 0
return () => i++
}

export class ListenersContainer<
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent of this class is to replace the ProviderConnection class and use it in the smoldot provider too.
The ProviderConnection class has the inner and cleanUp props, but they are not used by the class methods, so it seems that they don't belong to this class.
After removing these props from ProviderConnection, this class has the responsibility of containing listeners for a single WebSocket connection.
The ListenersContainer is an improvement on the above that can contain listeners for many WebSocket connections.

DiscoveryValue,
SendErrorData,
HandlerErrorData,
> {
#listeners = new Map<
DiscoveryValue,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious how this will be generalized for smoldot (perhaps I recall incorrectly: the discovery value must first be retrieved based on the chainspec, right?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DiscoveryValue is not a good name.
This is just a key type for Map so it doesn't always need to be the provider discoveryValue

For smoldot, it's a bit challenging to pick a key for parachains.
To connect to a parachain, we need the potential relay chain spec.
So, a better key could be the concatenation of the relay+parachain specs.
Currently the key is an object which can easily have a different object ref for the same values

{
  chainSpec: {
    relay: string
    para?: string
  }
}

Map<
ProviderListener<SendErrorData, HandlerErrorData>,
ProviderListener<SendErrorData, HandlerErrorData>
>
>()

set(discoveryValue: DiscoveryValue, listener: ProviderListener<SendErrorData, HandlerErrorData>) {
const map = U.getOrInit(this.#listeners, discoveryValue, () =>
new Map<
ProviderListener<SendErrorData, HandlerErrorData>,
ProviderListener<SendErrorData, HandlerErrorData>
>())
if (map.has(listener)) return
map.set(
listener,
listener.bind({
stop: () => map!.delete(listener),
}),
)
}

delete(
discoveryValue: DiscoveryValue,
listener: ProviderListener<SendErrorData, HandlerErrorData>,
) {
this.#listeners.get(discoveryValue)?.delete(listener)
}

count(discoveryValue: DiscoveryValue) {
return this.#listeners.get(discoveryValue)?.size ?? 0
}

forEachListener(
discoveryValue: DiscoveryValue,
message: Parameters<ProviderListener<SendErrorData, HandlerErrorData>>[0],
) {
const map = this.#listeners.get(discoveryValue)
if (!map) return
for (const listener of map.values()) {
listener(message)
}
}
}
20 changes: 15 additions & 5 deletions rpc/provider/proxy.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
import * as A from "../../deps/std/testing/asserts.ts"
import * as T from "../../test_util/mod.ts"
import { proxyProvider } from "./proxy.ts"
import { proxyProviderFactory } from "./proxy.ts"
import { setup } from "./test_util.ts"

Deno.test({
name: "Proxy Provider",
sanitizeResources: false,
sanitizeOps: false,
async fn(t) {
await t.step({
name: "send/listen",
// TODO: await T.polkadot initializes a 2nd proxyProvider
sanitizeResources: false,
sanitizeOps: false,
async fn() {
const [ref, message] = await setup(proxyProvider, await T.polkadot.url, "system_health", [])
const [ref, message] = await setup(
proxyProviderFactory(),
await T.polkadot.url,
"system_health",
[],
)
A.assertNotInstanceOf(message, Error)
A.assertExists(message.result)
A.assertNotInstanceOf(await ref.release(), Error)
Expand All @@ -20,7 +30,7 @@ Deno.test({
name: "create WebSocket error",
async fn() {
const [ref, message] = await setup(
proxyProvider,
proxyProviderFactory({ retryOptions: { maxAttempts: 1 } }),
"invalid-endpoint-url",
"system_health",
[],
Expand All @@ -37,7 +47,7 @@ Deno.test({
this.close()
})
const [ref, message] = await setup(
proxyProvider,
proxyProviderFactory(),
server.url,
"system_health",
[],
Expand All @@ -53,7 +63,7 @@ Deno.test({
async fn() {
const server = createWebSocketServer()
const [ref, message] = await setup(
proxyProvider,
proxyProviderFactory(),
server.url,
"system_health",
// make JSON.stringify to throw
Expand Down
191 changes: 111 additions & 80 deletions rpc/provider/proxy.ts
Original file line number Diff line number Diff line change
@@ -1,95 +1,126 @@
import * as U from "../../util/mod.ts"
import { retry, RetryOptions } from "../../deps/std/async.ts"
import * as msg from "../messages.ts"
import { nextIdFactory, Provider, ProviderConnection, ProviderListener } from "./base.ts"
import { ListenersContainer, nextIdFactory, Provider, ProviderListener } from "./base.ts"
import { ProviderCloseError, ProviderHandlerError, ProviderSendError } from "./errors.ts"

/** Global lookup of existing connections */
const connections = new Map<string, ProxyProviderConnection>()
type ProxyProviderConnection = ProviderConnection<WebSocket, Event, Event>
const CUSTOM_WS_CLOSE_CODE = 4000
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Status codes in the range 4000-4999 are reserved for private use (see https://www.rfc-editor.org/rfc/rfc6455.html#section-7.4.1)

This code is used to signal a graceful client WebSocket close.


const nextId = nextIdFactory()
export interface ProxyProviderFactoryProps {
retryOptions?: RetryOptions
}

export const proxyProvider: Provider<string, Event, Event, Event> = (url, listener) => {
return {
nextId,
send: (message) => {
let conn
try {
conn = connection(url, listener)
} catch (error) {
listener(new ProviderHandlerError(error as Event))
return
}
;(async () => {
const openError = await ensureWsOpen(conn.inner)
if (openError) {
conn.forEachListener(new ProviderSendError(openError, message))
return
}
try {
conn.inner.send(JSON.stringify(message))
} catch (error) {
listener(new ProviderSendError(error as Event, message))
export const proxyProviderFactory = (
{ retryOptions }: ProxyProviderFactoryProps = {},
): Provider<string, Event, Event, Event> => {
const listenersContainer = new ListenersContainer<string, Event, Event>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be within the module scope? If not, multiple providers could house websockets of identical discovery values (correct?). On the flip side, the current approach means we don't need to track the number of "users" of a globally accessible ws instance (to decide whether or not it can actually be closed).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on how the factory is used.
Calling the factory once and sharing the created provider works similar to not having a factory and using a module variable to cache stuff.
With the factory approach there is an improvement for concurrent unit tests and fixes some unit tests caching issues.

const activeWs = new Map<string, WebSocket>()
const connectingWs = new Map<string, Promise<WebSocket>>()
return (url, listener) => {
listenersContainer.set(url, listener)
let ws: WebSocket | undefined
return {
nextId: nextIdFactory(),
send: (message) => {
;(async () => {
try {
ws = await openedWs({
url,
activeWs,
connectingWs,
retryOptions,
listener: (e) => listenersContainer.forEachListener(url, e),
})
} catch (error) {
return listener(new ProviderHandlerError(error as Event))
}
try {
ws.send(JSON.stringify(message))
} catch (error) {
listener(new ProviderSendError(error as Event, message))
}
})()
},
release: () => {
listenersContainer.delete(url, listener)
if (!listenersContainer.count(url) && ws) {
return closeWs(ws)
}
})()
},
release: () => {
const conn = connections.get(url)
if (!conn) {
return Promise.resolve(undefined)
}
const { cleanUp, listeners, inner } = conn
listeners.delete(listener)
if (!listeners.size) {
connections.delete(url)
cleanUp()
return closeWs(inner)
}
return Promise.resolve(undefined)
},
},
}
}
}

function connection(
url: string,
listener: ProviderListener<Event, Event>,
): ProxyProviderConnection {
const conn = U.getOrInit(connections, url, () => {
const controller = new AbortController()
const ws = new WebSocket(url)
ws.addEventListener("message", (e) => {
conn!.forEachListener(msg.parse(e.data))
}, controller)
ws.addEventListener("error", (e) => {
conn!.forEachListener(new ProviderHandlerError(e))
}, controller)
ws.addEventListener("close", (e) => {
conn!.forEachListener(new ProviderHandlerError(e))
}, controller)
return new ProviderConnection(ws, () => controller.abort())
})
conn.addListener(listener)
return conn
export const proxyProvider = proxyProviderFactory()

interface OpenedWsProps {
url: string
activeWs: Map<string, WebSocket>
connectingWs: Map<string, Promise<WebSocket>>
retryOptions?: RetryOptions
listener: ProviderListener<Event, Event>
}

function ensureWsOpen(ws: WebSocket): Promise<undefined | Event> {
if (ws.readyState === WebSocket.OPEN) {
return Promise.resolve(undefined)
} else if (ws.readyState === WebSocket.CLOSING || ws.readyState === WebSocket.CLOSED) {
return Promise.resolve(new Event("error"))
} else {
return new Promise<undefined | Event>((resolve) => {
const controller = new AbortController()
ws.addEventListener("open", () => {
controller.abort()
resolve(undefined)
}, controller)
ws.addEventListener("error", (e) => {
controller.abort()
resolve(e)
}, controller)
function openedWs(
{ url, activeWs, connectingWs, listener, retryOptions }: OpenedWsProps,
): Promise<WebSocket> {
return retry(() => {
const activeWsValue = activeWs.get(url)
if (activeWsValue) return Promise.resolve(activeWsValue)
const connectingWsValue = connectingWs.get(url)
if (connectingWsValue) return connectingWsValue
const openedWs = new Promise<WebSocket>((resolve, reject) => {
const connectingWsController = new AbortController()
const ws = new WebSocket(url)
ws.addEventListener(
"open",
() => {
connectingWsController.abort()
connectingWs.delete(url)
activeWs.set(url, ws)
const activeWsController = new AbortController()
ws.addEventListener(
"message",
(e) => listener(msg.parse(e.data)),
activeWsController,
)
ws.addEventListener(
"error",
(e) => {
activeWs.delete(url)
listener(new ProviderHandlerError(e))
},
activeWsController,
)
ws.addEventListener(
"close",
(e) => {
activeWs.delete(url)
activeWsController.abort()
if (e.code !== CUSTOM_WS_CLOSE_CODE) {
listener(new ProviderHandlerError(e))
}
},
activeWsController,
)
resolve(ws)
},
connectingWsController,
)
ws.addEventListener(
"close",
(e) => {
connectingWsController.abort()
connectingWs.delete(url)
activeWs.delete(url)
reject(e)
},
connectingWsController,
)
})
}
connectingWs.set(url, openedWs)
return openedWs
}, retryOptions)
}

function closeWs(socket: WebSocket): Promise<undefined | ProviderCloseError<Event>> {
Expand All @@ -106,6 +137,6 @@ function closeWs(socket: WebSocket): Promise<undefined | ProviderCloseError<Even
controller.abort()
resolve(new ProviderCloseError(e))
}, controller)
socket.close()
socket.close(CUSTOM_WS_CLOSE_CODE, "Client normal closure")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where will devs see the "Client normal closure" message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

})
}