Add Mypy checks
kaxil committed Mar 9, 2022
1 parent 6a817fd commit bcbdb72
Showing 26 changed files with 185 additions and 104 deletions.
26 changes: 26 additions & 0 deletions .circleci/config.yml
@@ -8,6 +8,7 @@ workflows:
test:
jobs:
- static-checks
+ - mypy
- test:
name: test-python<< matrix.python_version >>
matrix:
@@ -49,6 +50,31 @@ jobs:
- ~/.cache/pre-commit
- ~/.pyenv/versions/

+ mypy:
+ description: "Mypy"
+ executor:
+ name: docker-executor
+ python_version: "3.9"
+ steps:
+ - checkout
+ - restore_cache:
+ keys:
+ - mypy-{{ .Branch }}-{{ checksum "setup.cfg" }}-{{ checksum "/home/circleci/.pyenv/version" }}
+ - mypy-main-{{ checksum "setup.cfg" }}-{{ checksum "/home/circleci/.pyenv/version" }}
+ - run:
+ name: Install Dependencies
+ command: pip install -U -e .[mypy]
+ - run:
+ name: Run Mypy
+ command: |
+ mypy --version
+ mypy astronomer/
+ - save_cache:
+ paths:
+ - ~/.cache/pip
+ - ~/.pyenv/versions/
+ key: mypy-{{ .Branch }}-{{ checksum "setup.cfg" }}-{{ checksum "/home/circleci/.pyenv/version" }}

test:
parameters:
python_version:
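A note on the install step: `pip install -U -e .[mypy]` presumes a `mypy` extra in the package metadata. A hypothetical sketch of what such an extra looks like; the real pin list lives in this repo's setup.cfg and is not shown in this diff:

```python
# Hypothetical sketch only; the actual extra is defined in setup.cfg.
from setuptools import setup

setup(
    name="astronomer-providers",
    extras_require={
        "mypy": [
            "mypy",            # the checker itself
            "types-aiofiles",  # example stub package; the real list may differ
        ],
    },
)
```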
3 changes: 3 additions & 0 deletions .gitignore
@@ -42,3 +42,6 @@ docs/*/_api/
test-report/
.coverage
coverage.xml

+ # Mypy Cache
+ .mypy_cache
3 changes: 2 additions & 1 deletion Makefile
@@ -49,7 +49,8 @@ run-mypy: ## Run MyPy in Container
docker build -f dev/Dockerfile . -t astronomer-providers-dev
docker run -v `pwd`:/usr/local/airflow/astronomer_providers -v `pwd`/dev/.cache:/home/astro/.cache \
-w /usr/local/airflow/astronomer_providers \
- --rm -it astronomer-providers-dev -- mypy --install-types $(RUN_ARGS)
+ --rm -it astronomer-providers-dev \
+ -- mypy --install-types --cache-dir /home/astro/.cache/.mypy_cache $(RUN_ARGS)

help: ## Prints this message
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
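For contributors who prefer to stay in Python, roughly the same check the Makefile target runs can be invoked through mypy's documented Python API. This is a sketch, not the project's tooling; `--non-interactive` is added here because `api.run` cannot answer the `--install-types` prompt:

```python
# Sketch: mirrors the Makefile's mypy invocation via mypy's Python API.
from mypy import api

# api.run returns (stdout, stderr, exit_status).
stdout, stderr, exit_status = api.run(
    ["--install-types", "--non-interactive", "--cache-dir", ".mypy_cache", "astronomer/"]
)
print(stdout)
print("exit status:", exit_status)
```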
14 changes: 7 additions & 7 deletions astronomer/providers/amazon/aws/hooks/s3.py
@@ -205,14 +205,14 @@ async def is_keys_unchanged(

if current_num_objects >= min_objects:
success_message = (
"SUCCESS: \nSensor found %s objects at %s.\n"
"Waited at least %s seconds, with no new objects uploaded.",
current_num_objects,
path,
inactivity_period,
"SUCCESS: Sensor found %s objects at %s. "
"Waited at least %s seconds, with no new objects uploaded."
)
self.log.info(success_message)
return {"status": "success", "message": success_message}
self.log.info(success_message, current_num_objects, path, inactivity_period)
return {
"status": "success",
"message": success_message % (current_num_objects, path, inactivity_period),
}

self.log.error("FAILURE: Inactivity Period passed, not enough objects found in %s", path)
return {
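The hunk above fixes a real bug, not just a type error: the trailing comma after the second string literal made `success_message` a tuple, so the tuple itself was logged and returned. A minimal standalone sketch of before and after, with hypothetical values:

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("demo")

# Before: the trailing comma builds a tuple, not a string.
msg = (
    "SUCCESS: Sensor found %s objects at %s. "
    "Waited at least %s seconds, with no new objects uploaded.",
    3,
    "s3://bucket/prefix",
    60,
)
print(type(msg))  # <class 'tuple'> -- the tuple itself got logged and returned

# After: one format string; the logger interpolates lazily, and the return
# value is built explicitly with %.
fmt = (
    "SUCCESS: Sensor found %s objects at %s. "
    "Waited at least %s seconds, with no new objects uploaded."
)
log.info(fmt, 3, "s3://bucket/prefix", 60)
message = fmt % (3, "s3://bucket/prefix", 60)
print(message)
```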
4 changes: 2 additions & 2 deletions astronomer/providers/amazon/aws/operators/redshift_cluster.py
@@ -52,7 +52,7 @@ def execute(self, context: "Context") -> None:
"Unable to resume cluster since cluster is currently in status: %s", cluster_state
)

- def execute_complete(self, context: Dict[Any, Any], event: Any = None) -> None:
+ def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
@@ -108,7 +108,7 @@ def execute(self, context: "Context") -> None:
"Unable to pause cluster since cluster is currently in status: %s", cluster_state
)

- def execute_complete(self, context: Dict[Any, Any], event: Any = None) -> None:
+ def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
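The only substantive change in this file is narrowing `Dict[Any, Any]` to `Dict[str, Any]`: Airflow's runtime context is keyed by strings ("ti", "ds", and so on), so the tighter annotation lets mypy catch non-string keys at call sites. A toy illustration, with a hypothetical function rather than the operator's code:

```python
from typing import Any, Dict


def execute_complete(context: Dict[str, Any], event: Any = None) -> None:
    # Hypothetical stand-in for the operator method.
    if event and event.get("status") == "error":
        raise RuntimeError(event["message"])


execute_complete({"ti": object(), "ds": "2022-03-09"})   # fine: str keys
# execute_complete({1: "oops"})  # mypy would flag the int key here
```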
@@ -31,7 +31,7 @@ def __init__(
self.poll_interval = poll_interval
super().__init__(**kwargs)

- def execute(self, context: Dict[Any, Any]) -> None:
+ def execute(self, context: Dict[str, Any]) -> None:
self.defer(
timeout=self.execution_timeout,
trigger=RedshiftClusterSensorTrigger(
12 changes: 6 additions & 6 deletions astronomer/providers/amazon/aws/sensors/s3.py
@@ -75,7 +75,7 @@ def _resolve_bucket_and_key(self) -> None:
if parsed_url.scheme != "" or parsed_url.netloc != "":
raise AirflowException("If bucket_name provided, bucket_key must be relative path, not URI.")

- def execute(self, context: Dict[Any, Any]) -> None:
+ def execute(self, context: Dict[str, Any]) -> None:
self._resolve_bucket_and_key()
self.defer(
timeout=self.execution_timeout,
@@ -89,7 +89,7 @@ def execute(self, context: Dict[Any, Any]) -> None:
method_name="execute_complete",
)

- def execute_complete(self, context: Dict[Any, Any], event: Any = None) -> None:
+ def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
if event["status"] == "error":
raise AirflowException(event["message"])
return None
@@ -137,7 +137,7 @@ def __init__(
super().__init__(**kwargs)
self.check_fn_user = check_fn

- def execute(self, context: Dict[Any, Any]) -> None:
+ def execute(self, context: Dict[str, Any]) -> None:
self._resolve_bucket_and_key()
self.defer(
timeout=self.execution_timeout,
@@ -152,7 +152,7 @@ def execute(self, context: Dict[Any, Any]) -> None:
method_name="execute_complete",
)

- def execute_complete(self, context: Dict[Any, Any], event: Any = None) -> None:
+ def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
if event["status"] == "error":
raise AirflowException(event["message"])
return None
@@ -219,7 +219,7 @@ def __init__(
self.verify = verify
self.last_activity_time: Optional[datetime] = None

- def execute(self, context: Dict[Any, Any]) -> None:
+ def execute(self, context: Dict[str, Any]) -> None:
self.defer(
timeout=self.execution_timeout,
trigger=S3KeysUnchangedTrigger(
@@ -237,7 +237,7 @@ def execute(self, context: Dict[Any, Any]) -> None:
method_name="execute_complete",
)

- def execute_complete(self, context: Dict[Any, Any], event: Any = None) -> None:
+ def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
if event["status"] == "error":
raise AirflowException(event["message"])
return None
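All of these sensors follow the same deferral handshake: `execute()` suspends the task with `self.defer(...)`, a trigger runs in the triggerer process, and the named `method_name` is invoked with the trigger's event payload. A minimal sketch of the shape, assuming Airflow 2.2+ and using the built-in `TimeDeltaTrigger` rather than the provider's S3 triggers:

```python
from datetime import timedelta
from typing import Any, Dict, Optional

from airflow.exceptions import AirflowException
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger


class WaitThenSucceed(BaseSensorOperator):
    """Hypothetical sensor showing the defer/execute_complete handshake."""

    def execute(self, context: Dict[str, Any]) -> None:
        # Suspend the task; the triggerer awaits the trigger asynchronously.
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(seconds=30)),
            method_name="execute_complete",
        )

    def execute_complete(
        self, context: Dict[str, Any], event: Optional[Dict[str, Any]] = None
    ) -> None:
        # Resumed in a fresh worker slot with the trigger's event payload.
        if event and event.get("status") == "error":
            raise AirflowException(event["message"])
```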
10 changes: 5 additions & 5 deletions astronomer/providers/cncf/kubernetes/hooks/kubernetes_async.py
@@ -1,4 +1,4 @@
- import aiofiles # type: ignore[import]
+ import aiofiles
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from kubernetes_asyncio import client, config
@@ -19,13 +19,13 @@ async def _load_config(self) -> client.ApiClient:
extras = {}
in_cluster = self._coalesce_param(
self.in_cluster, extras.get("extra__kubernetes__in_cluster") or None
- ) # type: ignore[no-untyped-call]
+ )
cluster_context = self._coalesce_param(
self.cluster_context, extras.get("extra__kubernetes__cluster_context") or None
- ) # type: ignore[no-untyped-call]
+ )
kubeconfig_path = self._coalesce_param(
self.config_file, extras.get("extra__kubernetes__kube_config_path") or None
- ) # type: ignore[no-untyped-call]
+ )
kubeconfig = extras.get("extra__kubernetes__kube_config") or None
num_selected_configuration = len([o for o in [in_cluster, kubeconfig, kubeconfig_path] if o])

@@ -50,7 +50,7 @@ async def _load_config(self) -> client.ApiClient:
return client.ApiClient()

if kubeconfig is not None:
- async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
+ async with aiofiles.tempfile.NamedTemporaryFile() as temp_config: # type: ignore[attr-defined]
self.log.debug("loading kube_config from: connection kube_config")
temp_config.write(kubeconfig.encode())
temp_config.flush()
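This hunk swaps a blanket `# type: ignore[import]` on the whole module for one narrow `attr-defined` ignore at the single call site, which is the usual direction once a dependency starts shipping type hints. A small sketch of the same idea, assuming an aiofiles release with type information:

```python
import asyncio

import aiofiles  # no blanket ignore needed once aiofiles ships type hints


async def write_tmp(data: bytes) -> None:
    # The stubs may not model the tempfile attribute, hence one narrow
    # attr-defined ignore instead of silencing the whole import.
    async with aiofiles.tempfile.NamedTemporaryFile() as f:  # type: ignore[attr-defined]
        await f.write(data)
        await f.flush()


asyncio.run(write_tmp(b"kubeconfig-bytes"))
```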
@@ -40,7 +40,7 @@ def raise_for_trigger_status(event: Dict[str, Any]) -> None:
raise AirflowException(description)

def execute(self, context: Context) -> None:
- self.pod_request_obj = self.build_pod_request_obj(context) # type: ignore[no-untyped-call]
+ self.pod_request_obj = self.build_pod_request_obj(context)
self.pod = self.get_or_create_pod(self.pod_request_obj, context)
self.defer(
trigger=WaitContainerTrigger(
@@ -62,7 +62,7 @@ def execute(self, context: Context) -> None:
def execute_complete(self, context: Context, event: Dict[str, Any]) -> Any:
remote_pod = None
try:
- self.pod_request_obj = self.build_pod_request_obj(context) # type: ignore[no-untyped-call]
+ self.pod_request_obj = self.build_pod_request_obj(context)
self.pod = self.find_pod(
namespace=self.namespace or self.pod_request_obj.metadata.namespace,
context=context,
@@ -80,7 +80,7 @@ def execute_complete(self, context: Context, event: Dict[str, Any]) -> Any:
container_name=self.BASE_CONTAINER_NAME,
)
if self.do_xcom_push:
- result = self.extract_xcom(pod=self.pod) # type: ignore[no-untyped-call]
+ result = self.extract_xcom(pod=self.pod)
remote_pod = self.pod_manager.await_pod_completion(self.pod)
finally:
self.cleanup(
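The `no-untyped-call` ignores could be dropped because the upstream cncf.kubernetes methods (`build_pod_request_obj`, `extract_xcom`) gained annotations. What that error code means, in a self-contained sketch:

```python
def legacy(pod):
    # Untyped: under --disallow-untyped-calls, every call to this function
    # is reported as 'Call to untyped function "legacy" ... [no-untyped-call]'.
    return pod


def annotated(pod: str) -> str:
    # Typed: calls to this need no per-line ignore.
    return pod


value = annotated("demo-pod")
print(value)
```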
15 changes: 5 additions & 10 deletions astronomer/providers/cncf/kubernetes/triggers/wait_container.py
@@ -27,32 +27,27 @@ class WaitContainerTrigger(BaseTrigger):
Next, waits for ``container_name`` to reach a terminal state.
:param kubernetes_conn_id: Airflow connection ID to use
- :type kubernetes_conn_id: str
:param hook_params: kwargs for hook
- :type hook_params: dict
:param container_name: container to wait for
- :type container_name: str
:param pod_name: name of pod to monitor
- :type pod_name: str
:param pod_namespace: pod namespace
- :type pod_namespace: str
:param pending_phase_timeout: max time in seconds to wait for pod to leave pending phase
- :type pending_phase_timeout: float
:param poll_interval: number of seconds between reading pod state
- :type poll_interval: float
"""

def __init__(
self,
+ *,
+ container_name: str,
+ pod_name: str,
+ pod_namespace: str,
kubernetes_conn_id: Optional[str] = None,
hook_params: Optional[Dict[str, Any]] = None,
- container_name: str = None,
- pod_name: Optional[str] = None,
- pod_namespace: Optional[str] = None,
pending_phase_timeout: float = 120,
poll_interval: float = 5,
):
super().__init__()
self.kubernetes_conn_id = kubernetes_conn_id
self.hook_params = hook_params
self.container_name = container_name
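The signature change is worth spelling out: `container_name: str = None` is itself a mypy error (None is not a str), and the implicit-Optional defaults forced None-handling downstream. Making the parameters keyword-only and required pushes the check to the call site. A sketch with a hypothetical class name:

```python
from typing import Optional


class WaitContainerTriggerSketch:
    """Hypothetical class name; mirrors the shape of the real signature."""

    def __init__(
        self,
        *,  # everything after this must be passed by keyword
        container_name: str,
        pod_name: str,
        pod_namespace: str,
        kubernetes_conn_id: Optional[str] = None,
        poll_interval: float = 5,
    ):
        self.container_name = container_name
        self.pod_name = pod_name
        self.pod_namespace = pod_namespace
        self.kubernetes_conn_id = kubernetes_conn_id
        self.poll_interval = poll_interval


# Callers must now be explicit; None can no longer slip in positionally.
trigger = WaitContainerTriggerSketch(
    container_name="base", pod_name="my-pod", pod_namespace="default"
)
```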
14 changes: 11 additions & 3 deletions astronomer/providers/core/sensors/external_task.py
@@ -1,3 +1,6 @@
+ import datetime
+ from typing import TYPE_CHECKING, Any, Dict, List, Optional

from airflow.exceptions import AirflowException
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.session import provide_session
@@ -7,9 +10,12 @@
TaskStateTrigger,
)

+ if TYPE_CHECKING:
+     from sqlalchemy.orm.session import Session


class ExternalTaskSensorAsync(ExternalTaskSensor):
- def execute(self, context):
+ def execute(self, context: Dict[str, Any]) -> None:
"""
Logic that the sensor uses to correctly identify which trigger to
execute, and defer execution as expected.
@@ -44,7 +50,9 @@ def execute(self, context):
)

@provide_session
- def execute_complete(self, context, session, event=None):
+ def execute_complete(
+     self, context: Dict[str, Any], session: "Session", event: Optional[Dict[str, Any]] = None
+ ) -> None:
"""
Callback for when the trigger fires - returns immediately.
Verifies that there is a success status for each task via execution date.
@@ -60,7 +68,7 @@ def execute_complete(self, context, session, event=None):
raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
return None

- def get_execution_dates(self, context):
+ def get_execution_dates(self, context: Dict[str, Any]) -> List[datetime.datetime]:
"""
Helper function to set execution dates depending on which context and/or
internal fields are populated.
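The `TYPE_CHECKING` guard keeps SQLAlchemy out of the import path at runtime while still letting annotations name `Session`. A minimal sketch; the query is hypothetical and uses 1.x-style `execute`:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by the type checker; no runtime import, no import cycle.
    from sqlalchemy.orm.session import Session


def all_succeeded(session: "Session") -> bool:
    # The quoted "Session" annotation is resolved by mypy, never at runtime.
    rows = session.execute("SELECT state FROM task_instance").fetchall()
    return all(row[0] == "success" for row in rows)
```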
9 changes: 3 additions & 6 deletions astronomer/providers/core/sensors/filesystem.py
@@ -1,5 +1,6 @@
import logging
import os
+ from typing import Any, Dict, Optional

from airflow.hooks.filesystem import FSHook
from airflow.sensors.filesystem import FileSensor
@@ -17,16 +18,13 @@ class FileSensorAsync(FileSensor):
any files exist inside it (either directly, or within a subdirectory)
:param fs_conn_id: reference to the File (path)
- :type fs_conn_id: str
:param filepath: File or folder name (relative to the base path set within the connection), can
be a glob.
- :type filepath: str
:param recursive: when set to ``True``, enables recursive directory matching behavior of
``**`` in glob filepath parameter. Defaults to ``False``.
- :type recursive: bool
"""

- def execute(self, context):
+ def execute(self, context: Dict[str, Any]) -> None:
if not self.poke(context=context):
hook = FSHook(self.fs_conn_id)
basepath = hook.get_path()
@@ -43,11 +41,10 @@ def execute(self, context):
method_name="execute_complete",
)

- def execute_complete(self, context, event=None):
+ def execute_complete(self, context: Dict[str, Any], event: Optional[Dict[str, Any]]) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
self.log.info("%s completed successfully.", self.task_id)
return None
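Unlike the S3 sensors above, `FileSensorAsync` pokes once synchronously and only defers when the file is not there yet, skipping a triggerer round-trip in the already-satisfied case. A sketch of that shape, with a hypothetical condition:

```python
from datetime import timedelta
from typing import Any, Dict

from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger


class PokeFirstSensor(BaseSensorOperator):
    """Hypothetical sensor: check once synchronously, defer only if unmet."""

    def poke(self, context: Dict[str, Any]) -> bool:
        return False  # hypothetical condition, e.g. file existence

    def execute(self, context: Dict[str, Any]) -> None:
        if not self.poke(context=context):  # cheap synchronous check first
            self.defer(
                trigger=TimeDeltaTrigger(timedelta(minutes=1)),
                method_name="execute_complete",
            )
```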