Fixed naming from workers to threads, preparing production release 0.0.1 #261
Conversation
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Caution: Review failed. The pull request is closed.

📝 Walkthrough

Summary by CodeRabbit
Walkthrough

Renamed Runtime's concurrency parameter from `workers` to `thread_count`.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Host
    participant Runtime
    participant Queue
    participant WorkerThread as _worker_thread
    participant StateMgr as State Manager
    participant Node
    Host->>Runtime: start()
    Note over Runtime: init(thread_count >= 1)
    Runtime->>Queue: enqueue pending states (batch)
    Runtime->>Runtime: spawn _thread_count worker threads
    loop For each thread
        Runtime->>WorkerThread: run()
    end
    par Worker processing
        WorkerThread->>Queue: get state
        alt state available
            WorkerThread->>Node: execute(state, Secrets(**secrets["secrets"])) %% # type: ignore on one site
            alt success
                WorkerThread->>StateMgr: notify executed
            else error
                WorkerThread->>StateMgr: notify errored
            end
        else no state
            WorkerThread->>Runtime: sleep / poll interval
        end
    and Repeat
        Runtime->>Queue: fetch/enqueue new states
    end
```
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~10 minutes

Possibly related PRs
Summary of Changes
Hello @NiveditJain, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request primarily focuses on improving the clarity and accuracy of terminology within the python-sdk/exospherehost/runtime.py file. It renames instances of "workers" to "threads" to better reflect that these are asyncio tasks (coroutines) rather than traditional OS-level worker processes. This change enhances the codebase's readability and aligns with Python's concurrency model. Additionally, it includes a minor type-hinting adjustment and an import of the multiprocessing module, potentially in preparation for future enhancements, and is part of preparing for production release 0.0.1.
Highlights
- Terminology Refinement: The `workers` parameter in the `Runtime` class constructor was renamed to `thread_count`, and its internal usage was updated to reflect this change.
- Method Renaming: The `_worker` asynchronous method was renamed to `_worker_thread` to maintain consistent terminology across the codebase.
- Type Hinting Adjustment: A `# type: ignore` comment was added to a specific line to suppress a type-checking warning, improving code cleanliness.
- Module Import: The `multiprocessing` module was imported, potentially in preparation for future features or enhancements related to process management.
Code Review
This pull request aims to standardize naming by changing `workers` to `threads` ahead of a production release. However, there are several issues that need to be addressed.
Most importantly, the tests have not been updated to reflect this renaming. For example, `test_runtime_invalid_params_raises` in `test_runtime_validation.py` still uses the old `workers` parameter, which will cause the tests to fail. A failing test suite is a critical blocker for a production release.
Other points for improvement:
- An unused `multiprocessing` module is imported in `runtime.py`.
- The term `thread_count` is misleading in an `asyncio` context, where concurrency is achieved with tasks on a single thread. The original term `workers` is more conventional and clearer.
- A `# type: ignore` comment was added to fix a typing issue. This is an unrelated change and should be moved to a separate PR to keep this one focused on the renaming.
```diff
-    def __init__(self, namespace: str, name: str, nodes: List[type[BaseNode]], state_manager_uri: str | None = None, key: str | None = None, batch_size: int = 16, workers: int = 4, state_manage_version: str = "v0", poll_interval: int = 1):
+    def __init__(self, namespace: str, name: str, nodes: List[type[BaseNode]], state_manager_uri: str | None = None, key: str | None = None, batch_size: int = 16, thread_count: int = 4, state_manage_version: str = "v0", poll_interval: int = 1):
```
Renaming `workers` to `thread_count` is potentially misleading. In asyncio, concurrency is handled by tasks running on a single OS thread's event loop. The term "worker" is standard for describing a task that processes a queue. Using "thread" might incorrectly suggest that the `threading` module and multiple OS threads are being used. It would be clearer to stick with `workers` or a similar term like `worker_count`.
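The single-thread point is easy to demonstrate with a minimal, self-contained sketch (not the SDK's code; all names here are hypothetical): several queue "workers" spawned via `asyncio.create_task` all run on the same OS thread.

```python
import asyncio
import threading

async def worker(queue: asyncio.Queue, thread_ids: list) -> None:
    # A "worker" here is an asyncio task, not an OS thread.
    while True:
        await queue.get()
        try:
            thread_ids.append(threading.get_ident())  # record which OS thread ran this task
        finally:
            queue.task_done()

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    thread_ids: list = []
    workers = [asyncio.create_task(worker(queue, thread_ids)) for _ in range(4)]
    for i in range(8):
        queue.put_nowait(i)
    await queue.join()          # wait until every item has been processed
    for w in workers:
        w.cancel()              # workers loop forever; stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return thread_ids

thread_ids = asyncio.run(main())
# Every "worker" ran on the same OS thread: len(set(thread_ids)) == 1
```

Four "workers" handled eight items, yet `threading.get_ident()` is identical for all of them, which is why `worker_count` describes the parameter better than `thread_count`.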
```diff
 node = self._node_mapping[state["node_name"]]
 secrets = await self._get_secrets(state["state_id"])
-outputs = await node()._execute(node.Inputs(**state["inputs"]), node.Secrets(**secrets["secrets"]))
+outputs = await node()._execute(node.Inputs(**state["inputs"]), node.Secrets(**secrets["secrets"]))  # type: ignore
```
This `# type: ignore` comment was added to suppress a typing error, but this change is unrelated to the main goal of this PR (renaming workers to threads). It's best practice to keep pull requests focused on a single concern. Please move this fix to a separate PR. The typing error itself points to a potential design issue in `BaseNode`'s type hierarchy that should be investigated.
```diff
-outputs = await node()._execute(node.Inputs(**state["inputs"]), node.Secrets(**secrets["secrets"]))  # type: ignore
+outputs = await node()._execute(node.Inputs(**state["inputs"]), node.Secrets(**secrets["secrets"]))
```
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
python-sdk/exospherehost/runtime.py (1)
311-337: Make the worker resilient and align naming with asyncio ("task"), ensure `task_done` in `finally`, and guard error notifications.

Today, if `_notify_errored` raises, `task_done` is never called, risking queue deadlock. Also, naming it "thread" is inaccurate (it's an asyncio task).

Refactor:

```diff
-    async def _worker_thread(self):
+    async def _worker_task(self):
         """
-        Worker task that processes states from the queue.
+        Worker task that processes states from the queue.
@@
-        while True:
-            state = await self._state_queue.get()
-
-            try:
-                node = self._node_mapping[state["node_name"]]
-                secrets = await self._get_secrets(state["state_id"])
-                outputs = await node()._execute(node.Inputs(**state["inputs"]), node.Secrets(**secrets["secrets"]))  # type: ignore
-
-                if outputs is None:
-                    outputs = []
-
-                if not isinstance(outputs, list):
-                    outputs = [outputs]
-
-                await self._notify_executed(state["state_id"], outputs)
-
-            except Exception as e:
-                await self._notify_errored(state["state_id"], str(e))
-
-            self._state_queue.task_done()  # type: ignore
+        while True:
+            state = await self._state_queue.get()
+            state_id = None
+            try:
+                state_id = state["state_id"]
+                node_name = state["node_name"]
+                node_cls = self._node_mapping[node_name]
+                inputs = node_cls.Inputs(**state["inputs"])
+                secrets = await self._get_secrets(state_id)  # returns a flat dict after the change below
+                outputs = await node_cls()._execute(inputs, node_cls.Secrets(**secrets))
+
+                if outputs is None:
+                    outputs_list = []
+                elif isinstance(outputs, list):
+                    outputs_list = outputs
+                else:
+                    outputs_list = [outputs]
+
+                await self._notify_executed(state_id, outputs_list)
+            except Exception as e:
+                try:
+                    await self._notify_errored(state_id or state.get("state_id", "<unknown>"), str(e))
+                except Exception as notify_exc:
+                    logger.error("Failed to notify errored state %s: %s", state_id, notify_exc)
+            finally:
+                self._state_queue.task_done()  # type: ignore
```

Notes:
- This also drops the need for the inline “# type: ignore” on the secrets line once _get_secrets is fixed per the next comment.
- If you adopt the API rename in the previous comment, line 351 is already updated to call _worker_task.
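The deadlock the comment warns about can be shown in isolation. Below is a minimal sketch (hypothetical stand-in functions, not the SDK's code): the notifier itself fails, yet because `task_done()` sits in `finally`, `queue.join()` still completes.

```python
import asyncio

async def notify_errored(state_id: str, msg: str) -> None:
    # Stand-in for the SDK's _notify_errored; simulate the notifier being down.
    raise RuntimeError("notifier is down")

async def worker(queue: asyncio.Queue, log: list) -> None:
    while True:
        state = await queue.get()
        try:
            raise ValueError("node execution failed")  # simulate a failing node
        except Exception as e:
            try:
                await notify_errored(state, str(e))
            except Exception:
                log.append(f"failed to notify {state}")  # guard the notification
        finally:
            queue.task_done()  # always runs, so queue.join() cannot deadlock

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    log: list = []
    task = asyncio.create_task(worker(queue, log))
    for sid in ("s1", "s2"):
        queue.put_nowait(sid)
    await asyncio.wait_for(queue.join(), timeout=2)  # completes despite both failures
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return log

log = asyncio.run(main())
# log == ["failed to notify s1", "failed to notify s2"]
```

Without the `finally`, the first raise out of the except block would skip `task_done()` and `queue.join()` would hang until the timeout.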
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
python-sdk/exospherehost/runtime.py (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
python-sdk/exospherehost/runtime.py (1)
python-sdk/exospherehost/node/BaseNode.py (4)
BaseNode (6-100), _execute (70-83), Inputs (29-35), Secrets (45-68)
```diff
@@ -1,5 +1,7 @@
 import asyncio
 import os
+import multiprocessing
```
🧹 Nitpick (assertive)
Confirm intent for new `multiprocessing` import (currently unused).
The new import isn't referenced in this module. If you don't plan to introduce multiprocessing-based code in this PR, please remove it to avoid confusion. If it's a placeholder for imminent work, ignore this note.
🤖 Prompt for AI Agents
In python-sdk/exospherehost/runtime.py around line 3, the new import
multiprocessing is unused; remove the import to avoid confusion and linter
warnings (or, if you intended to add multiprocessing-based functionality in this
PR, add the corresponding code that uses multiprocessing and update
tests/documentation accordingly) — default action: delete the unused "import
multiprocessing" line.
```diff
-    def __init__(self, namespace: str, name: str, nodes: List[type[BaseNode]], state_manager_uri: str | None = None, key: str | None = None, batch_size: int = 16, workers: int = 4, state_manage_version: str = "v0", poll_interval: int = 1):
+    def __init__(self, namespace: str, name: str, nodes: List[type[BaseNode]], state_manager_uri: str | None = None, key: str | None = None, batch_size: int = 16, thread_count: int = 4, state_manage_version: str = "v0", poll_interval: int = 1):
```
💡 Verification agent
🧩 Analysis chain
API misnomer: “thread_count” suggests OS threads, but implementation uses asyncio tasks. Change before publishing 0.0.1.
Exposing “thread_count” as a public constructor param (and naming the worker as “_worker_thread”) is misleading and will be hard to change post-release. The runtime spawns asyncio tasks (cooperative coroutines) via asyncio.create_task, not threads. Recommend renaming to worker_count (or task_count) and updating validation, docs, and call sites accordingly.
Proposed minimal API/name fixes:
```diff
-def __init__(self, namespace: str, name: str, nodes: List[type[BaseNode]], state_manager_uri: str | None = None, key: str | None = None, batch_size: int = 16, thread_count: int = 4, state_manage_version: str = "v0", poll_interval: int = 1):
+def __init__(self, namespace: str, name: str, nodes: List[type[BaseNode]], state_manager_uri: str | None = None, key: str | None = None, batch_size: int = 16, worker_count: int = 4, state_manage_version: str = "v0", poll_interval: int = 1):
@@
-    self._thread_count = thread_count
+    self._worker_count = worker_count
@@
-    if self._thread_count < 1:
-        raise ValueError("Thread count should be at least 1")
+    if self._worker_count < 1:
+        raise ValueError("Worker count should be at least 1")
@@
-    worker_tasks = [asyncio.create_task(self._worker_thread()) for _ in range(self._thread_count)]
+    worker_tasks = [asyncio.create_task(self._worker_task()) for _ in range(self._worker_count)]
```

Also update docstrings to reflect the terminology and avoid implying OS threads:
```diff
-    workers (int, optional): Number of concurrent worker tasks. Defaults to 4.
+    worker_count (int, optional): Number of concurrent worker tasks (asyncio tasks). Defaults to 4.
@@
-    ValueError: If configuration is invalid (e.g., missing URI or key, batch_size/workers < 1).
+    ValueError: If configuration is invalid (e.g., missing URI or key, batch_size/worker_count < 1).
```

If you truly intend real threads, we'd need a different design (e.g., threading or ThreadPoolExecutor, and careful event loop handoff), which is a larger change.
Run this to find lingering references that need renaming and stale “workers” mentions in code/docs:
Also applies to: 58-58, 91-92, 351-351
🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail
rg -nP -C2 '(?<!_)workers?\b|\bthread_count\b|\b_thread_count\b|\b_worker_thread\b'
```

Length of output: 7529
Rename thread_count to worker_count (asyncio tasks, not OS threads)
The constructor parameter and internal naming currently suggests OS threads, but the runtime spawns asyncio tasks. To avoid a breaking API change post-release, please rename all thread_count/workers references to worker_count (or task_count) before publishing v0.0.1.
Locations to update:
- python-sdk/exospherehost/runtime.py
  - Line 52: constructor signature – `thread_count` ➔ `worker_count`
  - Line 58: `self._thread_count` ➔ `self._worker_count`
  - Lines 91–92: validation – rename and error message to `"Worker count should be at least 1"`
  - Line 311: private method – rename `_worker_thread` ➔ `_worker_task`
  - Line 351: spawn loop – use `self._worker_count` and call `_worker_task()`
  - Docstrings around lines 39–44: update parameter name and description (`workers` ➔ `worker_count` and clarify "asyncio tasks")
- python-sdk/tests/test_runtime_validation.py
  - Line 65: test invocation – change `workers=0` ➔ `worker_count=0`
Proposed diffs:
```diff
--- a/python-sdk/exospherehost/runtime.py
@@ -52,7 +52,7 @@ class Runtime:
-    def __init__(self, namespace: str, name: str, nodes: List[type[BaseNode]], state_manager_uri: str | None = None, key: str | None = None, batch_size: int = 16, thread_count: int = 4, state_manage_version: str = "v0", poll_interval: int = 1):
+    def __init__(self, namespace: str, name: str, nodes: List[type[BaseNode]], state_manager_uri: str | None = None, key: str | None = None, batch_size: int = 16, worker_count: int = 4, state_manage_version: str = "v0", poll_interval: int = 1):
@@ -58,7 +58,7 @@ class Runtime:
-        self._thread_count = thread_count
+        self._worker_count = worker_count
@@ -91,5 +91,5 @@ class Runtime:
-        if self._thread_count < 1:
-            raise ValueError("Thread count should be at least 1")
+        if self._worker_count < 1:
+            raise ValueError("Worker count should be at least 1")
@@ -311,4 +311,4 @@ class Runtime:
-    async def _worker_thread(self):
+    async def _worker_task(self):
@@ -351,4 +351,4 @@ class Runtime:
-        worker_tasks = [asyncio.create_task(self._worker_thread()) for _ in range(self._thread_count)]
+        worker_tasks = [asyncio.create_task(self._worker_task()) for _ in range(self._worker_count)]
```

```diff
--- a/python-sdk/tests/test_runtime_validation.py
@@ -63,7 +63,7 @@ def test_batch_size_validation():
     Runtime(namespace="ns", name="rt", nodes=[GoodNode], batch_size=0)
     with pytest.raises(ValueError):
-        Runtime(namespace="ns", name="rt", nodes=[GoodNode], workers=0)
+        Runtime(namespace="ns", name="rt", nodes=[GoodNode], worker_count=0)
```

Please apply these changes across docstrings and any other call sites to ensure consistency. Let me know if you need help updating remaining references.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
# File: python-sdk/exospherehost/runtime.py
def __init__(
    self,
    namespace: str,
    name: str,
    nodes: List[type[BaseNode]],
    state_manager_uri: str | None = None,
    key: str | None = None,
    batch_size: int = 16,
    worker_count: int = 4,
    state_manage_version: str = "v0",
    poll_interval: int = 1,
):
    # ...
    self._worker_count = worker_count
    # ...
    # validation
    if self._worker_count < 1:
        raise ValueError("Worker count should be at least 1")
    # ...

async def _worker_task(self):
    # ...

# spawn worker tasks
worker_tasks = [asyncio.create_task(self._worker_task()) for _ in range(self._worker_count)]
# ...
```
```python
# File: python-sdk/tests/test_runtime_validation.py
def test_batch_size_validation():
    Runtime(namespace="ns", name="rt", nodes=[GoodNode], batch_size=1)
    with pytest.raises(ValueError):
        Runtime(namespace="ns", name="rt", nodes=[GoodNode], worker_count=0)
```
🤖 Prompt for AI Agents
In python-sdk/exospherehost/runtime.py around line 52 (constructor) and nearby
docstrings (lines ~39–44), rename the parameter thread_count to worker_count and
update its docstring to clarify it represents asyncio tasks; change internal
attribute self._thread_count to self._worker_count (line 58) and update
validation on lines ~91–92 to validate worker_count and use the error message
"Worker count should be at least 1"; rename the private method _worker_thread to
_worker_task (around line 311) and in the spawn loop (around line 351) use
self._worker_count and call _worker_task(); also update any other references in
this file and tests (python-sdk/tests/test_runtime_validation.py line 65: pass
worker_count=0) and adjust names in docstrings and error messages to ensure
consistency across the codebase.
```diff
 node = self._node_mapping[state["node_name"]]
 secrets = await self._get_secrets(state["state_id"])
-outputs = await node()._execute(node.Inputs(**state["inputs"]), node.Secrets(**secrets["secrets"]))
+outputs = await node()._execute(node.Inputs(**state["inputs"]), node.Secrets(**secrets["secrets"]))  # type: ignore
```
🛠️ Refactor suggestion
Secrets shape mismatch and unnecessary `# type: ignore`. Return a flat mapping from `_get_secrets` and pass it directly.
Currently, `_get_secrets()` is annotated as `Dict[str, str]` but the worker indexes `secrets["secrets"]`, forcing a type ignore. Fix by returning the inner map from `_get_secrets` and using it directly:
Change the call site (adjust names if you apply the worker refactor above):
```diff
-outputs = await node()._execute(node.Inputs(**state["inputs"]), node.Secrets(**secrets["secrets"]))  # type: ignore
+outputs = await node()._execute(node.Inputs(**state["inputs"]), node.Secrets(**secrets))
```
+outputs = await node()._execute(node.Inputs(**state["inputs"]), node.Secrets(**secrets))And update _get_secrets to return the flat dict:
```diff
 async def _get_secrets(self, state_id: str) -> Dict[str, str]:
@@
     async with session.get(endpoint, headers=headers) as response:  # type: ignore
         res = await response.json()
@@
-    return res
+    # Expecting server payload like: {"secrets": {"KEY": "VALUE", ...}}
+    return res.get("secrets", {})
```
+ return res.get("secrets", {})This removes the type suppression and aligns annotation with actual return.
🤖 Prompt for AI Agents
In python-sdk/exospherehost/runtime.py around line 324, the call currently
indexes secrets["secrets"] and uses a "# type: ignore" because _get_secrets
returns a nested shape; change _get_secrets to return a flat Dict[str, str] (the
inner mapping) and update its type annotation accordingly, then at this call
site pass the returned dict directly into node.Secrets(...) (and remove the "#
type: ignore"), ensuring the shapes align and type checking passes.
python-sdk/exospherehost/runtime.py
Outdated
```diff
 poller = asyncio.create_task(self._enqueue())
-worker_tasks = [asyncio.create_task(self._worker()) for _ in range(self._workers)]
+worker_tasks = [asyncio.create_task(self._worker_thread()) for _ in range(self._thread_count)]
```
🧹 Nitpick (assertive)
Optional: reuse a shared aiohttp ClientSession for performance.
You create a new ClientSession per request across `_register`/`_enqueue_call`/`_notify_executed`/`_notify_errored`/`_get_secrets`. Consider instantiating one session in `_start` and reusing it (store on `self`), then closing on shutdown. This reduces connection churn and improves throughput.
🤖 Prompt for AI Agents
In python-sdk/exospherehost/runtime.py around line 351, multiple methods create
a new aiohttp.ClientSession per request which causes connection churn; instead
instantiate a single aiohttp.ClientSession in the startup path (e.g., _start)
and assign it to self._http_session, update _register, _enqueue_call,
_notify_executed, _notify_errored, and _get_secrets to reuse self._http_session
for all requests, and explicitly close/await self._http_session.close() in the
shutdown/stop path to release resources; ensure session creation happens once
and is awaited before worker tasks start and that methods handle the case where
the session is missing (raise or recreate only as a fallback).
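The create-once/reuse/close lifecycle being suggested can be sketched with a stub in place of `aiohttp.ClientSession`; every name below (`StubSession`, `_http_session`, `_start`, `_shutdown`) is hypothetical illustration, not the SDK's actual code.

```python
import asyncio

class StubSession:
    # Stand-in for aiohttp.ClientSession: counts requests, tracks closure.
    def __init__(self) -> None:
        self.closed = False
        self.requests = 0
    async def get(self, url: str) -> str:
        self.requests += 1
        return f"ok:{url}"
    async def close(self) -> None:
        self.closed = True

class Runtime:
    def __init__(self) -> None:
        self._http_session = None
    async def _start(self) -> None:
        self._http_session = StubSession()  # created once, before workers spawn
    async def _get_secrets(self, state_id: str) -> str:
        assert self._http_session is not None, "call _start() first"
        return await self._http_session.get(f"/secrets/{state_id}")
    async def _shutdown(self) -> None:
        if self._http_session is not None:
            await self._http_session.close()  # release connections on stop

async def main():
    rt = Runtime()
    await rt._start()
    await asyncio.gather(*(rt._get_secrets(f"s{i}") for i in range(3)))
    await rt._shutdown()
    return rt._http_session.requests, rt._http_session.closed

requests, closed = asyncio.run(main())
# One session served all three concurrent calls, then closed on shutdown
```

Three concurrent calls share one session object instead of opening three, which is the connection-churn saving the nitpick describes; with real aiohttp the same shape applies, with the session awaited closed in the shutdown path.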
…ity and update related documentation and tests accordingly.
Codecov Report

❌ Patch coverage is

📢 Thoughts on this report? Let us know!
No description provided.