Skip to content

Commit

Permalink
Fix functions
Browse files Browse the repository at this point in the history
  • Loading branch information
oeway committed Sep 10, 2024
1 parent e2b2c5e commit c4d9391
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 12 deletions.
17 changes: 11 additions & 6 deletions hypha/core/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ def __init__(
self._workspace_manager = None
self._websocket_server = None
self._server_id = server_id or "server-0"
self.manager_id = None
self.reconnection_token_life_time = reconnection_token_life_time
self._server_info = {
"hypha_version": __version__,
Expand Down Expand Up @@ -366,7 +365,6 @@ async def init(self, reset_redis, startup_functions=None):
await self.check_and_cleanup_servers()
self._workspace_manager = await self.register_workspace_manager()

self.manager_id = self._workspace_manager.get_client_id()
try:
await self.register_workspace(
WorkspaceInfo.model_validate(
Expand Down Expand Up @@ -437,7 +435,10 @@ async def init(self, reset_redis, startup_functions=None):
async def _register_root_services(self):
"""Register root services."""
self._root_workspace_interface = await self.get_workspace_interface(
self._root_user.get_workspace(), self._root_user, silent=False
self._root_user.get_workspace(),
self._root_user,
client_id=self._server_id,
silent=False,
)
await self._root_workspace_interface.register_service(
{
Expand Down Expand Up @@ -470,7 +471,7 @@ async def get_public_api(self):
"""Get the public API."""
if self._public_workspace_interface is None:
self._public_workspace_interface = await self.get_workspace_interface(
"public", self._root_user, silent=False
"public", self._root_user, client_id=self._server_id, silent=False
)
return self._public_workspace_interface

Expand Down Expand Up @@ -585,13 +586,17 @@ def connect_to_workspace(
workspace, user_info, client_id=client_id, timeout=timeout, silent=silent
)

def get_manager_id(self):
return self._workspace_manager.get_client_id()

async def register_workspace_manager(self):
"""Register a workspace manager."""
manager = WorkspaceManager(
self._redis,
self._root_user,
self._event_bus,
self._server_info,
self._server_id,
self._s3_controller,
)
await manager.setup()
Expand All @@ -615,7 +620,7 @@ def get_workspace_interface(
# the client will be hidden if client_id is None
if silent is None:
silent = client_id is None
client_id = client_id or self._server_id
client_id = client_id or "client-" + random_id(readable=False)
rpc = self.create_rpc(workspace, user_info, client_id=client_id, silent=silent)
return WorkspaceInterfaceContextManager(
rpc, self._redis, workspace, timeout=timeout
Expand Down Expand Up @@ -648,7 +653,7 @@ def create_rpc(
logger.info("Creating RPC for client %s", client_id)
assert user_info is not None, "User info is required"
connection = RedisRPCConnection(
self._event_bus, workspace, client_id, user_info, manager_id=self.manager_id
self._event_bus, workspace, client_id, user_info, manager_id=self._server_id
)
rpc = RPC(
connection,
Expand Down
7 changes: 4 additions & 3 deletions hypha/core/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def __init__(
root_user: UserInfo,
event_bus: EventBus,
server_info: dict,
client_id: str,
s3_controller: Optional[S3Controller] = None,
):
self._redis = redis
Expand All @@ -72,11 +73,11 @@ def __init__(
self._root_user = root_user
self._event_bus = event_bus
self._server_info = server_info
self._client_id = None
self._client_id = client_id
self._s3_controller = s3_controller

def get_client_id(self):
assert self._client_id is not None, "Manager client id not set."
assert self._client_id, "client id must not be empty."
return self._client_id

async def setup(
Expand All @@ -87,7 +88,7 @@ async def setup(
"""Setup the workspace manager."""
if self._initialized:
return self._rpc
self._client_id = "ws-" + random_id(readable=False)
assert self._client_id, "client id must be provided."
rpc = self._create_rpc(self._client_id)
self._rpc = rpc
management_service = self.create_service(service_id, service_name)
Expand Down
15 changes: 15 additions & 0 deletions hypha/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,14 @@ async def handle_function_service(self, service, path, scope, receive, send):
],
}
)
await send(
{
"type": "http.response.body",
"body": b"Internal Server Error: "
+ traceback.format_exc().encode(),
"more_body": False,
}
)
else:
await send(
{
Expand All @@ -369,6 +377,13 @@ async def handle_function_service(self, service, path, scope, receive, send):
],
}
)
await send(
{
"type": "http.response.body",
"body": b"Function not found: " + func_name.encode(),
"more_body": False,
}
)


class HTTPProxy:
Expand Down
2 changes: 1 addition & 1 deletion hypha/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ async def send_bytes(data):
"hypha_version": __version__,
"public_base_url": self.store.public_base_url,
"local_base_url": self.store.local_base_url,
"manager_id": self.store.manager_id,
"manager_id": self.store.get_manager_id(),
"workspace": workspace,
"client_id": client_id,
"user": user_info.model_dump(),
Expand Down
4 changes: 2 additions & 2 deletions tests/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ async def test_cleanup_workspace(fastapi_server, root_user_token):
servers = await admin.list_servers()
assert len(servers) == 1
summary = await api.cleanup("public")
assert "removed_clients" in summary
assert len(summary["removed_clients"]) == 0
assert "removed_clients" not in summary
assert len(summary) == 0


async def test_login(fastapi_server):
Expand Down

0 comments on commit c4d9391

Please sign in to comment.