Skip to content

Add type checking with mypy #535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .github/workflows/mypy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name: MyPy

on:
push:
branches: [ main ]
pull_request:

jobs:
mypy:
runs-on: ubuntu-latest
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.13"
architecture: x64
Comment on lines +15 to +16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Consider using a stable Python version

Python 3.13 was only recently released (October 2024) and some tooling and C-extension wheels may not yet fully support it in CI environments. Consider using the previous stable version (3.12) if compatibility issues arise.

-          python-version: "3.13"
+          python-version: "3.12"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
python-version: "3.13"
architecture: x64
python-version: "3.12"
architecture: x64

- name: Checkout
uses: actions/checkout@v4
- name: Install mypy
run: pip install mypy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Pin MyPy version for consistent CI results

Installing MyPy without version pinning could lead to inconsistent results as new versions are released.

-        run: pip install mypy
+        run: pip install 'mypy==1.8.0'
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
run: pip install mypy
run: pip install 'mypy==1.8.0'

- name: Test
run: mypy --ignore-missing-imports ${{ github.event.repository.name }}
10 changes: 5 additions & 5 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional
from typing import Callable, Optional

from executorlib._version import get_versions as _get_versions
from executorlib.interactive.executor import (
Expand All @@ -16,7 +16,7 @@
)

__version__ = _get_versions()["version"]
__all__ = []
__all__: list = []


