4 changes: 2 additions & 2 deletions executorlib/__init__.py
@@ -138,9 +138,9 @@ def __new__(
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
max_cores (int): defines the number cores which can be used in parallel
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores_per_worker (int): number of MPI cores to be used for each function call
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_worker (int): number of GPUs per worker - defaults to 0
- gpus_per_core (int): number of GPUs per worker - defaults to 0
Comment on lines +141 to +143
💡 Codebase verification

Based on the search results, there are clear inconsistencies in the codebase.

Documentation and code still contain references to old resource_dict keys

Several files still use the old keys and need to be updated:

  • notebooks/examples.ipynb: Contains multiple references to cores_per_worker and gpus_per_worker in documentation and code examples
  • README.md: Uses cores_per_worker in documentation
  • executorlib/__init__.py: Still documents the parameter as cores_per_worker and gpus_per_worker in docstrings
  • executorlib/standalone/inputcheck.py: Contains function and error messages using gpus_per_worker
  • tests/test_shared_input_check.py: Test cases use gpus_per_worker

While some files like executorlib/interactive/executor.py have been updated to use the new cores and gpus_per_core keys, there are still many places where the old terminology is used, which could lead to confusion.

🔗 Analysis chain

Ensure all references to old resource_dict keys are updated

The keys in resource_dict have been updated from cores_per_worker to cores and from gpus_per_worker to gpus_per_core. Please verify that all references to the old keys have been updated throughout the codebase and documentation to prevent inconsistencies.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for occurrences of deprecated resource_dict keys

# Searching for 'cores_per_worker'
rg 'cores_per_worker'

# Searching for 'gpus_per_worker'
rg 'gpus_per_worker'

Length of output: 3352
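
For a local re-check without ripgrep, a rough Python equivalent of the search above (illustrative only; the scanned file extensions are an assumption) is:

import pathlib

# List files that still reference the deprecated resource_dict keys.
OLD_KEYS = ("cores_per_worker", "gpus_per_worker")

for path in pathlib.Path(".").rglob("*"):
    if path.is_file() and path.suffix in {".py", ".md", ".ipynb"}:
        text = path.read_text(errors="ignore")
        for key in OLD_KEYS:
            if key in text:
                print(f"{path}: still references {key}")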

- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
and SLURM only) - default False
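
To illustrate the renamed keys, here is a minimal sketch based only on the docstring above; the exact Executor arguments and available backends depend on the installed executorlib version:

from executorlib import Executor

# New-style resource_dict keys; the comments give the deprecated names.
resource_dict = {
    "cores": 2,                  # was: cores_per_worker
    "threads_per_core": 1,
    "gpus_per_core": 0,          # was: gpus_per_worker
    "cwd": None,
    "openmpi_oversubscribe": False,
}

with Executor(max_cores=2, resource_dict=resource_dict) as exe:
    future = exe.submit(sum, [1, 2, 3])
    print(future.result())  # 6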
28 changes: 21 additions & 7 deletions executorlib/cache/executor.py
@@ -9,14 +9,19 @@
)
from executorlib.standalone.thread import RaisingThread

try:
from executorlib.standalone.cache.queue import execute_with_pysqa
except ImportError:
# If pysqa is not available fall back to executing tasks in a subprocess
execute_with_pysqa = execute_in_subprocess


class FileExecutor(ExecutorBase):
def __init__(
self,
cache_directory: str = "cache",
cores_per_worker: int = 1,
cwd: Optional[str] = None,
execute_function: callable = execute_in_subprocess,
resource_dict: Optional[dict] = None,
execute_function: callable = execute_with_pysqa,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
@@ -26,14 +31,24 @@ def __init__(

Args:
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- cwd (str/None): current working directory where the parallel python task is executed
Comment on lines +34 to +36
⚠️ Potential issue

Remove documentation for deprecated parameters.

The docstring still contains entries for the removed parameters cores_per_worker and cwd. These should be removed since they've been replaced by resource_dict.

Apply this diff to fix the docstring:

            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - cwd (str/None): current working directory where the parallel python task is executed
            execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
-           cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
            terminate_function (callable, optional): The function to terminate the tasks.
-           cwd (str, optional): current working directory where the parallel python task is executed

Also applies to: 33-34

execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
terminate_function (callable, optional): The function to terminate the tasks.
cwd (str, optional): current working directory where the parallel python task is executed
config_directory (str, optional): path to the config directory.
backend (str, optional): name of the backend used to spawn tasks.
"""
super().__init__()
default_resource_dict = {
"cores": 1,
"cwd": None,
}
if resource_dict is None:
resource_dict = {}
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
Comment on lines +43 to +51
🛠️ Refactor suggestion

Consider simplifying the resource dictionary initialization.

While the current implementation is correct, it could be more concise.

Consider this simpler implementation:

-        default_resource_dict = {
-            "cores": 1,
-            "cwd": None,
-        }
-        if resource_dict is None:
-            resource_dict = {}
-        resource_dict.update(
-            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
-        )
+        resource_dict = {
+            "cores": 1,
+            "cwd": None,
+            **(resource_dict or {})
+        }
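
Both forms fill in missing defaults while letting caller-supplied keys win; the one behavioral difference is that the update-based version mutates the caller's dictionary in place, whereas the dict literal builds a new one. A small illustrative check (not part of the diff; the helper names are made up):

def merge_with_update(resource_dict):
    default_resource_dict = {"cores": 1, "cwd": None}
    if resource_dict is None:
        resource_dict = {}
    resource_dict.update(
        {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
    )
    return resource_dict

def merge_with_literal(resource_dict):
    return {"cores": 1, "cwd": None, **(resource_dict or {})}

assert merge_with_update(None) == merge_with_literal(None) == {"cores": 1, "cwd": None}

user_resources = {"cores": 2}
assert merge_with_literal(user_resources) == {"cores": 2, "cwd": None}
assert user_resources == {"cores": 2}                # caller's dict untouched
assert merge_with_update(user_resources) == {"cores": 2, "cwd": None}
assert user_resources == {"cores": 2, "cwd": None}   # caller's dict was mutated in place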

if execute_function == execute_in_subprocess and terminate_function is None:
terminate_function = terminate_subprocess
cache_directory_path = os.path.abspath(cache_directory)
@@ -45,8 +60,7 @@ def __init__(
"future_queue": self._future_queue,
"execute_function": execute_function,
"cache_directory": cache_directory_path,
"cores_per_worker": cores_per_worker,
"cwd": cwd,
"resource_dict": resource_dict,
"terminate_function": terminate_function,
"config_directory": config_directory,
"backend": backend,
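
Taken together, a minimal usage sketch of the updated FileExecutor API (mirroring the updated tests below; assumes the optional h5py dependency is installed and passes the subprocess spawner explicitly to skip the pysqa-backed default):

from executorlib import FileExecutor
from executorlib.standalone.cache.spawner import execute_in_subprocess

def add(a, b):
    return a + b

# Per-call resources now travel through resource_dict instead of the removed
# cores_per_worker and cwd keyword arguments.
with FileExecutor(
    cache_directory="cache",
    resource_dict={"cores": 1, "cwd": None},
    execute_function=execute_in_subprocess,
) as exe:
    future = exe.submit(add, 1, b=2)
    print(future.result())  # 3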
23 changes: 11 additions & 12 deletions executorlib/cache/shared.py
@@ -50,8 +50,7 @@ def execute_tasks_h5(
future_queue: queue.Queue,
cache_directory: str,
execute_function: callable,
cores_per_worker: int = 1,
cwd: Optional[str] = None,
resource_dict: dict,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
@@ -62,9 +61,10 @@
Args:
future_queue (queue.Queue): The queue containing the tasks.
cache_directory (str): The directory to store the HDF5 files.
cores_per_worker (int): The number of cores per worker.
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- cwd (str/None): current working directory where the parallel python task is executed
Comment on lines +64 to +66
🛠️ Refactor suggestion

Improve docstring formatting for resource_dict parameter

The docstring for resource_dict can be reformatted for clarity and to adhere to standard documentation practices.

Consider applying this change:

-        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
-                              - cores (int): number of MPI cores to be used for each function call
-                              - cwd (str/None): current working directory where the parallel python task is executed
+        resource_dict (dict): A dictionary of resources required by the task with the following keys:
+            - **cores** (int): Number of MPI cores to be used for each function call.
+            - **cwd** (Optional[str]): Current working directory where the parallel Python task is executed.

execute_function (callable): The function to execute the tasks.
cwd (str/None): current working directory where the parallel python task is executed
terminate_function (callable): The function to terminate the tasks.
config_directory (str, optional): path to the config directory.
backend (str, optional): name of the backend used to spawn tasks.
@@ -97,16 +97,15 @@ def execute_tasks_h5(
memory_dict=memory_dict,
file_name_dict=file_name_dict,
)
resource_dict = task_dict["resource_dict"].copy()
if "cores" not in resource_dict:
resource_dict["cores"] = cores_per_worker
if "cwd" not in resource_dict:
resource_dict["cwd"] = cwd
task_resource_dict = task_dict["resource_dict"].copy()
task_resource_dict.update(
{k: v for k, v in resource_dict.items() if k not in task_resource_dict}
)
Comment on lines +100 to +103
⚠️ Potential issue

Handle missing 'resource_dict' in task_dict to prevent KeyError

At line 100, the code assumes that task_dict always contains a 'resource_dict' key. If this key is missing, it will raise a KeyError.

Apply this diff to safely handle cases where 'resource_dict' might not be present:

-task_resource_dict = task_dict["resource_dict"].copy()
+task_resource_dict = task_dict.get("resource_dict", {}).copy()

Additionally, ensure that the subsequent code accounts for the possibility of an empty task_resource_dict.

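
For reference, the precedence this hunk implements (per-task resources win; the executor-level resource_dict only fills in missing keys) can be sketched as follows, with made-up values and the .get() guard suggested above:

resource_dict = {"cores": 2, "cwd": None}   # executor-level defaults
task_dict = {"fn": sum, "args": ([1, 2],), "resource_dict": {"cores": 4}}  # hypothetical task

task_resource_dict = task_dict.get("resource_dict", {}).copy()
task_resource_dict.update(
    {k: v for k, v in resource_dict.items() if k not in task_resource_dict}
)
assert task_resource_dict == {"cores": 4, "cwd": None}  # per-task "cores" overrides the default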

task_key, data_dict = serialize_funct_h5(
fn=task_dict["fn"],
fn_args=task_args,
fn_kwargs=task_kwargs,
resource_dict=resource_dict,
resource_dict=task_resource_dict,
)
if task_key not in memory_dict.keys():
if task_key + ".h5out" not in os.listdir(cache_directory):
@@ -115,12 +114,12 @@
process_dict[task_key] = execute_function(
command=_get_execute_command(
file_name=file_name,
cores=cores_per_worker,
cores=task_resource_dict["cores"],
⚠️ Potential issue

Ensure the 'cores' key exists in task_resource_dict before accessing it

At line 117, accessing task_resource_dict["cores"] will raise a KeyError if the 'cores' key is missing.

Consider providing a default value or adding a check:

Option 1: Provide a default value for 'cores':

-            cores=task_resource_dict["cores"],
+            cores=task_resource_dict.get("cores", 1),

Option 2: Add a check and raise an explicit error if 'cores' is missing:

if "cores" not in task_resource_dict:
    raise KeyError("'cores' key is missing in task_resource_dict")
cores = task_resource_dict["cores"]

),
task_dependent_lst=[
process_dict[k] for k in future_wait_key_lst
],
resource_dict=resource_dict,
resource_dict=task_resource_dict,
config_directory=config_directory,
backend=backend,
)
4 changes: 2 additions & 2 deletions executorlib/interactive/executor.py
@@ -175,9 +175,9 @@ def create_executor(
max_cores (int): defines the number cores which can be used in parallel
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores_per_worker (int): number of MPI cores to be used for each function call
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_worker (int): number of GPUs per worker - defaults to 0
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
SLURM only) - default False
14 changes: 9 additions & 5 deletions tests/test_cache_executor_mpi.py
@@ -3,13 +3,15 @@
import shutil
import unittest

from executorlib.standalone.cache.spawner import execute_in_subprocess


try:
from executorlib import FileExecutor

skip_h5io_test = False
skip_h5py_test = False
except ImportError:
skip_h5io_test = True
skip_h5py_test = True


skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
@@ -24,12 +26,14 @@ def mpi_funct(i):


@unittest.skipIf(
skip_h5io_test or skip_mpi4py_test,
"h5io or mpi4py are not installed, so the h5io and mpi4py tests are skipped.",
skip_h5py_test or skip_mpi4py_test,
"h5py or mpi4py are not installed, so the h5py and mpi4py tests are skipped.",
)
class TestCacheExecutorMPI(unittest.TestCase):
def test_executor(self):
with FileExecutor(cores_per_worker=2) as exe:
with FileExecutor(
resource_dict={"cores": 2}, execute_function=execute_in_subprocess
) as exe:
fs1 = exe.submit(mpi_funct, 1)
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
4 changes: 1 addition & 3 deletions tests/test_cache_executor_pysqa_flux.py
@@ -6,7 +6,6 @@
try:
import flux.job
from executorlib import FileExecutor
from executorlib.standalone.cache.queue import execute_with_pysqa

skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("PYMPIPOOL_PMIX", None)
@@ -32,8 +31,7 @@ def mpi_funct(i):
class TestCacheExecutorPysqa(unittest.TestCase):
def test_executor(self):
with FileExecutor(
cores_per_worker=2,
execute_function=execute_with_pysqa,
resource_dict={"cores": 2},
backend="flux",
) as exe:
fs1 = exe.submit(mpi_funct, 1)
28 changes: 15 additions & 13 deletions tests/test_cache_executor_serial.py
@@ -4,19 +4,19 @@
import shutil
import unittest

from executorlib.standalone.cache.spawner import (
execute_in_subprocess,
terminate_subprocess,
)
from executorlib.standalone.thread import RaisingThread

try:
from executorlib import FileExecutor
from executorlib.cache.shared import execute_tasks_h5
from executorlib.standalone.cache.spawner import (
execute_in_subprocess,
terminate_subprocess,
)

skip_h5io_test = False
skip_h5py_test = False
except ImportError:
skip_h5io_test = True
skip_h5py_test = True


def my_funct(a, b):
@@ -28,18 +28,18 @@ def list_files_in_working_directory():


@unittest.skipIf(
skip_h5io_test, "h5io is not installed, so the h5io tests are skipped."
skip_h5py_test, "h5py is not installed, so the h5py tests are skipped."
)
class TestCacheExecutorSerial(unittest.TestCase):
def test_executor_mixed(self):
with FileExecutor() as exe:
with FileExecutor(execute_function=execute_in_subprocess) as exe:
fs1 = exe.submit(my_funct, 1, b=2)
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), 3)
self.assertTrue(fs1.done())

def test_executor_dependence_mixed(self):
with FileExecutor() as exe:
with FileExecutor(execute_function=execute_in_subprocess) as exe:
fs1 = exe.submit(my_funct, 1, b=2)
fs2 = exe.submit(my_funct, 1, b=fs1)
self.assertFalse(fs2.done())
@@ -48,7 +48,9 @@ def test_executor_dependence_mixed(self):

def test_executor_working_directory(self):
cwd = os.path.join(os.path.dirname(__file__), "executables")
with FileExecutor(cwd=cwd) as exe:
with FileExecutor(
resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess
) as exe:
fs1 = exe.submit(list_files_in_working_directory)
self.assertEqual(fs1.result(), os.listdir(cwd))

@@ -72,7 +74,7 @@ def test_executor_function(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"resource_dict": {"cores": 1, "cwd": None},
"terminate_function": terminate_subprocess,
},
)
@@ -113,7 +115,7 @@ def test_executor_function_dependence_kwargs(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"resource_dict": {"cores": 1, "cwd": None},
🛠️ Refactor suggestion

Consider refactoring duplicate resource configurations.

The same resource configuration {"cores": 1, "cwd": None} is repeated across multiple test methods. Consider extracting this to a class-level constant or setUp method to improve maintainability.

class TestCacheExecutorSerial(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        cls.default_resource_dict = {"cores": 1, "cwd": None}

     def test_executor_function(self):
         # ... existing code ...
         kwargs={
             "future_queue": q,
             "cache_directory": cache_dir,
             "execute_function": execute_in_subprocess,
-            "resource_dict": {"cores": 1, "cwd": None},
+            "resource_dict": self.default_resource_dict,
             "terminate_function": terminate_subprocess,
         }

Also applies to: 157-157

"terminate_function": terminate_subprocess,
},
)
@@ -154,7 +156,7 @@ def test_executor_function_dependence_args(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"resource_dict": {"cores": 1, "cwd": None},
"terminate_function": terminate_subprocess,
},
)
6 changes: 3 additions & 3 deletions tests/test_cache_hdf.py
@@ -6,17 +6,17 @@
try:
from executorlib.standalone.hdf import dump, load

skip_h5io_test = False
skip_h5py_test = False
except ImportError:
skip_h5io_test = True
skip_h5py_test = True


def my_funct(a, b):
return a + b


@unittest.skipIf(
skip_h5io_test, "h5io is not installed, so the h5io tests are skipped."
skip_h5py_test, "h5py is not installed, so the h5io tests are skipped."
)
class TestSharedFunctions(unittest.TestCase):
def test_hdf_mixed(self):