From 921e802de579b242c526ecaee1f8cb4ea38635f6 Mon Sep 17 00:00:00 2001 From: Wei Ouyang Date: Sat, 22 Jun 2024 19:51:04 +0200 Subject: [PATCH 1/3] support setupLocalClient --- javascript/package-lock.json | 2 +- javascript/package.json | 2 +- javascript/src/hypha/rpc.js | 1 - javascript/src/hypha/websocket-client.js | 219 +++++++++++++++++++++++ python/imjoy_rpc/VERSION | 2 +- 5 files changed, 222 insertions(+), 4 deletions(-) diff --git a/javascript/package-lock.json b/javascript/package-lock.json index 96c53a50..2ed64213 100644 --- a/javascript/package-lock.json +++ b/javascript/package-lock.json @@ -1,6 +1,6 @@ { "name": "imjoy-rpc", - "version": "0.5.50", + "version": "0.5.51", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/javascript/package.json b/javascript/package.json index a48957e5..b3296530 100644 --- a/javascript/package.json +++ b/javascript/package.json @@ -1,6 +1,6 @@ { "name": "imjoy-rpc", - "version": "0.5.50", + "version": "0.5.51", "description": "Remote procedure calls for ImJoy.", "module": "index.js", "types": "index.d.ts", diff --git a/javascript/src/hypha/rpc.js b/javascript/src/hypha/rpc.js index e77f904a..88c95767 100644 --- a/javascript/src/hypha/rpc.js +++ b/javascript/src/hypha/rpc.js @@ -839,7 +839,6 @@ export class RPC extends MessageEmitter { let clear_after_called = true; for (let arg of args) { if (typeof arg === "object" && arg._rintf === true) { - debugger; clear_after_called = false; break; } diff --git a/javascript/src/hypha/websocket-client.js b/javascript/src/hypha/websocket-client.js index 2be6fb13..d1298b64 100644 --- a/javascript/src/hypha/websocket-client.js +++ b/javascript/src/hypha/websocket-client.js @@ -270,3 +270,222 @@ export async function connectToServer(config) { } return wm; } + +export async function connectToLocalServer({ + server_url, + workspace, + client_id, + token, + method_timeout, + name +}) { + const context = typeof window !== "undefined" ? window : self; + const isWindow = typeof window !== "undefined"; + const postMessage = message => { + if (isWindow) { + window.parent.postMessage(message, "*"); + } else { + self.postMessage(message); + } + }; + + class WebSocketProxy { + constructor(url) { + this.url = url; + this.onopen = () => {}; + this.onmessage = () => {}; + this.onclose = () => {}; + this.onerror = () => {}; + + this.readyState = WebSocket.CONNECTING; + context.addEventListener( + "message", + event => { + const { type, data, to } = event.data; + if (to !== client_id) { + console.debug("message not for me", to, client_id); + return; + } + switch (type) { + case "message": + if (this.readyState === WebSocket.OPEN && this.onmessage) { + this.onmessage({ data: data }); + } + break; + case "connected": + this.readyState = WebSocket.OPEN; + this.onopen(); + break; + case "closed": + this.readyState = WebSocket.CLOSED; + this.onclose(); + break; + default: + break; + } + }, + false + ); + postMessage({ + type: "connect", + url: this.url, + from: client_id, + workspace + }); + } + + send(data) { + if (this.readyState === WebSocket.OPEN) { + postMessage({ + type: "message", + data: data, + from: client_id, + workspace + }); + } + } + + close() { + this.readyState = WebSocket.CLOSING; + postMessage({ type: "close", from: client_id, workspace }); + this.onclose(); + } + + addEventListener(type, listener) { + if (type === "message") { + this.onmessage = listener; + } + if (type === "open") { + this.onopen = listener; + } + if (type === "close") { + this.onclose = listener; + } + if (type === "error") { + this.onerror = listener; + } + } + } + const server = await connectToServer({ + server_url, + workspace, + client_id, + method_timeout, + name, + token, + WebSocketClass: WebSocketProxy + }); + return server; +} + +export async function setupLocalClient({enable_execution=false}) { + const context = typeof window !== "undefined" ? window : self; + const isWindow = typeof window !== "undefined"; + context.addEventListener( + "message", + event => { + const { + type, + server_url, + workspace, + client_id, + token, + method_timeout, + name, + config, + } = event.data; + + if (type === "initializeHyphaClient") { + if (!server_url || !workspace || !client_id) { + console.error("server_url, workspace, and client_id are required."); + return; + } + hyphaWebsocketClient + .connectToLocalServer({ + server_url, + workspace, + client_id, + token, + method_timeout, + name, + }) + .then(async server => { + globalThis.api = server; + // for iframe + if (isWindow && enable_execution) { + function loadScript(script) { + return new Promise((resolve, reject) => { + const scriptElement = document.createElement("script"); + scriptElement.innerHTML = script.content; + scriptElement.lang = script.lang; + + scriptElement.onload = () => resolve(); + scriptElement.onerror = e => reject(e); + + document.head.appendChild(scriptElement); + }); + } + if (config.styles && config.styles.length > 0) { + for (const style of config.styles) { + const styleElement = document.createElement("style"); + styleElement.innerHTML = style.content; + styleElement.lang = style.lang; + document.head.appendChild(styleElement); + } + } + if (config.links && config.links.length > 0) { + for (const link of config.links) { + const linkElement = document.createElement("a"); + linkElement.href = link.url; + linkElement.innerText = link.text; + document.body.appendChild(linkElement); + } + } + if (config.windows && config.windows.length > 0) { + for (const w of config.windows) { + document.body.innerHTML = w.content; + break; + } + } + if (config.scripts && config.scripts.length > 0) { + try { + for (const script of config.scripts) { + if (script.lang !== "javascript") + throw new Error("Only javascript scripts are supported"); + await loadScript(script); // Await the loading of each script + } + } catch (e) { + // If any script fails to load, send an error message + await server.update_client_info({ + id: client_id, + error: e.message + }); + } + } + } + // for web worker + else if ( + !isWindow && + enable_execution && + config.scripts && + config.scripts.length > 0 + ) { + try { + for (const script of config.scripts) { + if (script.lang !== "javascript") + throw new Error("Only javascript scripts are supported"); + eval(script.content); + } + } catch (e) { + await server.update_client_info({ + id: client_id, + error: e.message + }); + } + } + }); + } + }, + false + ); +} diff --git a/python/imjoy_rpc/VERSION b/python/imjoy_rpc/VERSION index 91c6f08c..303a4083 100644 --- a/python/imjoy_rpc/VERSION +++ b/python/imjoy_rpc/VERSION @@ -1,3 +1,3 @@ { - "version": "0.5.50" + "version": "0.5.51" } From ff1a7bdd79b8e09ab561fff70024fbdd25714021 Mon Sep 17 00:00:00 2001 From: Wei Ouyang Date: Sat, 22 Jun 2024 23:01:42 +0200 Subject: [PATCH 2/3] Fix websocket patch --- javascript/src/hypha/websocket-client.js | 218 +++++++++++--------- python/imjoy_rpc/hypha/pyodide_websocket.py | 106 +++++++++- 2 files changed, 221 insertions(+), 103 deletions(-) diff --git a/javascript/src/hypha/websocket-client.js b/javascript/src/hypha/websocket-client.js index d1298b64..da7751bd 100644 --- a/javascript/src/hypha/websocket-client.js +++ b/javascript/src/hypha/websocket-client.js @@ -34,8 +34,10 @@ class WebsocketRPCConnection { this._opening = null; this._retry_count = 0; this._closing = false; + this._client_id = client_id; + this._workspace = workspace; // Allow to override the WebSocket class for mocking or testing - this._WebSocketClass = WebSocketClass || WebSocket; + this._WebSocketClass = WebSocketClass; } set_reconnection_token(token) { @@ -57,7 +59,24 @@ class WebsocketRPCConnection { : this._server_url; console.info("Creating a new connection to ", server_url.split("?")[0]); - const websocket = new this._WebSocketClass(server_url); + let websocket = null; + if(server_url.startsWith("wss://local-hypha-server:")){ + if(this._WebSocketClass){ + websocket = new this._WebSocketClass(server_url) + } + else{ + console.log("Using local websocket") + websocket = new LocalWebSocket(server_url, this._client_id, this._workspace); + } + } + else{ + if(this._WebSocketClass){ + websocket = new this._WebSocketClass(server_url) + } + else{ + websocket = new WebSocket(server_url); + } + } websocket.binaryType = "arraybuffer"; websocket.onmessage = event => { const data = event.data; @@ -181,7 +200,9 @@ export async function connectToServer(config) { let clientId = config.client_id; if (!clientId) { clientId = randId(); + config.client_id = clientId; } + let server_url = normalizeServerUrl(config.server_url); let connection = new WebsocketRPCConnection( @@ -271,113 +292,101 @@ export async function connectToServer(config) { return wm; } -export async function connectToLocalServer({ - server_url, - workspace, - client_id, - token, - method_timeout, - name -}) { - const context = typeof window !== "undefined" ? window : self; - const isWindow = typeof window !== "undefined"; - const postMessage = message => { - if (isWindow) { - window.parent.postMessage(message, "*"); - } else { - self.postMessage(message); - } - }; - - class WebSocketProxy { - constructor(url) { - this.url = url; - this.onopen = () => {}; - this.onmessage = () => {}; - this.onclose = () => {}; - this.onerror = () => {}; - - this.readyState = WebSocket.CONNECTING; - context.addEventListener( - "message", - event => { - const { type, data, to } = event.data; - if (to !== client_id) { - console.debug("message not for me", to, client_id); - return; - } - switch (type) { - case "message": - if (this.readyState === WebSocket.OPEN && this.onmessage) { - this.onmessage({ data: data }); - } - break; - case "connected": - this.readyState = WebSocket.OPEN; - this.onopen(); - break; - case "closed": - this.readyState = WebSocket.CLOSED; - this.onclose(); - break; - default: - break; - } - }, - false - ); - postMessage({ - type: "connect", - url: this.url, - from: client_id, - workspace +class LocalWebSocket { + constructor(url, client_id, workspace) { + this.url = url; + this.onopen = () => {}; + this.onmessage = () => {}; + this.onclose = () => {}; + this.onerror = () => {}; + this.client_id = client_id; + this.workspace = workspace; + const context = typeof window !== "undefined" ? window : self; + const isWindow = typeof window !== "undefined"; + this.postMessage = message => { + if (isWindow) { + window.parent.postMessage(message, "*"); + } else { + self.postMessage(message); + } + }; + + this.readyState = WebSocket.CONNECTING; + context.addEventListener( + "message", + event => { + const { type, data, to } = event.data; + if (to !== this.client_id) { + console.debug("message not for me", to, this.client_id); + return; + } + switch (type) { + case "message": + if (this.readyState === WebSocket.OPEN && this.onmessage) { + this.onmessage({ data: data }); + } + break; + case "connected": + this.readyState = WebSocket.OPEN; + this.onopen(); + break; + case "closed": + this.readyState = WebSocket.CLOSED; + this.onclose(); + break; + default: + break; + } + }, + false + ); + + if(!this.client_id) + throw new Error("client_id is required"); + if(!this.workspace) + throw new Error("workspace is required"); + this.postMessage({ + type: "connect", + url: this.url, + from: this.client_id, + workspace: this.workspace + }); + } + + send(data) { + if (this.readyState === WebSocket.OPEN) { + this.postMessage({ + type: "message", + data: data, + from: this.client_id, + workspace: this.workspace }); } + } - send(data) { - if (this.readyState === WebSocket.OPEN) { - postMessage({ - type: "message", - data: data, - from: client_id, - workspace - }); - } - } + close() { + this.readyState = WebSocket.CLOSING; + this.postMessage({ type: "close", from: this.client_id, workspace: this.workspace }); + this.onclose(); + } - close() { - this.readyState = WebSocket.CLOSING; - postMessage({ type: "close", from: client_id, workspace }); - this.onclose(); + addEventListener(type, listener) { + if (type === "message") { + this.onmessage = listener; } - - addEventListener(type, listener) { - if (type === "message") { - this.onmessage = listener; - } - if (type === "open") { - this.onopen = listener; - } - if (type === "close") { - this.onclose = listener; - } - if (type === "error") { - this.onerror = listener; - } + if (type === "open") { + this.onopen = listener; + } + if (type === "close") { + this.onclose = listener; + } + if (type === "error") { + this.onerror = listener; } } - const server = await connectToServer({ - server_url, - workspace, - client_id, - method_timeout, - name, - token, - WebSocketClass: WebSocketProxy - }); - return server; } + export async function setupLocalClient({enable_execution=false}) { const context = typeof window !== "undefined" ? window : self; const isWindow = typeof window !== "undefined"; @@ -400,8 +409,13 @@ export async function setupLocalClient({enable_execution=false}) { console.error("server_url, workspace, and client_id are required."); return; } - hyphaWebsocketClient - .connectToLocalServer({ + + if(!server_url.startsWith("https://local-hypha-server:")){ + console.error("server_url should start with https://local-hypha-server:"); + return; + } + + connectToServer({ server_url, workspace, client_id, diff --git a/python/imjoy_rpc/hypha/pyodide_websocket.py b/python/imjoy_rpc/hypha/pyodide_websocket.py index 29c6e9ee..decc9081 100644 --- a/python/imjoy_rpc/hypha/pyodide_websocket.py +++ b/python/imjoy_rpc/hypha/pyodide_websocket.py @@ -2,12 +2,108 @@ import asyncio import inspect from js import WebSocket +import js try: from pyodide.ffi import to_js except ImportError: from pyodide import to_js +local_websocket_patch = """ +class LocalWebSocket { + constructor(url, client_id, workspace) { + this.url = url; + this.onopen = () => {}; + this.onmessage = () => {}; + this.onclose = () => {}; + this.onerror = () => {}; + this.client_id = client_id; + this.workspace = workspace; + const context = typeof window !== "undefined" ? window : self; + const isWindow = typeof window !== "undefined"; + this.postMessage = message => { + if (isWindow) { + window.parent.postMessage(message, "*"); + } else { + self.postMessage(message); + } + }; + + this.readyState = WebSocket.CONNECTING; + context.addEventListener( + "message", + event => { + const { type, data, to } = event.data; + if (to !== this.client_id) { + console.debug("message not for me", to, this.client_id); + return; + } + switch (type) { + case "message": + if (this.readyState === WebSocket.OPEN && this.onmessage) { + this.onmessage({ data: data }); + } + break; + case "connected": + this.readyState = WebSocket.OPEN; + this.onopen(); + break; + case "closed": + this.readyState = WebSocket.CLOSED; + this.onclose(); + break; + default: + break; + } + }, + false + ); + + if(!this.client_id) + throw new Error("client_id is required"); + if(!this.workspace) + throw new Error("workspace is required"); + this.postMessage({ + type: "connect", + url: this.url, + from: this.client_id, + workspace: this.workspace + }); + } + + send(data) { + if (this.readyState === WebSocket.OPEN) { + this.postMessage({ + type: "message", + data: data, + from: this.client_id, + workspace: this.workspace + }); + } + } + + close() { + this.readyState = WebSocket.CLOSING; + this.postMessage({ type: "close", from: this.client_id, workspace: this.workspace }); + this.onclose(); + } + + addEventListener(type, listener) { + if (type === "message") { + this.onmessage = listener; + } + if (type === "open") { + this.onopen = listener; + } + if (type === "close") { + this.onclose = listener; + } + if (type === "error") { + this.onerror = listener; + } + } +} +""" class PyodideWebsocketRPCConnection: """Represent a pyodide websocket RPC connection.""" @@ -19,6 +115,7 @@ def __init__( self._websocket = None self._handle_message = None assert server_url and client_id + server_url = server_url + f"?client_id={client_id}" if workspace is not None: server_url += f"&workspace={workspace}" @@ -27,6 +124,8 @@ def __init__( self._server_url = server_url self._logger = logger self._timeout = timeout + self._client_id = client_id + self._workspace = workspace def on_message(self, handler): """Register a message handler.""" @@ -35,7 +134,12 @@ def on_message(self, handler): async def open(self): """Open the connection.""" - self._websocket = WebSocket.new(self._server_url) + if self._server_url.startswith("https://local-hypha-server:"): + print("Using local websocket") + LocalWebSocket = js.eval("(" + local_websocket_patch + ")") + self._websocket = LocalWebSocket.new(self._server_url, self._client_id, self._workspace) + else: + self._websocket = WebSocket.new(self._server_url) self._websocket.binaryType = "arraybuffer" def onmessage(evt): From 5de854146eba174a5fbadaa88794a6f5039dbbfc Mon Sep 17 00:00:00 2001 From: Wei Ouyang Date: Sat, 22 Jun 2024 23:13:28 +0200 Subject: [PATCH 3/3] Format --- javascript/src/hypha/websocket-client.js | 187 ++++++++++---------- python/imjoy_rpc/hypha/pyodide_websocket.py | 2 +- 2 files changed, 96 insertions(+), 93 deletions(-) diff --git a/javascript/src/hypha/websocket-client.js b/javascript/src/hypha/websocket-client.js index da7751bd..ee3cb98b 100644 --- a/javascript/src/hypha/websocket-client.js +++ b/javascript/src/hypha/websocket-client.js @@ -60,20 +60,21 @@ class WebsocketRPCConnection { console.info("Creating a new connection to ", server_url.split("?")[0]); let websocket = null; - if(server_url.startsWith("wss://local-hypha-server:")){ - if(this._WebSocketClass){ - websocket = new this._WebSocketClass(server_url) - } - else{ - console.log("Using local websocket") - websocket = new LocalWebSocket(server_url, this._client_id, this._workspace); - } - } - else{ - if(this._WebSocketClass){ - websocket = new this._WebSocketClass(server_url) + if (server_url.startsWith("wss://local-hypha-server:")) { + if (this._WebSocketClass) { + websocket = new this._WebSocketClass(server_url); + } else { + console.log("Using local websocket"); + websocket = new LocalWebSocket( + server_url, + this._client_id, + this._workspace + ); } - else{ + } else { + if (this._WebSocketClass) { + websocket = new this._WebSocketClass(server_url); + } else { websocket = new WebSocket(server_url); } } @@ -202,7 +203,7 @@ export async function connectToServer(config) { clientId = randId(); config.client_id = clientId; } - + let server_url = normalizeServerUrl(config.server_url); let connection = new WebsocketRPCConnection( @@ -341,10 +342,8 @@ class LocalWebSocket { false ); - if(!this.client_id) - throw new Error("client_id is required"); - if(!this.workspace) - throw new Error("workspace is required"); + if (!this.client_id) throw new Error("client_id is required"); + if (!this.workspace) throw new Error("workspace is required"); this.postMessage({ type: "connect", url: this.url, @@ -366,7 +365,11 @@ class LocalWebSocket { close() { this.readyState = WebSocket.CLOSING; - this.postMessage({ type: "close", from: this.client_id, workspace: this.workspace }); + this.postMessage({ + type: "close", + from: this.client_id, + workspace: this.workspace + }); this.onclose(); } @@ -386,8 +389,7 @@ class LocalWebSocket { } } - -export async function setupLocalClient({enable_execution=false}) { +export async function setupLocalClient({ enable_execution = false }) { const context = typeof window !== "undefined" ? window : self; const isWindow = typeof window !== "undefined"; context.addEventListener( @@ -401,103 +403,104 @@ export async function setupLocalClient({enable_execution=false}) { token, method_timeout, name, - config, + config } = event.data; - + if (type === "initializeHyphaClient") { if (!server_url || !workspace || !client_id) { console.error("server_url, workspace, and client_id are required."); return; } - if(!server_url.startsWith("https://local-hypha-server:")){ - console.error("server_url should start with https://local-hypha-server:"); + if (!server_url.startsWith("https://local-hypha-server:")) { + console.error( + "server_url should start with https://local-hypha-server:" + ); return; } - + connectToServer({ - server_url, - workspace, - client_id, - token, - method_timeout, - name, - }) - .then(async server => { - globalThis.api = server; - // for iframe - if (isWindow && enable_execution) { - function loadScript(script) { - return new Promise((resolve, reject) => { - const scriptElement = document.createElement("script"); - scriptElement.innerHTML = script.content; - scriptElement.lang = script.lang; - - scriptElement.onload = () => resolve(); - scriptElement.onerror = e => reject(e); - - document.head.appendChild(scriptElement); - }); - } - if (config.styles && config.styles.length > 0) { - for (const style of config.styles) { - const styleElement = document.createElement("style"); - styleElement.innerHTML = style.content; - styleElement.lang = style.lang; - document.head.appendChild(styleElement); - } - } - if (config.links && config.links.length > 0) { - for (const link of config.links) { - const linkElement = document.createElement("a"); - linkElement.href = link.url; - linkElement.innerText = link.text; - document.body.appendChild(linkElement); - } + server_url, + workspace, + client_id, + token, + method_timeout, + name + }).then(async server => { + globalThis.api = server; + // for iframe + if (isWindow && enable_execution) { + function loadScript(script) { + return new Promise((resolve, reject) => { + const scriptElement = document.createElement("script"); + scriptElement.innerHTML = script.content; + scriptElement.lang = script.lang; + + scriptElement.onload = () => resolve(); + scriptElement.onerror = e => reject(e); + + document.head.appendChild(scriptElement); + }); + } + if (config.styles && config.styles.length > 0) { + for (const style of config.styles) { + const styleElement = document.createElement("style"); + styleElement.innerHTML = style.content; + styleElement.lang = style.lang; + document.head.appendChild(styleElement); } - if (config.windows && config.windows.length > 0) { - for (const w of config.windows) { - document.body.innerHTML = w.content; - break; - } + } + if (config.links && config.links.length > 0) { + for (const link of config.links) { + const linkElement = document.createElement("a"); + linkElement.href = link.url; + linkElement.innerText = link.text; + document.body.appendChild(linkElement); } - if (config.scripts && config.scripts.length > 0) { - try { - for (const script of config.scripts) { - if (script.lang !== "javascript") - throw new Error("Only javascript scripts are supported"); - await loadScript(script); // Await the loading of each script - } - } catch (e) { - // If any script fails to load, send an error message - await server.update_client_info({ - id: client_id, - error: e.message - }); - } + } + if (config.windows && config.windows.length > 0) { + for (const w of config.windows) { + document.body.innerHTML = w.content; + break; } } - // for web worker - else if ( - !isWindow && - enable_execution && - config.scripts && - config.scripts.length > 0 - ) { + if (config.scripts && config.scripts.length > 0) { try { for (const script of config.scripts) { if (script.lang !== "javascript") throw new Error("Only javascript scripts are supported"); - eval(script.content); + await loadScript(script); // Await the loading of each script } } catch (e) { + // If any script fails to load, send an error message await server.update_client_info({ id: client_id, error: e.message }); } } - }); + } + // for web worker + else if ( + !isWindow && + enable_execution && + config.scripts && + config.scripts.length > 0 + ) { + try { + for (const script of config.scripts) { + if (script.lang !== "javascript") + throw new Error("Only javascript scripts are supported"); + eval(script.content); + } + } catch (e) { + await server.update_client_info({ + id: client_id, + error: e.message + }); + } + } + }); } }, false diff --git a/python/imjoy_rpc/hypha/pyodide_websocket.py b/python/imjoy_rpc/hypha/pyodide_websocket.py index decc9081..d6e0d638 100644 --- a/python/imjoy_rpc/hypha/pyodide_websocket.py +++ b/python/imjoy_rpc/hypha/pyodide_websocket.py @@ -134,7 +134,7 @@ def on_message(self, handler): async def open(self): """Open the connection.""" - if self._server_url.startswith("https://local-hypha-server:"): + if self._server_url.startswith("wss://local-hypha-server:"): print("Using local websocket") LocalWebSocket = js.eval("(" + local_websocket_patch + ")") self._websocket = LocalWebSocket.new(self._server_url, self._client_id, self._workspace)