diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index 329be6d0..3e95579f 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -138,9 +138,9 @@ def __new__(
         cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         max_cores (int): defines the number cores which can be used in parallel
         resource_dict (dict): A dictionary of resources required by the task. With the following keys:
-            - cores_per_worker (int): number of MPI cores to be used for each function call
+            - cores (int): number of MPI cores to be used for each function call
             - threads_per_core (int): number of OpenMP threads to be used for each function call
-            - gpus_per_worker (int): number of GPUs per worker - defaults to 0
+            - gpus_per_core (int): number of GPUs per worker - defaults to 0
             - cwd (str/None): current working directory where the parallel python task is executed
             - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py
index a2191e38..0195a14e 100644
--- a/executorlib/cache/executor.py
+++ b/executorlib/cache/executor.py
@@ -9,14 +9,19 @@
 )
 from executorlib.standalone.thread import RaisingThread
 
+try:
+    from executorlib.standalone.cache.queue import execute_with_pysqa
+except ImportError:
+    # If pysqa is not available fall back to executing tasks in a subprocess
+    execute_with_pysqa = execute_in_subprocess
+
 
 class FileExecutor(ExecutorBase):
     def __init__(
         self,
         cache_directory: str = "cache",
-        cores_per_worker: int = 1,
-        cwd: Optional[str] = None,
-        execute_function: callable = execute_in_subprocess,
+        resource_dict: Optional[dict] = None,
+        execute_function: callable = execute_with_pysqa,
         terminate_function: Optional[callable] = None,
         config_directory: Optional[str] = None,
         backend: Optional[str] = None,
@@ -26,14 +31,24 @@ def __init__(
 
         Args:
             cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
+                - cores (int): number of MPI cores to be used for each function call
+                - cwd (str/None): current working directory where the parallel python task is executed
             execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
-            cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
             terminate_function (callable, optional): The function to terminate the tasks.
-            cwd (str, optional): current working directory where the parallel python task is executed
             config_directory (str, optional): path to the config directory.
             backend (str, optional): name of the backend used to spawn tasks.
         """
         super().__init__()
+        default_resource_dict = {
+            "cores": 1,
+            "cwd": None,
+        }
+        if resource_dict is None:
+            resource_dict = {}
+        resource_dict.update(
+            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
+        )
         if execute_function == execute_in_subprocess and terminate_function is None:
             terminate_function = terminate_subprocess
         cache_directory_path = os.path.abspath(cache_directory)
@@ -45,8 +60,7 @@ def __init__(
                 "future_queue": self._future_queue,
                 "execute_function": execute_function,
                 "cache_directory": cache_directory_path,
-                "cores_per_worker": cores_per_worker,
-                "cwd": cwd,
+                "resource_dict": resource_dict,
                 "terminate_function": terminate_function,
                 "config_directory": config_directory,
                 "backend": backend,
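Note: with this change FileExecutor no longer accepts cores_per_worker and cwd as separate keyword arguments; both move into resource_dict, and pysqa-backed execution becomes the default whenever pysqa is importable. A minimal usage sketch mirroring the updated tests (the explicit execute_function opts back into local subprocess execution):

    from executorlib import FileExecutor
    from executorlib.standalone.cache.spawner import execute_in_subprocess

    def my_funct(a, b):
        return a + b

    # Without execute_function the executor now defaults to pysqa-based
    # submission when pysqa is installed, so local execution is requested
    # explicitly here.
    with FileExecutor(
        resource_dict={"cores": 1, "cwd": None},
        execute_function=execute_in_subprocess,
    ) as exe:
        future = exe.submit(my_funct, 1, b=2)
        print(future.result())  # 3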
""" super().__init__() + default_resource_dict = { + "cores": 1, + "cwd": None, + } + if resource_dict is None: + resource_dict = {} + resource_dict.update( + {k: v for k, v in default_resource_dict.items() if k not in resource_dict} + ) if execute_function == execute_in_subprocess and terminate_function is None: terminate_function = terminate_subprocess cache_directory_path = os.path.abspath(cache_directory) @@ -45,8 +60,7 @@ def __init__( "future_queue": self._future_queue, "execute_function": execute_function, "cache_directory": cache_directory_path, - "cores_per_worker": cores_per_worker, - "cwd": cwd, + "resource_dict": resource_dict, "terminate_function": terminate_function, "config_directory": config_directory, "backend": backend, diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index f89e279a..dd09542b 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -50,8 +50,7 @@ def execute_tasks_h5( future_queue: queue.Queue, cache_directory: str, execute_function: callable, - cores_per_worker: int = 1, - cwd: Optional[str] = None, + resource_dict: dict, terminate_function: Optional[callable] = None, config_directory: Optional[str] = None, backend: Optional[str] = None, @@ -62,9 +61,10 @@ def execute_tasks_h5( Args: future_queue (queue.Queue): The queue containing the tasks. cache_directory (str): The directory to store the HDF5 files. - cores_per_worker (int): The number of cores per worker. + resource_dict (dict): A dictionary of resources required by the task. With the following keys: + - cores (int): number of MPI cores to be used for each function call + - cwd (str/None): current working directory where the parallel python task is executed execute_function (callable): The function to execute the tasks. - cwd (str/None): current working directory where the parallel python task is executed terminate_function (callable): The function to terminate the tasks. config_directory (str, optional): path to the config directory. backend (str, optional): name of the backend used to spawn tasks. 
@@ -97,16 +97,15 @@ def execute_tasks_h5(
                     memory_dict=memory_dict,
                     file_name_dict=file_name_dict,
                 )
-                resource_dict = task_dict["resource_dict"].copy()
-                if "cores" not in resource_dict:
-                    resource_dict["cores"] = cores_per_worker
-                if "cwd" not in resource_dict:
-                    resource_dict["cwd"] = cwd
+                task_resource_dict = task_dict["resource_dict"].copy()
+                task_resource_dict.update(
+                    {k: v for k, v in resource_dict.items() if k not in task_resource_dict}
+                )
                 task_key, data_dict = serialize_funct_h5(
                     fn=task_dict["fn"],
                     fn_args=task_args,
                     fn_kwargs=task_kwargs,
-                    resource_dict=resource_dict,
+                    resource_dict=task_resource_dict,
                 )
                 if task_key not in memory_dict.keys():
                     if task_key + ".h5out" not in os.listdir(cache_directory):
@@ -115,12 +114,12 @@ def execute_tasks_h5(
                         process_dict[task_key] = execute_function(
                             command=_get_execute_command(
                                 file_name=file_name,
-                                cores=cores_per_worker,
+                                cores=task_resource_dict["cores"],
                             ),
                             task_dependent_lst=[
                                 process_dict[k] for k in future_wait_key_lst
                             ],
-                            resource_dict=resource_dict,
+                            resource_dict=task_resource_dict,
                             config_directory=config_directory,
                             backend=backend,
                         )
diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py
index 8228a234..52a9fb80 100644
--- a/executorlib/interactive/executor.py
+++ b/executorlib/interactive/executor.py
@@ -175,9 +175,9 @@ def create_executor(
         max_cores (int): defines the number cores which can be used in parallel
         cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         resource_dict (dict): A dictionary of resources required by the task. With the following keys:
-            - cores_per_worker (int): number of MPI cores to be used for each function call
+            - cores (int): number of MPI cores to be used for each function call
             - threads_per_core (int): number of OpenMP threads to be used for each function call
-            - gpus_per_worker (int): number of GPUs per worker - defaults to 0
+            - gpus_per_core (int): number of GPUs per worker - defaults to 0
             - cwd (str/None): current working directory where the parallel python task is executed
             - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
diff --git a/tests/test_cache_executor_mpi.py b/tests/test_cache_executor_mpi.py
index 8fa66ccd..bc5e1226 100644
--- a/tests/test_cache_executor_mpi.py
+++ b/tests/test_cache_executor_mpi.py
@@ -3,13 +3,15 @@
 import shutil
 import unittest
 
+from executorlib.standalone.cache.spawner import execute_in_subprocess
+
 try:
     from executorlib import FileExecutor
 
-    skip_h5io_test = False
+    skip_h5py_test = False
 except ImportError:
-    skip_h5io_test = True
+    skip_h5py_test = True
 
 
 skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
@@ -24,12 +26,14 @@ def mpi_funct(i):
 
 
 @unittest.skipIf(
-    skip_h5io_test or skip_mpi4py_test,
-    "h5io or mpi4py are not installed, so the h5io and mpi4py tests are skipped.",
+    skip_h5py_test or skip_mpi4py_test,
+    "h5py or mpi4py are not installed, so the h5py and mpi4py tests are skipped.",
 )
 class TestCacheExecutorMPI(unittest.TestCase):
     def test_executor(self):
-        with FileExecutor(cores_per_worker=2) as exe:
+        with FileExecutor(
+            resource_dict={"cores": 2}, execute_function=execute_in_subprocess
+        ) as exe:
             fs1 = exe.submit(mpi_funct, 1)
             self.assertFalse(fs1.done())
             self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py
index eb71c794..545f9f25 100644
--- a/tests/test_cache_executor_pysqa_flux.py
+++ b/tests/test_cache_executor_pysqa_flux.py
@@ -6,7 +6,6 @@
 try:
     import flux.job
     from executorlib import FileExecutor
-    from executorlib.standalone.cache.queue import execute_with_pysqa
 
     skip_flux_test = "FLUX_URI" not in os.environ
     pmi = os.environ.get("PYMPIPOOL_PMIX", None)
@@ -32,8 +31,7 @@ def mpi_funct(i):
 class TestCacheExecutorPysqa(unittest.TestCase):
     def test_executor(self):
         with FileExecutor(
-            cores_per_worker=2,
-            execute_function=execute_with_pysqa,
+            resource_dict={"cores": 2},
             backend="flux",
         ) as exe:
             fs1 = exe.submit(mpi_funct, 1)
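Note: the test updates follow directly from the new default. Tests that need local subprocess execution now pass execute_function=execute_in_subprocess explicitly, while the flux test drops its execute_with_pysqa import because pysqa-backed execution is the default and only the backend name has to be selected. A sketch of the queue-backed path, assuming a running flux instance and pysqa as in the test's skip conditions (mpi_funct as defined in the test above):

    from executorlib import FileExecutor

    # pysqa-based execution is the default, so only the backend is chosen here.
    with FileExecutor(resource_dict={"cores": 2}, backend="flux") as exe:
        future = exe.submit(mpi_funct, 1)
        print(future.result())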
diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py
index 491167dc..96aa2df0 100644
--- a/tests/test_cache_executor_serial.py
+++ b/tests/test_cache_executor_serial.py
@@ -4,19 +4,19 @@
 import shutil
 import unittest
 
+from executorlib.standalone.cache.spawner import (
+    execute_in_subprocess,
+    terminate_subprocess,
+)
 from executorlib.standalone.thread import RaisingThread
 
 try:
     from executorlib import FileExecutor
     from executorlib.cache.shared import execute_tasks_h5
-    from executorlib.standalone.cache.spawner import (
-        execute_in_subprocess,
-        terminate_subprocess,
-    )
 
-    skip_h5io_test = False
+    skip_h5py_test = False
 except ImportError:
-    skip_h5io_test = True
+    skip_h5py_test = True
 
 
 def my_funct(a, b):
@@ -28,18 +28,18 @@ def list_files_in_working_directory():
 
 
 @unittest.skipIf(
-    skip_h5io_test, "h5io is not installed, so the h5io tests are skipped."
+    skip_h5py_test, "h5py is not installed, so the h5py tests are skipped."
 )
 class TestCacheExecutorSerial(unittest.TestCase):
     def test_executor_mixed(self):
-        with FileExecutor() as exe:
+        with FileExecutor(execute_function=execute_in_subprocess) as exe:
             fs1 = exe.submit(my_funct, 1, b=2)
             self.assertFalse(fs1.done())
             self.assertEqual(fs1.result(), 3)
             self.assertTrue(fs1.done())
 
     def test_executor_dependence_mixed(self):
-        with FileExecutor() as exe:
+        with FileExecutor(execute_function=execute_in_subprocess) as exe:
             fs1 = exe.submit(my_funct, 1, b=2)
             fs2 = exe.submit(my_funct, 1, b=fs1)
             self.assertFalse(fs2.done())
@@ -48,7 +48,9 @@ def test_executor_dependence_mixed(self):
 
     def test_executor_working_directory(self):
         cwd = os.path.join(os.path.dirname(__file__), "executables")
-        with FileExecutor(cwd=cwd) as exe:
+        with FileExecutor(
+            resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess
+        ) as exe:
             fs1 = exe.submit(list_files_in_working_directory)
             self.assertEqual(fs1.result(), os.listdir(cwd))
 
@@ -72,7 +74,7 @@ def test_executor_function(self):
                 "future_queue": q,
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "cores_per_worker": 1,
+                "resource_dict": {"cores": 1, "cwd": None},
                 "terminate_function": terminate_subprocess,
             },
         )
@@ -113,7 +115,7 @@ def test_executor_function_dependence_kwargs(self):
                 "future_queue": q,
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "cores_per_worker": 1,
+                "resource_dict": {"cores": 1, "cwd": None},
                 "terminate_function": terminate_subprocess,
             },
         )
@@ -154,7 +156,7 @@ def test_executor_function_dependence_args(self):
                 "future_queue": q,
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "cores_per_worker": 1,
+                "resource_dict": {"cores": 1, "cwd": None},
                 "terminate_function": terminate_subprocess,
             },
         )
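Note: when driving execute_tasks_h5 directly on a RaisingThread, the former cores_per_worker and cwd kwargs collapse into a single resource_dict entry. A condensed sketch of the updated call, following the tests above (queue population and shutdown handling omitted):

    import queue

    from executorlib.cache.shared import execute_tasks_h5
    from executorlib.standalone.cache.spawner import (
        execute_in_subprocess,
        terminate_subprocess,
    )
    from executorlib.standalone.thread import RaisingThread

    q = queue.Queue()  # the executor frontend puts task dictionaries on this queue
    process = RaisingThread(
        target=execute_tasks_h5,
        kwargs={
            "future_queue": q,
            "cache_directory": "cache",
            "execute_function": execute_in_subprocess,
            "resource_dict": {"cores": 1, "cwd": None},  # replaces cores_per_worker/cwd
            "terminate_function": terminate_subprocess,
        },
    )
    process.start()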
diff --git a/tests/test_cache_hdf.py b/tests/test_cache_hdf.py
index 47e6b8f4..f7143f3e 100644
--- a/tests/test_cache_hdf.py
+++ b/tests/test_cache_hdf.py
@@ -6,9 +6,9 @@
 try:
     from executorlib.standalone.hdf import dump, load
 
-    skip_h5io_test = False
+    skip_h5py_test = False
 except ImportError:
-    skip_h5io_test = True
+    skip_h5py_test = True
 
 
 def my_funct(a, b):
@@ -16,7 +16,7 @@ def my_funct(a, b):
 
 
 @unittest.skipIf(
-    skip_h5io_test, "h5io is not installed, so the h5io tests are skipped."
+    skip_h5py_test, "h5py is not installed, so the h5py tests are skipped."
 )
 class TestSharedFunctions(unittest.TestCase):
     def test_hdf_mixed(self):
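Note: the skip-flag rename (skip_h5io_test to skip_h5py_test) reflects that the optional dependency gating these imports is h5py rather than h5io. A hedged sketch of the dump/load round trip the guarded import enables; the file name is illustrative and the data_dict keys are assumed from the surrounding test suite:

    from executorlib.standalone.hdf import dump, load

    def my_funct(a, b):
        return a + b

    # Serialize the function call to HDF5 and read it back (keys assumed).
    dump(
        file_name="cache/my_funct.h5",
        data_dict={"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": 2}},
    )
    print(load(file_name="cache/my_funct.h5"))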