Skip to content

Commit

Permalink
Improve Message
Browse files Browse the repository at this point in the history
  • Loading branch information
oeway committed Jul 26, 2024
1 parent 069319c commit bc6ab89
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 13 deletions.
4 changes: 2 additions & 2 deletions javascript/src/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,8 @@ export class RPC extends MessageEmitter {

emit(main_message, extra_data) {
assert(
typeof main_message === "object" && main_message.type,
"Invalid message, must be an object with a type field.",
typeof main_message === "object" && main_message.type && main_message.to,
"Invalid message, must be an object with at least `type` and `to` fields.",
);
let message_package = msgpack_packb(main_message);
if (extra_data) {
Expand Down
7 changes: 4 additions & 3 deletions javascript/src/websocket-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,11 @@ class WebsocketRPCConnection {
let retry = 0;
const reconnect = async () => {
try {
console.info(
console.warn(
`Reconnecting to ${this._server_url.split("?")[0]} (attempt #${retry})`,
);
await this.open();
console.warn(`Successfully reconnected to server ${this._server_url}`);
} catch (e) {
if (`${e}`.includes("ConnectionAbortedError")) {
console.warn("Failed to reconnect, connection aborted:", e);
Expand All @@ -229,7 +230,6 @@ class WebsocketRPCConnection {
console.warn("Failed to reconnect, connection aborted:", e);
return;
}
console.warn("Failed to reconnect:", e);
await new Promise((resolve) => setTimeout(resolve, 1000));
if (
this._websocket &&
Expand Down Expand Up @@ -294,6 +294,7 @@ export async function login(config) {
const service_id = config.login_service_id || "public/*:hypha-login";
const timeout = config.login_timeout || 60;
const callback = config.login_callback;
const profile = config.profile;

const server = await connectToServer({
name: "initial login client",
Expand All @@ -307,7 +308,7 @@ export async function login(config) {
} else {
console.log(`Please open your browser and login at ${context.login_url}`);
}
return await svc.check(context.key, timeout);
return await svc.check(context.key, timeout, profile);
} catch (error) {
throw error;
} finally {
Expand Down
3 changes: 2 additions & 1 deletion python/hypha_rpc/pyodide_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,9 @@ def _handle_close(self, evt):
async def reconnect():
nonlocal retry
try:
logger.info(f"Reconnecting to {self._server_url.split('?')[0]} (attempt #{retry})")
logger.warning(f"Reconnecting to {self._server_url.split('?')[0]} (attempt #{retry})")
await self.open()
logger.warning(f"Successfully reconnected to the server {self._server_url.split('?')[0]}")
except ConnectionAbortedError as e:
logger.warning("Failed to reconnect, connection aborted: %s", e)
return
Expand Down
4 changes: 3 additions & 1 deletion python/hypha_rpc/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,9 @@ async def _send_chunks(self, package, target_id, session_id):

def emit(self, main_message, extra_data=None):
"""Emit a message."""
assert isinstance(main_message, dict) and "type" in main_message
assert isinstance(main_message, dict) and "type" in main_message and "to" in main_message, (
"Invalid message, must be an object with at least `type` and `to` fields"
)
message_package = msgpack.packb(main_message)
if extra_data:
message_package = message_package + msgpack.packb(extra_data)
Expand Down
8 changes: 4 additions & 4 deletions python/hypha_rpc/websocket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ async def _listen(self):
retry = 0
while retry < MAX_RETRY:
try:
logger.info("Reconnecting to %s (attempt #%s)", self._server_url.split("?")[0], retry)
logger.warn("Reconnecting to %s (attempt #%s)", self._server_url.split("?")[0], retry)
await self.open()
logger.info("Successfully reconnected to %s", self._server_url.split("?")[0])
logger.warn("Successfully reconnected to %s", self._server_url.split("?")[0])
break
except NotImplementedError:
logger.error("Legacy authentication is not supported")
Expand All @@ -218,7 +218,6 @@ async def _listen(self):
logger.warning("Server refuse to reconnect: %s", e)
break
except Exception as e:
logger.warning("Failed to reconnect: %s", e)
await asyncio.sleep(1)
if self._websocket and self._websocket.open:
break
Expand Down Expand Up @@ -255,6 +254,7 @@ async def login(config):
service_id = config.get("login_service_id", "public/*:hypha-login")
timeout = config.get("login_timeout", 60)
callback = config.get("login_callback")
profile = config.get("profile", False)

server = await connect_to_server(
{
Expand All @@ -271,7 +271,7 @@ async def login(config):
else:
print(f"Please open your browser and login at {context['login_url']}")

return await svc.check(context["key"], timeout)
return await svc.check(context["key"], timeout, profile=profile)
except Exception as error:
raise error
finally:
Expand Down
6 changes: 4 additions & 2 deletions scripts/hypha_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
from hypha_rpc import connect_to_server
from hypha_rpc import login, connect_to_server

async def start_server(server_url):
server = await connect_to_server({"server_url": server_url})
user_info = await login({"server_url" : server_url, "profile": True})
print(f"Logged in as: {user_info}")
server = await connect_to_server({"server_url": server_url, "token": user_info.token})

def hello(name):
print("Hello " + name)
Expand Down

0 comments on commit bc6ab89

Please sign in to comment.