Add FileExecutor #344
Conversation
Walkthrough

The recent updates introduce HDF5 file handling capabilities to the project.
for more information, see https://pre-commit.ci
Actionable comments posted: 2
Outside diff range and nitpick comments (2)
pympipool/cache/shared.py (2)
`13-26`: Consider adding docstrings to the `FutureItem` class and its methods. Adding docstrings would improve the maintainability and readability of the code by providing a clear description of the class's purpose and its methods' functionalities.
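As a sketch of the docstring style being asked for, here is a hypothetical, stripped-down `FutureItem`; the real class lives in `pympipool/cache/shared.py`, and its interface here is inferred only from its usage elsewhere in this diff, where `done()` and `result()` are called:

```python
import os


class FutureItem:
    """Reference to a task result stored in an HDF5 output file.

    Hypothetical sketch for illustration: here, file existence stands in
    for "result available"; the real implementation may differ.

    Args:
        file_name (str): absolute path to the HDF5 output file
    """

    def __init__(self, file_name: str):
        self._file_name = file_name

    def done(self) -> bool:
        """Return True once the output file has been written."""
        return os.path.exists(self._file_name)
```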
`90-93`: Optimize the regular expression in `_get_hash`. Consider pre-compiling the regular expression for performance improvement, especially if `_get_hash` is called frequently.

```python
pattern = re.compile(b"(?<=/ipykernel_)(.*)(?=/)")
binary_no_ipykernel = pattern.sub(b"", binary)
```
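A minimal, self-contained sketch of the pre-compiled pattern in use follows; the helper name and the sample path are illustrative, not taken from the project:

```python
import re

# Compile once at import time instead of on every call.
IPYKERNEL_PATTERN = re.compile(b"(?<=/ipykernel_)(.*)(?=/)")


def strip_ipykernel_id(binary: bytes) -> bytes:
    # Remove the per-session ipykernel directory id so that hashes of
    # otherwise-identical serialized functions stay stable across sessions.
    return IPYKERNEL_PATTERN.sub(b"", binary)
```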
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (15)
- .ci_support/environment-docs.yml (1 hunks)
- .ci_support/environment-mpich.yml (1 hunks)
- .ci_support/environment-old.yml (1 hunks)
- .ci_support/environment-openmpi.yml (1 hunks)
- .ci_support/environment-win.yml (1 hunks)
- binder/environment.yml (1 hunks)
- pympipool/__init__.py (2 hunks)
- pympipool/cache/backend.py (1 hunks)
- pympipool/cache/executor.py (1 hunks)
- pympipool/cache/hdf.py (1 hunks)
- pympipool/cache/shared.py (1 hunks)
- pyproject.toml (1 hunks)
- tests/executables/test_cache_shared.py (1 hunks)
- tests/test_cache_file_executor.py (1 hunks)
- tests/test_cache_hdf.py (1 hunks)
Files skipped from review due to trivial changes (7)
- .ci_support/environment-docs.yml
- .ci_support/environment-mpich.yml
- .ci_support/environment-old.yml
- .ci_support/environment-openmpi.yml
- .ci_support/environment-win.yml
- binder/environment.yml
- pyproject.toml
Additional comments not posted (11)
pympipool/cache/executor.py (2)
`8-12`: Constructor implementation looks good. It correctly sets up the cache directory and handles existing directories gracefully.

`13-22`: Method to set up the process is well-implemented. Using `RaisingThread` helps in proper exception handling across threads.

tests/test_cache_hdf.py (1)
`19-64`: Test cases are comprehensive and well-implemented. They cover various scenarios and properly clean up resources.

pympipool/cache/hdf.py (3)
`9-32`: The `dump` function is well-implemented, using `cloudpickle` for serialization and `h5io` for HDF5 interactions, ensuring robust data handling.

`34-64`: The `load` function is correctly implemented, handling potential missing entries gracefully and ensuring data integrity.

`67-83`: The `get_output` function is effectively implemented, correctly checking for and retrieving output from HDF5 files.

tests/executables/test_cache_shared.py (1)
`22-94`: Test cases are comprehensive and effectively test the shared cache functionalities, including asynchronous operations using `Future` and `FutureItem`.

tests/test_cache_file_executor.py (1)

`23-105`: Test cases for `FileExecutor` are well-implemented, effectively simulating various scenarios and properly managing resources.

pympipool/cache/shared.py (1)
`28-33`: Ensure proper error handling in `execute_in_subprocess`. This function currently lacks error handling for subprocess execution, which could lead to unmanaged subprocess failures. Consider adding a try-except block to handle potential exceptions.
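One possible shape of the suggested try/except is sketched below. This is a hypothetical wrapper, not the project's implementation: the real `execute_in_subprocess` in `pympipool/cache/shared.py` may differ in signature and in how it tracks the launched process, and the `task_dependent_lst` parameter is assumed from its call site in `execute_tasks_h5`:

```python
import subprocess


def execute_in_subprocess_checked(command, task_dependent_lst=None):
    # Wait for dependent tasks before launching the new one.
    for task in task_dependent_lst or []:
        task.wait()
    try:
        # check=True turns a non-zero exit code into CalledProcessError
        # instead of letting the failure pass silently.
        return subprocess.run(command, check=True, capture_output=True)
    except subprocess.CalledProcessError as err:
        raise RuntimeError(
            f"command {command!r} failed with exit code {err.returncode}: "
            f"{err.stderr.decode(errors='replace')}"
        ) from err
```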
pympipool/__init__.py (2)

`13-13`: Confirm the addition of `FileExecutor` to `__all__`. The addition of `FileExecutor` to the `__all__` list is consistent with the PR's objective to make it part of the public API.

`3-3`: Ensure that the `FileExecutor` import is necessary. This verification is to ensure that the `FileExecutor` import is utilized within the file, as unused imports should be avoided to maintain clean code.

Verification successful: the `FileExecutor` import is indeed used within the `pympipool/__init__.py` file.

- `pympipool/__init__.py`: `FileExecutor` is imported and utilized within the file.

Scripts executed

The following scripts were executed for the analysis:

Script:

```shell
#!/bin/bash
# Description: Verify the necessity of the `FileExecutor` import.
# Test: Search for `FileExecutor` usage in the file.
rg --type python $'FileExecutor' pympipool/__init__.py
```

Length of output: 84

Script:

```shell
#!/bin/bash
# Description: Verify the necessity of the `FileExecutor` import.
# Test: Search for `FileExecutor` usage in the file.
rg 'FileExecutor' pympipool/__init__.py
```

Length of output: 107
```python
def execute_tasks_h5(future_queue, cache_directory, execute_function):
    memory_dict, process_dict, file_name_dict = {}, {}, {}
    while True:
        task_dict = None
        try:
            task_dict = future_queue.get_nowait()
        except queue.Empty:
            pass
        if (
            task_dict is not None
            and "shutdown" in task_dict.keys()
            and task_dict["shutdown"]
        ):
            future_queue.task_done()
            future_queue.join()
            break
        elif task_dict is not None:
            task_args, task_kwargs, future_wait_key_lst = _convert_args_and_kwargs(
                task_dict=task_dict,
                memory_dict=memory_dict,
                file_name_dict=file_name_dict,
            )
            task_key, data_dict = _serialize_funct_h5(
                task_dict["fn"], *task_args, **task_kwargs
            )
            if task_key not in memory_dict.keys():
                if task_key + ".h5out" not in os.listdir(cache_directory):
                    file_name = os.path.join(cache_directory, task_key + ".h5in")
                    dump(file_name=file_name, data_dict=data_dict)
                    process_dict[task_key] = execute_function(
                        command=_get_execute_command(file_name=file_name),
                        task_dependent_lst=[
                            process_dict[k] for k in future_wait_key_lst
                        ],
                    )
                file_name_dict[task_key] = os.path.join(
                    cache_directory, task_key + ".h5out"
                )
                memory_dict[task_key] = task_dict["future"]
            future_queue.task_done()
        else:
            memory_dict = {
                key: _check_task_output(
                    task_key=key, future_obj=value, cache_directory=cache_directory
                )
                for key, value in memory_dict.items()
                if not value.done()
            }
```
Refactor `execute_tasks_h5` to improve readability and maintainability.

```diff
-    while True:
-        task_dict = None
-        try:
-            task_dict = future_queue.get_nowait()
-        except queue.Empty:
-            pass
-        if (
-            task_dict is not None
-            and "shutdown" in task_dict.keys()
-            and task_dict["shutdown"]
-        ):
-            future_queue.task_done()
-            future_queue.join()
-            break
-        elif task_dict is not None:
-            # processing logic
-        else:
-            # memory_dict processing logic
+    while not _shutdown_requested(future_queue):
+        task_dict = _try_get_task(future_queue)
+        if task_dict:
+            _process_task(task_dict, future_queue, memory_dict, file_name_dict, cache_directory, execute_function)
```

This refactoring suggestion aims to break down the `execute_tasks_h5` function into smaller, more manageable functions, improving readability and maintainability.

Committable suggestion was skipped due to low confidence.
```python
def execute_task_in_file(file_name: str):
    """
    Execute the task stored in a given HDF5 file

    Args:
        file_name (str): file name of the HDF5 file as absolute path
    """
    apply_dict = load(file_name=file_name)
    args = [
        arg if not isinstance(arg, FutureItem) else arg.result()
        for arg in apply_dict["args"]
    ]
    kwargs = {
        key: arg if not isinstance(arg, FutureItem) else arg.result()
        for key, arg in apply_dict["kwargs"].items()
    }
    result = apply_dict["fn"].__call__(*args, **kwargs)
    file_name_out = os.path.splitext(file_name)[0]
    os.rename(file_name, file_name_out + ".h5ready")
    dump(file_name=file_name_out + ".h5ready", data_dict={"output": result})
    os.rename(file_name_out + ".h5ready", file_name_out + ".h5out")
```
Consider using atomic operations or file locking mechanisms to handle file renaming to avoid issues in concurrent environments.
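One standard pattern for this is write-to-temp-then-replace: `os.replace` is atomic on POSIX (and on Windows within the same volume), so readers never observe a partially written file. A minimal sketch follows; the function name is illustrative and not part of the project:

```python
import os
import tempfile


def write_atomically(path: str, data: bytes) -> None:
    # Write to a temporary file in the same directory as the target
    # (same filesystem, so the final rename cannot cross devices),
    # then atomically replace the target.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temporary file if writing or renaming failed.
        os.remove(tmp_path)
        raise
```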
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (5)
- .ci_support/environment-mpich.yml (1 hunks)
- .ci_support/environment-openmpi.yml (1 hunks)
- binder/environment.yml (1 hunks)
- pympipool/cache/main.py (1 hunks)
- pympipool/cache/shared.py (1 hunks)
Files skipped from review due to trivial changes (1)
- pympipool/cache/main.py
Files skipped from review as they are similar to previous changes (4)
- .ci_support/environment-mpich.yml
- .ci_support/environment-openmpi.yml
- binder/environment.yml
- pympipool/cache/shared.py
Summary by CodeRabbit

New Features

- Introduced the `FileExecutor` class for handling file execution tasks.

Bug Fixes

Dependencies

- `h5py` version 3.11.0 and `h5io` version 0.2.2 added across various environments.

Tests

- Added tests for `FileExecutor` and HDF5-related caching functionalities.

Chores