class Executor:
Expand Down Expand Up @@ -100,7 +100,7 @@ def __init__(
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[callable] = None,
init_function: Optional[Callable] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
Expand All @@ -123,7 +123,7 @@ def __new__(
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[callable] = None,
init_function: Optional[Callable] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
Expand Down Expand Up @@ -177,7 +177,7 @@ def __new__(
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

"""
default_resource_dict = {
default_resource_dict: dict = {
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
Expand Down
6 changes: 3 additions & 3 deletions executorlib/backend/cache_parallel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pickle
import sys
import time
from typing import Any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Remove unused import

The Any type from typing module is imported but never used in the code.

Apply this diff to remove the unused import:

-from typing import Any
🧰 Tools
🪛 Ruff (0.8.2)

4-4: typing.Any imported but unused

Remove unused import: typing.Any

(F401)


import cloudpickle

Expand All @@ -24,7 +25,7 @@ def main() -> None:
"""
from mpi4py import MPI

MPI.pickle.__init__(
MPI.pickle.__init__( # type: ignore
cloudpickle.dumps,
cloudpickle.loads,
pickle.HIGHEST_PROTOCOL,
Expand All @@ -34,10 +35,9 @@ def main() -> None:
file_name = sys.argv[1]

time_start = time.time()
apply_dict = {}
if mpi_rank_zero:
apply_dict = backend_load_file(file_name=file_name)
else:
apply_dict = None
apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0)
output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
if mpi_size_larger_one:
Expand Down
12 changes: 6 additions & 6 deletions executorlib/backend/interactive_parallel.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import pickle
import sys
from os.path import abspath
from typing import Optional

import cloudpickle
import zmq

from executorlib.standalone.interactive.backend import call_funct, parse_arguments
from executorlib.standalone.interactive.communication import (
Expand All @@ -24,7 +26,7 @@ def main() -> None:
"""
from mpi4py import MPI

MPI.pickle.__init__(
MPI.pickle.__init__( # type: ignore
cloudpickle.dumps,
cloudpickle.loads,
pickle.HIGHEST_PROTOCOL,
Expand All @@ -33,13 +35,12 @@ def main() -> None:
mpi_size_larger_one = MPI.COMM_WORLD.Get_size() > 1

argument_dict = parse_arguments(argument_lst=sys.argv)
context: Optional[zmq.Context] = None
socket: Optional[zmq.Socket] = None
if mpi_rank_zero:
context, socket = interface_connect(
host=argument_dict["host"], port=argument_dict["zmqport"]
)
else:
context = None
socket = None

memory = None

Expand All @@ -50,10 +51,9 @@ def main() -> None:

while True:
# Read from socket
input_dict: dict = {}
if mpi_rank_zero:
input_dict = interface_receive(socket=socket)
else:
input_dict = None
input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0)

# Parse input
Expand Down
46 changes: 25 additions & 21 deletions executorlib/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from concurrent.futures import (
Future,
)
from typing import Optional
from typing import Callable, List, Optional, Union

from executorlib.standalone.inputcheck import check_resource_dict
from executorlib.standalone.queue import cancel_items_in_queue
Expand All @@ -27,8 +27,8 @@ def __init__(self, max_cores: Optional[int] = None):
"""
cloudpickle_register(ind=3)
self._max_cores = max_cores
self._future_queue: queue.Queue = queue.Queue()
self._process: Optional[RaisingThread] = None
self._future_queue: Optional[queue.Queue] = queue.Queue()
self._process: Optional[Union[RaisingThread, List[RaisingThread]]] = None

@property
def info(self) -> Optional[dict]:
Expand All @@ -39,21 +39,21 @@ def info(self) -> Optional[dict]:
Optional[dict]: Information about the executor.
"""
if self._process is not None and isinstance(self._process, list):
meta_data_dict = self._process[0]._kwargs.copy()
meta_data_dict = self._process[0].get_kwargs().copy()
if "future_queue" in meta_data_dict.keys():
del meta_data_dict["future_queue"]
meta_data_dict["max_workers"] = len(self._process)
return meta_data_dict
elif self._process is not None:
meta_data_dict = self._process._kwargs.copy()
meta_data_dict = self._process.get_kwargs().copy()
if "future_queue" in meta_data_dict.keys():
del meta_data_dict["future_queue"]
return meta_data_dict
else:
return None

@property
def future_queue(self) -> queue.Queue:
def future_queue(self) -> Optional[queue.Queue]:
"""
Get the future queue.

Expand All @@ -62,7 +62,7 @@ def future_queue(self) -> queue.Queue:
"""
return self._future_queue

def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
def submit(self, fn: Callable, *args, resource_dict: dict = {}, **kwargs) -> Future: # type: ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid mutable defaults for resource_dict.
It’s a best practice to avoid using mutable objects (dict, list) as default parameters.

-def submit(self, fn: Callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
+def submit(self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs) -> Future:
    ...
+    if resource_dict is None:
+        resource_dict = {}

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.8.2)

65-65: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

"""
Submits a callable to be executed with the given arguments.

Expand Down Expand Up @@ -97,16 +97,17 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Fut
"The specified number of cores is larger than the available number of cores."
)
check_resource_dict(function=fn)
f = Future()
self._future_queue.put(
{
"fn": fn,
"args": args,
"kwargs": kwargs,
"future": f,
"resource_dict": resource_dict,
}
)
f: Future = Future()
if self._future_queue is not None:
self._future_queue.put(
{
"fn": fn,
"args": args,
"kwargs": kwargs,
"future": f,
"resource_dict": resource_dict,
}
)
return f

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
Expand All @@ -124,11 +125,11 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
futures. Futures that are completed or running will not be
cancelled.
"""
if cancel_futures:
if cancel_futures and self._future_queue is not None:
cancel_items_in_queue(que=self._future_queue)
if self._process is not None:
if self._process is not None and self._future_queue is not None:
self._future_queue.put({"shutdown": True, "wait": wait})
if wait:
if wait and isinstance(self._process, RaisingThread):
self._process.join()
self._future_queue.join()
self._process = None
Expand All @@ -151,7 +152,10 @@ def __len__(self) -> int:
Returns:
int: The length of the executor.
"""
return self._future_queue.qsize()
queue_size = 0
if self._future_queue is not None:
queue_size = self._future_queue.qsize()
return queue_size

def __del__(self):
"""
Expand Down
18 changes: 9 additions & 9 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import Optional
from typing import Callable, Optional

from executorlib.base.executor import ExecutorBase
from executorlib.cache.shared import execute_tasks_h5
Expand All @@ -21,16 +21,16 @@
from executorlib.cache.queue_spawner import execute_with_pysqa
except ImportError:
# If pysqa is not available fall back to executing tasks in a subprocess
execute_with_pysqa = execute_in_subprocess
execute_with_pysqa = execute_in_subprocess # type: ignore


class FileExecutor(ExecutorBase):
def __init__(
self,
cache_directory: str = "cache",
resource_dict: Optional[dict] = None,
execute_function: callable = execute_with_pysqa,
terminate_function: Optional[callable] = None,
execute_function: Callable = execute_with_pysqa,
terminate_function: Optional[Callable] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
disable_dependencies: bool = False,
Expand All @@ -43,8 +43,8 @@ def __init__(
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- cwd (str/None): current working directory where the parallel python task is executed
execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
terminate_function (callable, optional): The function to terminate the tasks.
execute_function (Callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
terminate_function (Callable, optional): The function to terminate the tasks.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
backend (str, optional): name of the backend used to spawn tasks.
disable_dependencies (boolean): Disable resolving future objects during the submission.
Expand Down Expand Up @@ -81,9 +81,9 @@ def __init__(


def create_file_executor(
max_workers: int = 1,
max_workers: Optional[int] = None,
backend: str = "flux_submission",
max_cores: int = 1,
max_cores: Optional[int] = None,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
Expand All @@ -93,7 +93,7 @@ def create_file_executor(
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[callable] = None,
init_function: Optional[Callable] = None,
disable_dependencies: bool = False,
):
if cache_directory is None:
Expand Down
2 changes: 1 addition & 1 deletion executorlib/cache/queue_spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def execute_with_pysqa(
config_directory: Optional[str] = None,
backend: Optional[str] = None,
cache_directory: Optional[str] = None,
) -> Tuple[int, int]:
) -> Optional[int]:
"""
Execute a command by submitting it to the queuing system

Expand Down
16 changes: 9 additions & 7 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import queue
import sys
from concurrent.futures import Future
from typing import Optional, Tuple
from typing import Any, Callable, Optional, Tuple

from executorlib.standalone.command import get_command_path
from executorlib.standalone.hdf import dump, get_output
Expand All @@ -21,7 +21,7 @@ def __init__(self, file_name: str):
"""
self._file_name = file_name

def result(self) -> str:
def result(self) -> Any:
"""
Get the result of the future item.

Expand Down Expand Up @@ -49,9 +49,9 @@ def done(self) -> bool:
def execute_tasks_h5(
future_queue: queue.Queue,
cache_directory: str,
execute_function: callable,
execute_function: Callable,
resource_dict: dict,
terminate_function: Optional[callable] = None,
terminate_function: Optional[Callable] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
disable_dependencies: bool = False,
Expand All @@ -65,16 +65,18 @@ def execute_tasks_h5(
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- cwd (str/None): current working directory where the parallel python task is executed
execute_function (callable): The function to execute the tasks.
terminate_function (callable): The function to terminate the tasks.
execute_function (Callable): The function to execute the tasks.
terminate_function (Callable): The function to terminate the tasks.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
backend (str, optional): name of the backend used to spawn tasks.

Returns:
None

"""
memory_dict, process_dict, file_name_dict = {}, {}, {}
memory_dict: dict = {}
process_dict: dict = {}
file_name_dict: dict = {}
while True:
task_dict = None
try:
Expand Down
Loading
Loading