Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache: create method #459

Merged
merged 12 commits into from
Oct 28, 2024
31 changes: 22 additions & 9 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@
__all__ = []


try:
from executorlib.cache.executor import FileExecutor

__all__ += [FileExecutor]
except ImportError:
pass


class Executor:
"""
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
Expand All @@ -47,6 +39,7 @@ class Executor:
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -95,6 +88,7 @@ def __init__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = True,
init_function: Optional[callable] = None,
Expand All @@ -115,6 +109,7 @@ def __new__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = True,
init_function: Optional[callable] = None,
Expand Down Expand Up @@ -149,6 +144,7 @@ def __new__(
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -180,7 +176,24 @@ def __new__(
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
if not disable_dependencies:
if "pysqa_" in backend and not plot_dependency_graph:
from executorlib.cache.executor import create_file_executor

return create_file_executor(
max_workers=max_workers,
backend=backend,
max_cores=max_cores,
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
pysqa_config_directory=pysqa_config_directory,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
)
Comment on lines +179 to +195
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Based on the gathered information, I can now provide the final response:

pysqa backend implementation is properly integrated but lacks tests

The implementation shows:

  • Proper backend integration in executorlib/cache/executor.py and executorlib/cache/shared.py
  • Comprehensive error handling for unsupported features with clear error messages
  • Clear parameter documentation and type hints

However:

  • No dedicated test files found for pysqa functionality
  • No usage examples in documentation
🔗 Analysis chain

Verify pysqa backend implementation and testing.

Let's verify the existence of pysqa backend implementations and related tests:


Let me gather more information to verify the implementation and testing thoroughly.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify pysqa backend implementation and testing

# Check for pysqa backend implementations
echo "Checking for pysqa backend implementations..."
rg -l "pysqa_" --type py

# Check for related tests
echo "Checking for pysqa-related tests..."
fd -e py -x grep -l "pysqa" {} \; tests/

# Check for usage examples
echo "Checking for usage examples..."
rg -l "pysqa" docs/ || true

Length of output: 760


Script:

#!/bin/bash
# Check implementation details of pysqa backend
echo "Checking pysqa backend implementation in executor.py..."
rg "pysqa_" executorlib/cache/executor.py -A 5

echo -e "\nChecking pysqa-related shared functionality..."
rg "pysqa" executorlib/cache/shared.py -A 5

echo -e "\nLooking for tests in the correct path..."
fd -e py -x grep -l "pysqa" {} \; .

echo -e "\nChecking for error handling in pysqa implementation..."
rg "raise" -A 2 executorlib/cache/executor.py

Length of output: 3355

elif not disable_dependencies:
return ExecutorWithDependencies(
max_workers=max_workers,
backend=backend,
Expand Down
60 changes: 57 additions & 3 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
execute_in_subprocess,
terminate_subprocess,
)
from executorlib.standalone.inputcheck import (
check_executor,
check_nested_flux_executor,
)
from executorlib.standalone.thread import RaisingThread

try:
Expand All @@ -23,7 +27,7 @@ def __init__(
resource_dict: Optional[dict] = None,
execute_function: callable = execute_with_pysqa,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
):
"""
Expand All @@ -36,7 +40,7 @@ def __init__(
- cwd (str/None): current working directory where the parallel python task is executed
execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
terminate_function (callable, optional): The function to terminate the tasks.
config_directory (str, optional): path to the config directory.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
backend (str, optional): name of the backend used to spawn tasks.
"""
super().__init__()
Expand All @@ -62,8 +66,58 @@ def __init__(
"cache_directory": cache_directory_path,
"resource_dict": resource_dict,
"terminate_function": terminate_function,
"config_directory": config_directory,
"pysqa_config_directory": pysqa_config_directory,
"backend": backend,
},
)
)


def create_file_executor(
    max_workers: int = 1,
    backend: str = "pysqa_flux",
    max_cores: int = 1,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    pysqa_config_directory: Optional[str] = None,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[callable] = None,
):
    """
    Create a FileExecutor for a pysqa based backend.

    The pysqa based backend delegates scheduling to the queuing system, so most
    executor-level options are unsupported and raise a ValueError when set.

    Args:
        max_workers (int): must remain 1; worker count is handled by the queuing system.
        backend (str): pysqa backend name, must start with "pysqa_" (e.g. "pysqa_flux", "pysqa_slurm").
        max_cores (int): must remain 1; core count is handled by the queuing system.
        cache_directory (str, optional): cache directory; defaults to "executorlib_cache".
        resource_dict (dict, optional): resources forwarded to each submitted task.
        flux_executor: must be None (validated via check_executor).
        flux_executor_pmi_mode (str, optional): unsupported; must be None.
        flux_executor_nesting (bool): unsupported; must be False.
        pysqa_config_directory (str, optional): path to the pysqa config directory.
        hostname_localhost (bool, optional): unsupported; must be None.
        block_allocation (bool): unsupported; must be False.
        init_function (callable, optional): unsupported; must be None.

    Returns:
        FileExecutor: executor submitting tasks through pysqa.

    Raises:
        ValueError: if an unsupported option is set or the backend name is invalid.
    """
    if cache_directory is None:
        cache_directory = "executorlib_cache"
    if max_workers != 1:
        raise ValueError(
            "The number of workers cannot be controlled with the pysqa based backend."
        )
    if max_cores != 1:
        raise ValueError(
            "The number of cores cannot be controlled with the pysqa based backend."
        )
    if hostname_localhost is not None:
        raise ValueError(
            "The option to connect to hosts based on their hostname is not available with the pysqa based backend."
        )
    if block_allocation:
        raise ValueError(
            "The option block_allocation is not available with the pysqa based backend."
        )
    if init_function is not None:
        raise ValueError(
            "The option to specify an init_function is not available with the pysqa based backend."
        )
    if flux_executor_pmi_mode is not None:
        raise ValueError(
            "The option to specify the flux pmi mode is not available with the pysqa based backend."
        )
    # Validate the backend name explicitly instead of relying on
    # str.split("pysqa_"), which silently accepts arbitrary strings.
    if not backend.startswith("pysqa_"):
        raise ValueError(
            "The backend must start with 'pysqa_' for the pysqa based backend, got: "
            + backend
        )
    pysqa_backend = backend[len("pysqa_"):]
    check_executor(executor=flux_executor)
    check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
    return FileExecutor(
        cache_directory=cache_directory,
        resource_dict=resource_dict,
        pysqa_config_directory=pysqa_config_directory,
        backend=pysqa_backend,
    )
Comment on lines +116 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add validation for backend name format and safer string manipulation.

The backend name transformation assumes the "pysqa_" prefix exists and could fail if an invalid backend is provided.

Apply this diff:

     check_executor(executor=flux_executor)
     check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
+    if not backend.startswith("pysqa_"):
+        raise ValueError(
+            f"Backend must start with 'pysqa_'. Got: {backend}. "
+            "Supported backends: pysqa_flux, pysqa_slurm"
+        )
+    actual_backend = backend[len("pysqa_"):]
+    if actual_backend not in ["flux", "slurm"]:
+        raise ValueError(
+            f"Unsupported backend: {backend}. "
+            "Supported backends: pysqa_flux, pysqa_slurm"
+        )
     return FileExecutor(
         cache_directory=cache_directory,
         resource_dict=resource_dict,
         pysqa_config_directory=pysqa_config_directory,
-        backend=backend.split("pysqa_")[-1],
+        backend=actual_backend,
     )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
backend=backend.split("pysqa_")[-1],
)
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
if not backend.startswith("pysqa_"):
raise ValueError(
f"Backend must start with 'pysqa_'. Got: {backend}. "
"Supported backends: pysqa_flux, pysqa_slurm"
)
actual_backend = backend[len("pysqa_"):]
if actual_backend not in ["flux", "slurm"]:
raise ValueError(
f"Unsupported backend: {backend}. "
"Supported backends: pysqa_flux, pysqa_slurm"
)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
backend=actual_backend,
)

6 changes: 3 additions & 3 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def execute_tasks_h5(
execute_function: callable,
resource_dict: dict,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
) -> None:
"""
Expand All @@ -66,7 +66,7 @@ def execute_tasks_h5(
- cwd (str/None): current working directory where the parallel python task is executed
execute_function (callable): The function to execute the tasks.
terminate_function (callable): The function to terminate the tasks.
config_directory (str, optional): path to the config directory.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
backend (str, optional): name of the backend used to spawn tasks.

Returns:
Expand Down Expand Up @@ -120,7 +120,7 @@ def execute_tasks_h5(
process_dict[k] for k in future_wait_key_lst
],
resource_dict=task_resource_dict,
config_directory=config_directory,
config_directory=pysqa_config_directory,
backend=backend,
)
file_name_dict[task_key] = os.path.join(
Expand Down
9 changes: 9 additions & 0 deletions executorlib/standalone/cache/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,14 @@ def execute_with_pysqa(
"working_directory": resource_dict["cwd"],
}
del resource_dict["cwd"]
unsupported_keys = [
"threads_per_core",
"gpus_per_core",
"openmpi_oversubscribe",
"slurm_cmd_args",
]
for k in unsupported_keys:
if k in resource_dict:
del resource_dict[k]
submit_kwargs.update(resource_dict)
return qa.submit_job(**submit_kwargs)
2 changes: 1 addition & 1 deletion tests/test_cache_executor_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


try:
from executorlib import FileExecutor
from executorlib.cache.executor import FileExecutor

skip_h5py_test = False
except ImportError:
Expand Down
12 changes: 8 additions & 4 deletions tests/test_cache_executor_pysqa_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import unittest
import shutil

from executorlib import Executor
from executorlib.standalone.serialize import cloudpickle_register

try:
import flux.job
from executorlib import FileExecutor

skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("PYMPIPOOL_PMIX", None)
Expand All @@ -30,10 +32,12 @@ def mpi_funct(i):
)
class TestCacheExecutorPysqa(unittest.TestCase):
def test_executor(self):
with FileExecutor(
resource_dict={"cores": 2},
backend="flux",
with Executor(
backend="pysqa_flux",
resource_dict={"cores": 2, "cwd": os.path.abspath("cwd")},
block_allocation=False,
) as exe:
cloudpickle_register(ind=1)
fs1 = exe.submit(mpi_funct, 1)
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cache_executor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from executorlib.standalone.thread import RaisingThread

try:
from executorlib import FileExecutor
from executorlib.cache.executor import FileExecutor
from executorlib.cache.shared import execute_tasks_h5

skip_h5py_test = False
Expand Down
Loading