Get data from cache #525
Conversation
Example:

```python
import os
import pandas
import shutil
from executorlib import Executor
from executorlib.standalone.hdf import get_cache_data

cache_directory = "./cache"
with Executor(backend="local", cache_directory=cache_directory) as exe:
    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]
    print([f.result() for f in future_lst])

df = pandas.DataFrame(get_cache_data(cache_directory=cache_directory))
df
```
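Assuming each cache file stores the fields that get_cache_data reads, the resulting DataFrame would contain one row per submitted task, combining the cached fields (such as function, input_args, input_kwargs, and output) with the filename column that get_cache_data adds.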
Walkthrough
This pull request introduces a new function, get_cache_data, in executorlib/standalone/hdf.py. It scans a cache directory, reads each HDF5 file in it, and returns the cached contents as a list of dictionaries. A test module, tests/test_cache_executor_interactive.py, covers the new function.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Executor
    participant GetCacheData
    participant HDF5Files
    Executor->>GetCacheData: Request cache data
    GetCacheData->>HDF5Files: Scan cache directory
    HDF5Files-->>GetCacheData: Return file list
    GetCacheData->>HDF5Files: Read each HDF5 file
    HDF5Files-->>GetCacheData: Extract file contents
    GetCacheData-->>Executor: Return list of cached data
```
Actionable comments posted: 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (2)
- executorlib/standalone/hdf.py (2 hunks)
- tests/test_cache_executor_interactive.py (1 hunks)
🔇 Additional comments (3)

executorlib/standalone/hdf.py (1)

Lines 8-15: LGTM: Well-structured global mapping dictionary
Moving group_dict to global scope improves code organization by making the mapping accessible across functions while maintaining a single source of truth.
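For context, a hypothetical sketch of what such a module-level mapping could look like; the field names are taken from those referenced later in this review, while the short keys are purely illustrative:

```python
# Hypothetical module-level mapping from HDF5 dataset names to result fields;
# the exact contents of executorlib's group_dict may differ.
group_dict = {
    "fn": "function",
    "args": "input_args",
    "kwargs": "input_kwargs",
    "output": "output",
    "runtime": "runtime",
}
```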
tests/test_cache_executor_interactive.py (2)

Lines 1-17: LGTM: Well-implemented test skip logic
The implementation properly handles the h5py dependency and follows unittest best practices for conditional test execution.

Lines 15-18: LGTM: Proper test class structure
The test class follows unittest conventions and is properly decorated for conditional execution.
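As an illustration of the pattern being praised, conditional skipping for an optional dependency usually looks something like this (a minimal sketch, not the verbatim test code; the class name is assumed):

```python
import unittest

# Detect the optional h5py dependency once at import time.
try:
    import h5py  # noqa: F401

    skip_h5py_test = False
except ImportError:
    skip_h5py_test = True


@unittest.skipIf(
    skip_h5py_test, "h5py is not installed, so the h5py tests are skipped."
)
class TestCacheData(unittest.TestCase):
    ...  # test methods such as test_cache_data go here
```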
executorlib/standalone/hdf.py:

```python
def get_cache_data(cache_directory: str) -> List[dict]:
    file_lst = []
    for file_name in os.listdir(cache_directory):
        with h5py.File(os.path.join(cache_directory, file_name), "r") as hdf:
            file_content_dict = {
                key: cloudpickle.loads(np.void(hdf["/" + key]))
                for key in group_dict.values()
                if key in hdf
            }
            file_content_dict["filename"] = file_name
            file_lst.append(file_content_dict)
    return file_lst
```
Add error handling and improve resource management
The function needs additional safeguards for robustness:
- Missing directory existence check
- No error handling for invalid HDF5 files
- Resource management could be improved using a context manager pattern

Consider this improved implementation:
```diff
 def get_cache_data(cache_directory: str) -> List[dict]:
+    if not os.path.isdir(cache_directory):
+        raise ValueError(f"Cache directory does not exist: {cache_directory}")
+
     file_lst = []
     for file_name in os.listdir(cache_directory):
-        with h5py.File(os.path.join(cache_directory, file_name), "r") as hdf:
-            file_content_dict = {
-                key: cloudpickle.loads(np.void(hdf["/" + key]))
-                for key in group_dict.values()
-                if key in hdf
-            }
-            file_content_dict["filename"] = file_name
-            file_lst.append(file_content_dict)
+        file_path = os.path.join(cache_directory, file_name)
+        try:
+            with h5py.File(file_path, "r") as hdf:
+                file_content_dict = {
+                    key: cloudpickle.loads(np.void(hdf["/" + key]))
+                    for key in group_dict.values()
+                    if key in hdf
+                }
+                file_content_dict["filename"] = file_name
+                file_lst.append(file_content_dict)
+        except (OSError, KeyError) as e:
+            # Log warning but continue processing other files
+            import warnings
+            warnings.warn(f"Failed to read cache file {file_name}: {str(e)}")
     return file_lst
```
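A small design note on the suggestion above: the warnings import would conventionally sit at the top of the module rather than inside the except block; Python caches imports, so behavior is identical either way, but hoisting it keeps the error path uncluttered.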
tests/test_cache_executor_interactive.py:

```python
def tearDown(self):
    if os.path.exists("cache"):
        shutil.rmtree("cache")
```
🛠️ Refactor suggestion
Update cleanup to use test's cache directory
The tearDown method should clean up the temporary directory used in the test, not a hardcoded path.
A simple fix is to have the test store its directory on the instance (e.g. self.cache_directory = tempfile.mkdtemp() inside the test method) and remove exactly that path here:

```diff
 def tearDown(self):
-    if os.path.exists("cache"):
-        shutil.rmtree("cache")
+    if hasattr(self, "cache_directory") and os.path.exists(self.cache_directory):
+        shutil.rmtree(self.cache_directory)
```
Committable suggestion skipped: line range outside the PR's diff.
```python
def test_cache_data(self):
    cache_directory = "./cache"
    with Executor(backend="local", cache_directory=cache_directory) as exe:
        future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]
        result_lst = [f.result() for f in future_lst]

    cache_lst = get_cache_data(cache_directory=cache_directory)
    self.assertEqual(sum([c["output"] for c in cache_lst]), sum(result_lst))
    self.assertEqual(
        sum([sum(c["input_args"][0]) for c in cache_lst]), sum(result_lst)
    )
```
🛠️ Refactor suggestion
Enhance test coverage and use a temporary directory
The test could be improved in several ways:
- Use tempfile.mkdtemp() instead of the hardcoded "./cache"
- Add test cases for error conditions
- Validate all cached fields, not just output and input_args
Consider this improved implementation:
```diff
+import tempfile
+
 def test_cache_data(self):
-    cache_directory = "./cache"
+    cache_directory = tempfile.mkdtemp()
     with Executor(backend="local", cache_directory=cache_directory) as exe:
         future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]
         result_lst = [f.result() for f in future_lst]
     cache_lst = get_cache_data(cache_directory=cache_directory)
+    # Validate cache size
+    self.assertEqual(len(cache_lst), len(future_lst))
+
+    # Validate all cached fields
+    for cache_entry in cache_lst:
+        self.assertIn('function', cache_entry)
+        self.assertIn('input_args', cache_entry)
+        self.assertIn('input_kwargs', cache_entry)
+        self.assertIn('output', cache_entry)
+        self.assertIn('runtime', cache_entry)
+
     self.assertEqual(sum([c["output"] for c in cache_lst]), sum(result_lst))
     self.assertEqual(
         sum([sum(c["input_args"][0]) for c in cache_lst]), sum(result_lst)
     )
+
+def test_cache_data_invalid_directory(self):
+    with self.assertRaises(ValueError):
+        get_cache_data("/nonexistent/path")
```
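An alternative worth noting: if the test does not need the directory to outlive the test body, tempfile.TemporaryDirectory handles cleanup automatically, removing the need for tearDown bookkeeping. A minimal sketch, assuming the surrounding test class and imports from the suggestion above:

```python
import tempfile


def test_cache_data(self):
    # TemporaryDirectory deletes the directory (and its files) on exit,
    # so no tearDown cleanup is required for this test.
    with tempfile.TemporaryDirectory() as cache_directory:
        with Executor(backend="local", cache_directory=cache_directory) as exe:
            future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]
            result_lst = [f.result() for f in future_lst]
        cache_lst = get_cache_data(cache_directory=cache_directory)
        self.assertEqual(len(cache_lst), len(future_lst))
```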
Summary by CodeRabbit

New Features
- Added a get_cache_data function that reads cached task data from the HDF5 files in a cache directory.

Tests
- Added a test for retrieving cached data, skipped automatically when h5py is not installed.