Add pydocstyle for docstring checks
pydocstyle is a static analysis tool for checking compliance with Python docstring conventions.

https://www.pydocstyle.org/en/stable/index.html
kaxil committed Mar 9, 2022
1 parent bd07c3d commit 1d21f65
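
For context, here is a minimal illustrative sketch (not from this commit) of the kinds of violations pydocstyle reports under its default pep257 convention; the error codes shown are the ones this diff deals with:

    class RedshiftDataHook:  # D101: missing docstring in public class
        def cluster_status(self) -> str:
            """
            Return the cluster status
            """
            # ^ D200: one-line docstring should fit on one line with quotes
            #   D400: first line should end with a period
            return "available"

    class RedshiftDataHookFixed:
        """Interact with the Redshift Data API."""

        def cluster_status(self) -> str:
            """Return the cluster status."""
            return "available"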
Showing 38 changed files with 312 additions and 235 deletions.
24 changes: 24 additions & 0 deletions .pre-commit-config.yaml
@@ -1,5 +1,9 @@
---
repos:
- repo: meta
hooks:
- id: check-hooks-apply
- id: check-useless-excludes
- repo: https://github.com/psf/black
rev: 22.1.0
hooks:
@@ -51,3 +55,23 @@ repos:
- id: python-no-log-warn
- id: python-check-mock-methods
- id: python-check-blanket-type-ignore

- repo: https://github.com/Lucas-C/pre-commit-hooks
rev: v1.1.13
hooks:
- id: forbid-crlf
- id: remove-crlf
- id: forbid-tabs
- id: remove-tabs

- repo: https://github.com/pycqa/pydocstyle
rev: 6.1.1
hooks:
- id: pydocstyle
name: Run pydocstyle
exclude: |
  (?x)
  ^tests/.*\.py$|
  ^scripts/.*\.py$|
  ^dev|
  .*example_dags/.*
additional_dependencies: ['toml']
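
With the hook registered, pydocstyle runs against staged Python files at commit time; assuming a standard pre-commit installation, the whole repository can also be checked with:

    pre-commit run pydocstyle --all-files

The additional_dependencies: ['toml'] entry is what lets pydocstyle read per-project settings from pyproject.toml.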
5 changes: 2 additions & 3 deletions astronomer/providers/amazon/aws/hooks/base_aws_async.py
@@ -9,11 +9,10 @@


class AwsBaseHookAsync(AwsBaseHook):
"""
Interacts with AWS using aiobotocore asynchronously
"""
"""Interacts with AWS using aiobotocore asynchronously."""

async def get_client_async(self) -> AioBaseClient:
"""Create an Async Client object to communicate with AWS services."""
# Fetch the Airflow connection object
connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
extra_config = connection_object.extra_dejson
6 changes: 2 additions & 4 deletions astronomer/providers/amazon/aws/hooks/redshift_cluster.py
@@ -10,9 +10,7 @@


class RedshiftHookAsync(AwsBaseHookAsync):
"""
Interact with AWS Redshift using aiobotocore python library
"""
"""Interact with AWS Redshift using aiobotocore python library"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
kwargs["client_type"] = "redshift"
@@ -88,7 +86,7 @@ async def get_cluster_status(
self, cluster_identifier: str, expected_state: str, flag: asyncio.Event
) -> Dict[str, Any]:
"""
make call self.cluster_status to know the status and run till the expected_state is met and set the flag
Make call self.cluster_status to know the status and run till the expected_state is met and set the flag
:param cluster_identifier: unique identifier of a cluster
:param expected_state: expected_state example("available", "pausing", "paused"")
2 changes: 1 addition & 1 deletion astronomer/providers/amazon/aws/hooks/redshift_data.py
@@ -7,7 +7,7 @@
from snowflake.connector.util_text import split_statements


-class RedshiftDataHook(AwsBaseHook):
+class RedshiftDataHook(AwsBaseHook):  # noqa: D101
def __init__(self, *args: Any, **kwargs: Any) -> None:
client_type: str = "redshift-data"
kwargs["client_type"] = "redshift-data"
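Where a class or method is intentionally left undocumented, the commit suppresses the check inline with noqa codes rather than excluding whole files. A sketch of the pattern, with hypothetical names not from this repo:

    from airflow.hooks.base import BaseHook

    class ExampleHook(BaseHook):  # noqa: D101  (suppress "missing docstring in public class" here only)
        def run_query(self) -> None:  # noqa: D102  (suppress "missing docstring in public method")
            ...

pydocstyle honors these per-line noqa comments, so the rest of the file is still checked.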
2 changes: 1 addition & 1 deletion astronomer/providers/amazon/aws/hooks/redshift_sql.py
@@ -10,7 +10,7 @@
log = logging.getLogger(__name__)


-class RedshiftSQLHookAsync(RedshiftDataHook):
+class RedshiftSQLHookAsync(RedshiftDataHook):  # noqa: D101
async def get_query_status(self, query_ids: List[str]) -> Dict[str, Union[str, List[str]]]:
"""
Async function to get the Query status by query Ids, this function
4 changes: 1 addition & 3 deletions astronomer/providers/amazon/aws/hooks/s3.py
@@ -14,9 +14,7 @@


class S3HookAsync(AwsBaseHookAsync):
"""
Interact with AWS S3, using the aiobotocore library.
"""
"""Interact with AWS S3, using the aiobotocore library."""

conn_type = "s3"
hook_name = "S3"
8 changes: 8 additions & 0 deletions astronomer/providers/amazon/aws/operators/redshift_cluster.py
Expand Up @@ -33,6 +33,10 @@ def __init__(
super().__init__(**kwargs)

    def execute(self, context: "Context") -> None:
+        """
+        Logic that the operator uses to correctly identify which trigger to
+        execute, and defer execution as expected.
+        """
redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
if cluster_state == "paused":
@@ -89,6 +93,10 @@ def __init__(
super().__init__(**kwargs)

    def execute(self, context: "Context") -> None:
+        """
+        Logic that the operator uses to correctly identify which trigger to
+        execute, and defer execution as expected.
+        """
redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
if cluster_state == "available":
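The two docstrings added above describe Airflow's deferral pattern: execute() either finishes synchronously or hands the wait off to a trigger, naming the method that resumes once the trigger fires. A condensed sketch of the pattern these operators follow; the operator and trigger names are hypothetical, and the calls mirror ones visible elsewhere in this diff:

    from typing import Any, Dict

    from airflow.exceptions import AirflowException
    from airflow.models import BaseOperator


    class ExampleAsyncOperator(BaseOperator):
        """Defer to a trigger instead of blocking a worker slot while polling."""

        def execute(self, context: Dict[str, Any]) -> None:
            """Start the work, then defer until the trigger reports back."""
            self.defer(
                timeout=self.execution_timeout,
                trigger=SomeTrigger(task_id=self.task_id),  # SomeTrigger is hypothetical
                method_name="execute_complete",  # Airflow resumes the task in this method
            )

        def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
            """Callback for when the trigger fires; raise if the trigger reported an error."""
            if event["status"] == "error":
                raise AirflowException(event["message"])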
6 changes: 2 additions & 4 deletions astronomer/providers/amazon/aws/operators/redshift_sql.py
@@ -11,9 +11,7 @@


class RedshiftSQLOperatorAsync(RedshiftSQLOperator):
"""
Executes SQL Statements against an Amazon Redshift cluster
"""
"""Executes SQL Statements against an Amazon Redshift cluster"""

def __init__(
self,
@@ -24,7 +22,7 @@ def __init__(
self.poll_interval = poll_interval
super().__init__(**kwargs)

-    def execute(self, context: "Context") -> None:
+    def execute(self, context: "Context") -> None:  # noqa: D102
redshift_data_hook = RedshiftDataHook(aws_conn_id=self.redshift_conn_id)
query_ids, response = redshift_data_hook.execute_query(sql=cast(str, self.sql), params=self.params)
if response.get("status") == "error":
astronomer/providers/amazon/aws/sensors/redshift_cluster.py
@@ -31,7 +31,7 @@ def __init__(
self.poll_interval = poll_interval
super().__init__(**kwargs)

-    def execute(self, context: Dict[str, Any]) -> None:
+    def execute(self, context: Dict[str, Any]) -> None:  # noqa: D102
self.defer(
timeout=self.execution_timeout,
trigger=RedshiftClusterSensorTrigger(
32 changes: 26 additions & 6 deletions astronomer/providers/amazon/aws/sensors/s3.py
Expand Up @@ -22,6 +22,7 @@ class S3KeySensorAsync(BaseOperator):
Waits for a key (a file-like instance on S3) to be present in a S3 bucket
asynchronously. S3 being a key/value it does not support folders. The path
is just a key a resource.
:param bucket_key: The key being waited on. Supports full s3:// style url
or relative path from root level. When it's specified as a full s3://
url, please leave bucket_name as `None`.
@@ -73,7 +74,7 @@ def _resolve_bucket_and_key(self) -> None:
if parsed_url.scheme != "" or parsed_url.netloc != "":
raise AirflowException("If bucket_name provided, bucket_key must be relative path, not URI.")

-    def execute(self, context: Dict[str, Any]) -> None:
+    def execute(self, context: Dict[str, Any]) -> None:  # noqa: D102
self._resolve_bucket_and_key()
self.defer(
timeout=self.execution_timeout,
@@ -88,6 +89,11 @@ def execute(self, context: Dict[str, Any]) -> None:
)

    def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
if event["status"] == "error":
raise AirflowException(event["message"])
return None
@@ -135,7 +141,7 @@ def __init__(
super().__init__(**kwargs)
self.check_fn_user = check_fn

-    def execute(self, context: Dict[str, Any]) -> None:
+    def execute(self, context: Dict[str, Any]) -> None:  # noqa: D102
self._resolve_bucket_and_key()
self.defer(
timeout=self.execution_timeout,
@@ -151,6 +157,11 @@ def execute(self, context: Dict[str, Any]) -> None:
)

    def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
if event["status"] == "error":
raise AirflowException(event["message"])
return None
@@ -217,7 +228,7 @@ def __init__(
self.verify = verify
self.last_activity_time: Optional[datetime] = None

-    def execute(self, context: Dict[str, Any]) -> None:
+    def execute(self, context: Dict[str, Any]) -> None:  # noqa: D102
self.defer(
timeout=self.execution_timeout,
trigger=S3KeysUnchangedTrigger(
@@ -236,6 +247,11 @@ def execute(self, context: Dict[str, Any]) -> None:
)

    def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
if event["status"] == "error":
raise AirflowException(event["message"])
return None
@@ -289,7 +305,7 @@ def __init__(
self.aws_conn_id = aws_conn_id
self.verify = verify

-    def execute(self, context: Dict[Any, Any]) -> None:
+    def execute(self, context: Dict[Any, Any]) -> None:  # noqa: D102
self.log.info("Poking for prefix : %s in bucket s3://%s", self.prefix, self.bucket_name)
self.defer(
timeout=self.execution_timeout,
@@ -303,8 +319,12 @@ def execute(self, context: Dict[Any, Any]) -> None:
method_name="execute_complete",
)

-    def execute_complete(self, context: Dict[Any, Any], event: Dict[str, str]) -> None:  # pylint: disable=unused-argument
+    def execute_complete(self, context: Dict[Any, Any], event: Dict[str, str]) -> None:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
if event["status"] == "error":
raise AirflowException(event["message"])
self.log.info(event["message"])
18 changes: 7 additions & 11 deletions astronomer/providers/amazon/aws/triggers/redshift_cluster.py
@@ -7,7 +7,7 @@
from astronomer.providers.amazon.aws.hooks.redshift_cluster import RedshiftHookAsync


-class RedshiftClusterTrigger(BaseTrigger):
+class RedshiftClusterTrigger(BaseTrigger):  # noqa: D101
def __init__(
self,
task_id: str,
@@ -16,16 +16,15 @@ def __init__(
cluster_identifier: str,
operation_type: str,
):
super().__init__()
self.task_id = task_id
self.polling_period_seconds = polling_period_seconds
self.aws_conn_id = aws_conn_id
self.cluster_identifier = cluster_identifier
self.operation_type = operation_type

def serialize(self) -> Tuple[str, Dict[str, Any]]:
"""
Serializes RedshiftClusterTrigger arguments and classpath.
"""
"""Serializes RedshiftClusterTrigger arguments and classpath."""
return (
"astronomer.providers.amazon.aws.triggers.redshift_cluster.RedshiftClusterTrigger",
{
@@ -67,7 +66,7 @@ async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
await asyncio.sleep(self.polling_period_seconds)


-class RedshiftClusterSensorTrigger(BaseTrigger):
+class RedshiftClusterSensorTrigger(BaseTrigger):  # noqa: D101
def __init__(
self,
task_id: str,
@@ -76,16 +75,15 @@ def __init__(
target_status: str,
polling_period_seconds: float,
):
super().__init__()
self.task_id = task_id
self.aws_conn_id = aws_conn_id
self.cluster_identifier = cluster_identifier
self.target_status = target_status
self.polling_period_seconds = polling_period_seconds

def serialize(self) -> Tuple[str, Dict[str, Any]]:
"""
Serializes RedshiftClusterSensorTrigger arguments and classpath.
"""
"""Serializes RedshiftClusterSensorTrigger arguments and classpath."""
return (
"astronomer.providers.amazon.aws.triggers.redshift_cluster.RedshiftClusterSensorTrigger",
{
@@ -98,9 +96,7 @@ def serialize(self) -> Tuple[str, Dict[str, Any]]:
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
"""
Simple async function run until the cluster status match the target status.
"""
"""Simple async function run until the cluster status match the target status."""
try:
hook = RedshiftHookAsync(aws_conn_id=self.aws_conn_id)
while True:
12 changes: 4 additions & 8 deletions astronomer/providers/amazon/aws/triggers/redshift_sql.py
@@ -5,23 +5,22 @@
from astronomer.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHookAsync


-class RedshiftSQLTrigger(BaseTrigger):
+class RedshiftSQLTrigger(BaseTrigger):  # noqa: D101
def __init__(
self,
task_id: str,
polling_period_seconds: float,
aws_conn_id: str,
query_ids: List[str],
):
super().__init__()
self.task_id = task_id
self.polling_period_seconds = polling_period_seconds
self.aws_conn_id = aws_conn_id
self.query_ids = query_ids

def serialize(self) -> Tuple[str, Dict[str, Any]]:
"""
Serializes RedshiftSQLTrigger arguments and classpath.
"""
"""Serializes RedshiftSQLTrigger arguments and classpath."""
return (
"astronomer.providers.amazon.aws.triggers.redshift_sql.RedshiftSQLTrigger",
{
@@ -33,10 +32,7 @@ def serialize(self) -> Tuple[str, Dict[str, Any]]:
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
"""
Make async connection to redshiftSQL and execute query using
the Amazon Redshift Data API to interact with Amazon Redshift clusters
"""
"""Make async connection and execute query using the Amazon Redshift Data API."""
hook = RedshiftSQLHookAsync(aws_conn_id=self.aws_conn_id)
try:
response = await hook.get_query_status(self.query_ids)