From ce64467ec2f54d67a9a26db9ab44c80e80492600 Mon Sep 17 00:00:00 2001
From: Vincent Beck
Date: Fri, 19 May 2023 11:07:15 -0400
Subject: [PATCH 1/2] Add D400 pydocstyle check - Amazon provider only

---
 airflow/providers/amazon/aws/hooks/appflow.py | 2 +-
 airflow/providers/amazon/aws/hooks/athena.py | 10 ++---
 .../providers/amazon/aws/hooks/base_aws.py | 24 ++++++------
 .../amazon/aws/hooks/batch_client.py | 20 +++++-----
 .../amazon/aws/hooks/batch_waiters.py | 4 +-
 .../amazon/aws/hooks/cloud_formation.py | 2 +-
 .../providers/amazon/aws/hooks/datasync.py | 2 +-
 airflow/providers/amazon/aws/hooks/dms.py | 4 +-
 .../providers/amazon/aws/hooks/dynamodb.py | 2 +-
 airflow/providers/amazon/aws/hooks/ec2.py | 12 +++---
 airflow/providers/amazon/aws/hooks/ecs.py | 30 ++++++++++++---
 .../hooks/elasticache_replication_group.py | 8 ++--
 airflow/providers/amazon/aws/hooks/emr.py | 2 +-
 airflow/providers/amazon/aws/hooks/glacier.py | 6 +--
 airflow/providers/amazon/aws/hooks/glue.py | 2 +-
 .../amazon/aws/hooks/glue_catalog.py | 8 ++--
 airflow/providers/amazon/aws/hooks/kinesis.py | 4 +-
 .../amazon/aws/hooks/lambda_function.py | 2 +-
 .../providers/amazon/aws/hooks/quicksight.py | 4 +-
 .../amazon/aws/hooks/redshift_cluster.py | 24 ++++++------
 .../amazon/aws/hooks/redshift_data.py | 2 +-
 .../amazon/aws/hooks/redshift_sql.py | 16 ++++----
 airflow/providers/amazon/aws/hooks/s3.py | 36 +++++++++---------
 .../providers/amazon/aws/hooks/sagemaker.py | 37 ++++++++++---------
 .../amazon/aws/hooks/secrets_manager.py | 4 +-
 airflow/providers/amazon/aws/hooks/ses.py | 4 +-
 airflow/providers/amazon/aws/hooks/sns.py | 2 +-
 airflow/providers/amazon/aws/hooks/sqs.py | 2 +-
 .../amazon/aws/hooks/step_function.py | 2 +-
 .../providers/amazon/aws/links/base_aws.py | 6 +--
 airflow/providers/amazon/aws/links/batch.py | 6 +--
 airflow/providers/amazon/aws/links/emr.py | 4 +-
 airflow/providers/amazon/aws/links/glue.py | 2 +-
 airflow/providers/amazon/aws/links/logs.py | 2 +-
 .../amazon/aws/log/s3_task_handler.py | 5 ++-
 .../providers/amazon/aws/operators/athena.py | 4 +-
 .../providers/amazon/aws/operators/batch.py | 16 ++++----
 .../amazon/aws/operators/datasync.py | 4 +-
 airflow/providers/amazon/aws/operators/dms.py | 10 ++---
 airflow/providers/amazon/aws/operators/ec2.py | 4 +-
 airflow/providers/amazon/aws/operators/ecs.py | 2 +-
 airflow/providers/amazon/aws/operators/emr.py | 14 +++----
 .../providers/amazon/aws/operators/glacier.py | 4 +-
 .../providers/amazon/aws/operators/glue.py | 4 +-
 .../amazon/aws/operators/glue_crawler.py | 2 +-
 airflow/providers/amazon/aws/operators/rds.py | 24 ++++++------
 .../amazon/aws/operators/redshift_cluster.py | 6 +--
 .../amazon/aws/operators/redshift_data.py | 6 +--
 airflow/providers/amazon/aws/operators/s3.py | 6 +--
 airflow/providers/amazon/aws/operators/sns.py | 2 +-
 airflow/providers/amazon/aws/operators/sqs.py | 4 +-
 .../amazon/aws/secrets/secrets_manager.py | 21 ++++++-----
 .../amazon/aws/secrets/systems_manager.py | 14 +++----
 .../providers/amazon/aws/sensors/athena.py | 2 +-
 airflow/providers/amazon/aws/sensors/batch.py | 6 +--
 .../amazon/aws/sensors/cloud_formation.py | 4 +-
 airflow/providers/amazon/aws/sensors/dms.py | 2 +-
 .../providers/amazon/aws/sensors/dynamodb.py | 4 +-
 airflow/providers/amazon/aws/sensors/emr.py | 6 +--
 .../providers/amazon/aws/sensors/glacier.py | 2 +-
 airflow/providers/amazon/aws/sensors/glue.py | 3 +-
 .../aws/sensors/glue_catalog_partition.py | 4 +-
 .../amazon/aws/sensors/glue_crawler.py | 5 ++-
 airflow/providers/amazon/aws/sensors/rds.py | 4 +-
 .../amazon/aws/sensors/redshift_cluster.py | 2 +-
 airflow/providers/amazon/aws/sensors/s3.py | 2 +-
 airflow/providers/amazon/aws/sensors/sqs.py | 4 +-
 .../amazon/aws/sensors/step_function.py | 2 +-
 .../providers/amazon/aws/transfers/base.py | 4 +-
 .../amazon/aws/transfers/dynamodb_to_s3.py | 4 +-
 .../amazon/aws/transfers/glacier_to_gcs.py | 2 +-
 .../amazon/aws/transfers/mongo_to_s3.py | 6 +--
 .../amazon/aws/transfers/s3_to_redshift.py | 2 +-
 .../amazon/aws/transfers/s3_to_sql.py | 2 +-
 .../amazon/aws/transfers/sql_to_s3.py | 2 +-
 .../amazon/aws/triggers/redshift_cluster.py | 2 +-
 .../amazon/aws/utils/connection_wrapper.py | 2 +-
 airflow/providers/amazon/aws/utils/emailer.py | 2 +-
 airflow/providers/amazon/aws/utils/rds.py | 2 +-
 .../providers/amazon/aws/utils/redshift.py | 3 +-
 80 files changed, 281 insertions(+), 253 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/appflow.py b/airflow/providers/amazon/aws/hooks/appflow.py
index 14dee2ef10206..16d17a0084a9b 100644
--- a/airflow/providers/amazon/aws/hooks/appflow.py
+++ b/airflow/providers/amazon/aws/hooks/appflow.py
@@ -45,7 +45,7 @@ def __init__(self, *args, **kwargs) -> None:

     @cached_property
     def conn(self) -> AppflowClient:
-        """Get the underlying boto3 Appflow client (cached)"""
+        """Get the underlying boto3 Appflow client (cached)."""
         return super().conn

     def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: bool = True) -> str:
diff --git a/airflow/providers/amazon/aws/hooks/athena.py b/airflow/providers/amazon/aws/hooks/athena.py
index 88793d5165876..914ecbc2b03e9 100644
--- a/airflow/providers/amazon/aws/hooks/athena.py
+++ b/airflow/providers/amazon/aws/hooks/athena.py
@@ -77,7 +77,7 @@ def run_query(
         workgroup: str = "primary",
     ) -> str:
         """
-        Run Presto query on athena with provided config and return submitted query_execution_id
+        Run Presto query on athena with provided config and return submitted query_execution_id.

         .. seealso::
             - :external+boto3:py:meth:`Athena.Client.start_query_execution`
@@ -153,7 +153,7 @@ def get_query_results(
     ) -> dict | None:
         """
         Fetch submitted athena query results. returns none if query is in intermediate state or
-        failed/cancelled state else dict of query output
+        failed/cancelled state else dict of query output.

         .. seealso::
             - :external+boto3:py:meth:`Athena.Client.get_query_results`
@@ -188,7 +188,7 @@ def get_query_results_paginator(
         """
         Fetch submitted athena query results. returns none if query is in intermediate state or
         failed/cancelled state else a paginator to iterate through pages of results. If you
-        wish to get all results at once, call build_full_result() on the returned PageIterator
+        wish to get all results at once, call build_full_result() on the returned PageIterator.

         .. seealso::
             - :external+boto3:py:class:`Athena.Paginator.GetQueryResults`
@@ -227,7 +227,7 @@ def poll_query_status(
     ) -> str | None:
         """
         Poll the status of submitted athena query until query state reaches final state.
-        Returns one of the final states
+        Returns one of the final states.

         :param query_execution_id: Id of submitted athena query
         :param max_polling_attempts: Number of times to poll for query state before function exits
@@ -298,7 +298,7 @@ def get_output_location(self, query_execution_id: str) -> str:

     def stop_query(self, query_execution_id: str) -> dict:
         """
-        Cancel the submitted athena query
+        Cancel the submitted athena query.

         ..
seealso:: - :external+boto3:py:meth:`Athena.Client.stop_query_execution` diff --git a/airflow/providers/amazon/aws/hooks/base_aws.py b/airflow/providers/amazon/aws/hooks/base_aws.py index c04b68704b1b8..efdedff516fac 100644 --- a/airflow/providers/amazon/aws/hooks/base_aws.py +++ b/airflow/providers/amazon/aws/hooks/base_aws.py @@ -125,7 +125,7 @@ def config(self) -> Config | None: @property def role_arn(self) -> str | None: - """Assume Role ARN from AWS Connection""" + """Assume Role ARN from AWS Connection.""" return self.conn.role_arn def _apply_session_kwargs(self, session): @@ -584,7 +584,7 @@ def get_session(self, region_name: str | None = None, deferrable: bool = False) def _get_config(self, config: Config | None = None) -> Config: """ No AWS Operators use the config argument to this method. - Keep backward compatibility with other users who might use it + Keep backward compatibility with other users who might use it. """ if config is None: config = deepcopy(self.config) @@ -604,7 +604,7 @@ def get_client_type( config: Config | None = None, deferrable: bool = False, ) -> boto3.client: - """Get the underlying boto3 client using boto3 session""" + """Get the underlying boto3 client using boto3 session.""" client_type = self.client_type session = self.get_session(region_name=region_name, deferrable=deferrable) if not isinstance(session, boto3.session.Session): @@ -627,7 +627,7 @@ def get_resource_type( region_name: str | None = None, config: Config | None = None, ) -> boto3.resource: - """Get the underlying boto3 resource using boto3 session""" + """Get the underlying boto3 resource using boto3 session.""" resource_type = self.resource_type session = self.get_session(region_name=region_name) return session.resource( @@ -640,7 +640,7 @@ def get_resource_type( @cached_property def conn(self) -> BaseAwsConnection: """ - Get the underlying boto3 client/resource (cached) + Get the underlying boto3 client/resource (cached). :return: boto3.client or boto3.resource """ @@ -682,7 +682,7 @@ def conn_partition(self) -> str: def get_conn(self) -> BaseAwsConnection: """ - Get the underlying boto3 client/resource (cached) + Get the underlying boto3 client/resource (cached). Implemented so that caching works as intended. It exists for compatibility with subclasses that rely on a super().get_conn() method. @@ -871,7 +871,7 @@ def get_waiter( @staticmethod def _apply_parameters_value(config: dict, waiter_name: str, parameters: dict[str, str] | None) -> dict: - """Replaces potential jinja templates in acceptors definition""" + """Replaces potential jinja templates in acceptors definition.""" # only process the waiter we're going to use to not raise errors for missing params for other waiters. 
acceptors = config["waiters"][waiter_name]["acceptors"] for a in acceptors: @@ -925,7 +925,7 @@ class AwsBaseHook(AwsGenericHook[Union[boto3.client, boto3.resource]]): def resolve_session_factory() -> type[BaseSessionFactory]: - """Resolves custom SessionFactory class""" + """Resolves custom SessionFactory class.""" clazz = conf.getimport("aws", "session_factory", fallback=None) if not clazz: return BaseSessionFactory @@ -941,7 +941,7 @@ def resolve_session_factory() -> type[BaseSessionFactory]: def _parse_s3_config(config_file_name: str, config_format: str | None = "boto", profile: str | None = None): - """For compatibility with airflow.contrib.hooks.aws_hook""" + """For compatibility with airflow.contrib.hooks.aws_hook.""" from airflow.providers.amazon.aws.utils.connection_wrapper import _parse_s3_config return _parse_s3_config( @@ -967,7 +967,9 @@ class BaseAsyncSessionFactory(BaseSessionFactory): """ async def get_role_credentials(self) -> dict: - """Get the role_arn, method credentials from connection details and get the role credentials detail""" + """Get the role_arn, method credentials from connection details and get the role credentials + detail. + """ async with self._basic_session.create_client("sts", region_name=self.region_name) as client: response = await client.assume_role( RoleArn=self.role_arn, @@ -1066,7 +1068,7 @@ def get_async_session(self) -> AioSession: ).create_session() async def get_client_async(self): - """Get the underlying aiobotocore client using aiobotocore session""" + """Get the underlying aiobotocore client using aiobotocore session.""" return self.get_async_session().create_client( self.client_type, region_name=self.region_name, diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py index 624869a06b128..449fa6069beb9 100644 --- a/airflow/providers/amazon/aws/hooks/batch_client.py +++ b/airflow/providers/amazon/aws/hooks/batch_client.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. """ -A client for AWS Batch services +A client for AWS Batch services. .. seealso:: @@ -53,7 +53,7 @@ class BatchProtocol(Protocol): def describe_jobs(self, jobs: list[str]) -> dict: """ - Get job descriptions from AWS Batch + Get job descriptions from AWS Batch. :param jobs: a list of JobId to describe @@ -63,7 +63,7 @@ def describe_jobs(self, jobs: list[str]) -> dict: def get_waiter(self, waiterName: str) -> botocore.waiter.Waiter: """ - Get an AWS Batch service waiter + Get an AWS Batch service waiter. :param waiterName: The name of the waiter. The name should match the name (including the casing) of the key name in the waiter @@ -98,7 +98,7 @@ def submit_job( tags: dict, ) -> dict: """ - Submit a Batch job + Submit a Batch job. :param jobName: the name for the AWS Batch job @@ -120,7 +120,7 @@ def submit_job( def terminate_job(self, jobId: str, reason: str) -> dict: """ - Terminate a Batch job + Terminate a Batch job. :param jobId: a job ID to terminate @@ -216,7 +216,7 @@ def client(self) -> BatchProtocol | botocore.client.BaseClient: def terminate_job(self, job_id: str, reason: str) -> dict: """ - Terminate a Batch job + Terminate a Batch job. 
:param job_id: a job ID to terminate @@ -231,7 +231,7 @@ def terminate_job(self, job_id: str, reason: str) -> dict: def check_job_success(self, job_id: str) -> bool: """ Check the final status of the Batch job; return True if the job - 'SUCCEEDED', else raise an AirflowException + 'SUCCEEDED', else raise an AirflowException. :param job_id: a Batch job ID @@ -255,7 +255,7 @@ def check_job_success(self, job_id: str) -> bool: def wait_for_job(self, job_id: str, delay: int | float | None = None) -> None: """ - Wait for Batch job to complete + Wait for Batch job to complete. :param job_id: a Batch job ID @@ -396,7 +396,7 @@ def get_job_description(self, job_id: str) -> dict: @staticmethod def parse_job_description(job_id: str, response: dict) -> dict: """ - Parse job description to extract description for job_id + Parse job description to extract description for job_id. :param job_id: a Batch job ID @@ -488,7 +488,7 @@ def get_job_all_awslogs_info(self, job_id: str) -> list[dict[str, str]]: @staticmethod def add_jitter(delay: int | float, width: int | float = 1, minima: int | float = 0) -> float: """ - Use delay +/- width for random jitter + Use delay +/- width for random jitter. Adding jitter to status polling can help to avoid AWS Batch API limits for monitoring Batch jobs with diff --git a/airflow/providers/amazon/aws/hooks/batch_waiters.py b/airflow/providers/amazon/aws/hooks/batch_waiters.py index cb852acf9d8b8..c746798dff949 100644 --- a/airflow/providers/amazon/aws/hooks/batch_waiters.py +++ b/airflow/providers/amazon/aws/hooks/batch_waiters.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. """ -AWS Batch service waiters +AWS Batch service waiters. .. seealso:: @@ -107,7 +107,7 @@ def __init__(self, *args, waiter_config: dict | None = None, **kwargs) -> None: @property def default_config(self) -> dict: """ - An immutable default waiter configuration + An immutable default waiter configuration. :return: a waiter configuration for AWS Batch services """ diff --git a/airflow/providers/amazon/aws/hooks/cloud_formation.py b/airflow/providers/amazon/aws/hooks/cloud_formation.py index c7157169e3554..b39ee7be5601b 100644 --- a/airflow/providers/amazon/aws/hooks/cloud_formation.py +++ b/airflow/providers/amazon/aws/hooks/cloud_formation.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-"""This module contains AWS CloudFormation Hook""" +"""This module contains AWS CloudFormation Hook.""" from __future__ import annotations from boto3 import client, resource diff --git a/airflow/providers/amazon/aws/hooks/datasync.py b/airflow/providers/amazon/aws/hooks/datasync.py index 3e7c4e420aaf8..0c017a931244e 100644 --- a/airflow/providers/amazon/aws/hooks/datasync.py +++ b/airflow/providers/amazon/aws/hooks/datasync.py @@ -179,7 +179,7 @@ def delete_task(self, task_arn: str) -> None: self.get_conn().delete_task(TaskArn=task_arn) def _refresh_tasks(self) -> None: - """Refreshes the local list of Tasks""" + """Refreshes the local list of Tasks.""" self.tasks = [] next_token = None while True: diff --git a/airflow/providers/amazon/aws/hooks/dms.py b/airflow/providers/amazon/aws/hooks/dms.py index 94cb5ac8ba2eb..d018d5ae40690 100644 --- a/airflow/providers/amazon/aws/hooks/dms.py +++ b/airflow/providers/amazon/aws/hooks/dms.py @@ -51,7 +51,7 @@ def __init__(self, *args, **kwargs): def describe_replication_tasks(self, **kwargs) -> tuple[str | None, list]: """ - Describe replication tasks + Describe replication tasks. .. seealso:: - :external+boto3:py:meth:`DatabaseMigrationService.Client.describe_replication_tasks` @@ -65,7 +65,7 @@ def describe_replication_tasks(self, **kwargs) -> tuple[str | None, list]: def find_replication_tasks_by_arn(self, replication_task_arn: str, without_settings: bool | None = False): """ - Find and describe replication tasks by task ARN + Find and describe replication tasks by task ARN. .. seealso:: - :external+boto3:py:meth:`DatabaseMigrationService.Client.describe_replication_tasks` diff --git a/airflow/providers/amazon/aws/hooks/dynamodb.py b/airflow/providers/amazon/aws/hooks/dynamodb.py index e3cc1a76b5bb5..8710cff31764f 100644 --- a/airflow/providers/amazon/aws/hooks/dynamodb.py +++ b/airflow/providers/amazon/aws/hooks/dynamodb.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains the Amazon DynamoDB Hook""" +"""This module contains the Amazon DynamoDB Hook.""" from __future__ import annotations from typing import Iterable diff --git a/airflow/providers/amazon/aws/hooks/ec2.py b/airflow/providers/amazon/aws/hooks/ec2.py index 91b79aa8a319c..13bcb60196603 100644 --- a/airflow/providers/amazon/aws/hooks/ec2.py +++ b/airflow/providers/amazon/aws/hooks/ec2.py @@ -91,7 +91,7 @@ def get_instance(self, instance_id: str, filters: list | None = None): @only_client_type def stop_instances(self, instance_ids: list) -> dict: """ - Stop instances with given ids + Stop instances with given ids. :param instance_ids: List of instance ids to stop :return: Dict with key `StoppingInstances` and value as list of instances being stopped @@ -103,7 +103,7 @@ def stop_instances(self, instance_ids: list) -> dict: @only_client_type def start_instances(self, instance_ids: list) -> dict: """ - Start instances with given ids + Start instances with given ids. :param instance_ids: List of instance ids to start :return: Dict with key `StartingInstances` and value as list of instances being started @@ -115,7 +115,7 @@ def start_instances(self, instance_ids: list) -> dict: @only_client_type def terminate_instances(self, instance_ids: list) -> dict: """ - Terminate instances with given ids + Terminate instances with given ids. 
:param instance_ids: List of instance ids to terminate :return: Dict with key `TerminatingInstances` and value as list of instances being terminated @@ -127,7 +127,7 @@ def terminate_instances(self, instance_ids: list) -> dict: @only_client_type def describe_instances(self, filters: list | None = None, instance_ids: list | None = None): """ - Describe EC2 instances, optionally applying filters and selective instance ids + Describe EC2 instances, optionally applying filters and selective instance ids. :param filters: List of filters to specify instances to describe :param instance_ids: List of instance IDs to describe @@ -144,7 +144,7 @@ def describe_instances(self, filters: list | None = None, instance_ids: list | N @only_client_type def get_instances(self, filters: list | None = None, instance_ids: list | None = None) -> list: """ - Get list of instance details, optionally applying filters and selective instance ids + Get list of instance details, optionally applying filters and selective instance ids. :param instance_ids: List of ids to get instances for :param filters: List of filters to specify instances to get @@ -159,7 +159,7 @@ def get_instances(self, filters: list | None = None, instance_ids: list | None = @only_client_type def get_instance_ids(self, filters: list | None = None) -> list: """ - Get list of instance ids, optionally applying filters to fetch selective instances + Get list of instance ids, optionally applying filters to fetch selective instances. :param filters: List of filters to specify instances to get :return: List of instance ids diff --git a/airflow/providers/amazon/aws/hooks/ecs.py b/airflow/providers/amazon/aws/hooks/ecs.py index f8e80f7110663..13178372ef0bb 100644 --- a/airflow/providers/amazon/aws/hooks/ecs.py +++ b/airflow/providers/amazon/aws/hooks/ecs.py @@ -226,25 +226,43 @@ class EcsProtocol(Protocol): """ def run_task(self, **kwargs) -> dict: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task""" # noqa: E501 + """Run a task + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task + """ ... def get_waiter(self, x: str) -> Waiter: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.get_waiter""" # noqa: E501 + """Get a waiter + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.get_waiter + """ ... def describe_tasks(self, cluster: str, tasks) -> dict: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_tasks""" # noqa: E501 + """Describe tasks + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_tasks + """ ... def stop_task(self, cluster, task, reason: str) -> dict: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.stop_task""" # noqa: E501 + """Stop a task + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.stop_task + """ ... def describe_task_definition(self, taskDefinition: str) -> dict: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_task_definition""" # noqa: E501 + """Describe a task definition + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_task_definition + """ ... 
def list_tasks(self, cluster: str, launchType: str, desiredStatus: str, family: str) -> dict: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.list_tasks""" # noqa: E501 + """List tasks + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.list_tasks + """ ... diff --git a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py index 7debbd4e4c305..44b20bcdce87a 100644 --- a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py +++ b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py @@ -109,7 +109,7 @@ def get_replication_group_status(self, replication_group_id: str) -> str: def is_replication_group_available(self, replication_group_id: str) -> bool: """ - Helper for checking if replication group is available or not + Helper for checking if replication group is available or not. :param replication_group_id: ID of replication group to check for availability :return: True if available else False @@ -124,7 +124,7 @@ def wait_for_availability( max_retries: int | None = None, ) -> bool: """ - Check if replication group is available or not by performing a describe over it + Check if replication group is available or not by performing a describe over it. :param replication_group_id: ID of replication group to check for availability :param initial_sleep_time: Initial sleep time in seconds @@ -178,7 +178,7 @@ def wait_for_deletion( max_retries: int | None = None, ): """ - Helper for deleting a replication group ensuring it is either deleted or can't be deleted + Helper for deleting a replication group ensuring it is either deleted or can't be deleted. :param replication_group_id: ID of replication to delete :param initial_sleep_time: Initial sleep time in second @@ -253,7 +253,7 @@ def ensure_delete_replication_group( max_retries: int | None = None, ) -> dict: """ - Delete a replication group ensuring it is either deleted or can't be deleted + Delete a replication group ensuring it is either deleted or can't be deleted. :param replication_group_id: ID of replication to delete :param initial_sleep_time: Initial sleep time in second diff --git a/airflow/providers/amazon/aws/hooks/emr.py b/airflow/providers/amazon/aws/hooks/emr.py index 412e7c2e91f8c..4effe18ba2a03 100644 --- a/airflow/providers/amazon/aws/hooks/emr.py +++ b/airflow/providers/amazon/aws/hooks/emr.py @@ -495,7 +495,7 @@ def poll_query_status( def stop_query(self, job_id: str) -> dict: """ - Cancel the submitted job_run + Cancel the submitted job_run. .. seealso:: - :external+boto3:py:meth:`EMRContainers.Client.cancel_job_run` diff --git a/airflow/providers/amazon/aws/hooks/glacier.py b/airflow/providers/amazon/aws/hooks/glacier.py index 835e5f48e513a..cc886a64b72dd 100644 --- a/airflow/providers/amazon/aws/hooks/glacier.py +++ b/airflow/providers/amazon/aws/hooks/glacier.py @@ -40,7 +40,7 @@ def __init__(self, aws_conn_id: str = "aws_default") -> None: def retrieve_inventory(self, vault_name: str) -> dict[str, Any]: """ - Initiate an Amazon Glacier inventory-retrieval job + Initiate an Amazon Glacier inventory-retrieval job. .. 
seealso:: - :external+boto3:py:meth:`Glacier.Client.initiate_job` @@ -56,7 +56,7 @@ def retrieve_inventory(self, vault_name: str) -> dict[str, Any]: def retrieve_inventory_results(self, vault_name: str, job_id: str) -> dict[str, Any]: """ - Retrieve the results of an Amazon Glacier inventory-retrieval job + Retrieve the results of an Amazon Glacier inventory-retrieval job. .. seealso:: - :external+boto3:py:meth:`Glacier.Client.get_job_output` @@ -71,7 +71,7 @@ def retrieve_inventory_results(self, vault_name: str, job_id: str) -> dict[str, def describe_job(self, vault_name: str, job_id: str) -> dict[str, Any]: """ Retrieve the status of an Amazon S3 Glacier job, such as an - inventory-retrieval job + inventory-retrieval job. .. seealso:: - :external+boto3:py:meth:`Glacier.Client.describe_job` diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py index 6f4ed8342e072..ca0d8238dbcd4 100644 --- a/airflow/providers/amazon/aws/hooks/glue.py +++ b/airflow/providers/amazon/aws/hooks/glue.py @@ -210,7 +210,7 @@ def print_job_logs( paginator = log_client.get_paginator("filter_log_events") def display_logs_from(log_group: str, continuation_token: str | None) -> str | None: - """Internal method to mutualize iteration over the 2 different log streams glue jobs write to""" + """Internal method to mutualize iteration over the 2 different log streams glue jobs write to.""" fetched_logs = [] next_token = continuation_token try: diff --git a/airflow/providers/amazon/aws/hooks/glue_catalog.py b/airflow/providers/amazon/aws/hooks/glue_catalog.py index ce94ac18d78d6..81c452cc0e9f2 100644 --- a/airflow/providers/amazon/aws/hooks/glue_catalog.py +++ b/airflow/providers/amazon/aws/hooks/glue_catalog.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS Glue Catalog Hook""" +"""This module contains AWS Glue Catalog Hook.""" from __future__ import annotations from botocore.exceptions import ClientError @@ -85,7 +85,7 @@ def get_partitions( def check_for_partition(self, database_name: str, table_name: str, expression: str) -> bool: """ - Checks whether a partition exists + Checks whether a partition exists. .. code-block:: python @@ -103,7 +103,7 @@ def check_for_partition(self, database_name: str, table_name: str, expression: s def get_table(self, database_name: str, table_name: str) -> dict: """ - Get the information of the table + Get the information of the table. .. seealso:: - :external+boto3:py:meth:`Glue.Client.get_table` @@ -137,7 +137,7 @@ def get_table_location(self, database_name: str, table_name: str) -> str: def get_partition(self, database_name: str, table_name: str, partition_values: list[str]) -> dict: """ - Gets a Partition + Gets a Partition. .. seealso:: - :external+boto3:py:meth:`Glue.Client.get_partition` diff --git a/airflow/providers/amazon/aws/hooks/kinesis.py b/airflow/providers/amazon/aws/hooks/kinesis.py index 6f6b850fe5dcb..e527aa6aaccb6 100644 --- a/airflow/providers/amazon/aws/hooks/kinesis.py +++ b/airflow/providers/amazon/aws/hooks/kinesis.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-"""This module contains AWS Firehose hook""" +"""This module contains AWS Firehose hook.""" from __future__ import annotations from typing import Iterable @@ -43,7 +43,7 @@ def __init__(self, delivery_stream: str, *args, **kwargs) -> None: super().__init__(*args, **kwargs) def put_records(self, records: Iterable): - """Write batch records to Kinesis Firehose + """Write batch records to Kinesis Firehose. .. seealso:: - :external+boto3:py:meth:`Firehose.Client.put_record_batch` diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py index c8d0327be9b73..a3e82688aeed1 100644 --- a/airflow/providers/amazon/aws/hooks/lambda_function.py +++ b/airflow/providers/amazon/aws/hooks/lambda_function.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS Lambda hook""" +"""This module contains AWS Lambda hook.""" from __future__ import annotations from typing import Any diff --git a/airflow/providers/amazon/aws/hooks/quicksight.py b/airflow/providers/amazon/aws/hooks/quicksight.py index 11ea728e5ef8c..633b6a5cfc6e7 100644 --- a/airflow/providers/amazon/aws/hooks/quicksight.py +++ b/airflow/providers/amazon/aws/hooks/quicksight.py @@ -58,7 +58,7 @@ def create_ingestion( check_interval: int = 30, ) -> dict: """ - Creates and starts a new SPICE ingestion for a dataset. Refreshes the SPICE datasets + Creates and starts a new SPICE ingestion for a dataset. Refreshes the SPICE datasets. .. seealso:: - :external+boto3:py:meth:`QuickSight.Client.create_ingestion` @@ -143,7 +143,7 @@ def wait_for_state( check_interval: int, ): """ - Check status of a QuickSight Create Ingestion API + Check status of a QuickSight Create Ingestion API. :param aws_account_id: An AWS Account ID :param data_set_id: QuickSight Data Set ID diff --git a/airflow/providers/amazon/aws/hooks/redshift_cluster.py b/airflow/providers/amazon/aws/hooks/redshift_cluster.py index 27fc25a1de546..5bd76f2af0828 100644 --- a/airflow/providers/amazon/aws/hooks/redshift_cluster.py +++ b/airflow/providers/amazon/aws/hooks/redshift_cluster.py @@ -52,7 +52,7 @@ def create_cluster( params: dict[str, Any], ) -> dict[str, Any]: """ - Creates a new cluster with the specified parameters + Creates a new cluster with the specified parameters. .. seealso:: - :external+boto3:py:meth:`Redshift.Client.create_cluster` @@ -83,7 +83,7 @@ def create_cluster( # TODO: Wrap create_cluster_snapshot def cluster_status(self, cluster_identifier: str) -> str: """ - Return status of a cluster + Return status of a cluster. .. seealso:: - :external+boto3:py:meth:`Redshift.Client.describe_clusters` @@ -105,7 +105,7 @@ def delete_cluster( final_cluster_snapshot_identifier: str | None = None, ): """ - Delete a cluster and optionally create a snapshot + Delete a cluster and optionally create a snapshot. .. seealso:: - :external+boto3:py:meth:`Redshift.Client.delete_cluster` @@ -125,7 +125,7 @@ def delete_cluster( def describe_cluster_snapshots(self, cluster_identifier: str) -> list[str] | None: """ - Gets a list of snapshots for a cluster + Gets a list of snapshots for a cluster. .. 
seealso:: - :external+boto3:py:meth:`Redshift.Client.describe_cluster_snapshots` @@ -142,7 +142,7 @@ def describe_cluster_snapshots(self, cluster_identifier: str) -> list[str] | Non def restore_from_cluster_snapshot(self, cluster_identifier: str, snapshot_identifier: str) -> str: """ - Restores a cluster from its snapshot + Restores a cluster from its snapshot. .. seealso:: - :external+boto3:py:meth:`Redshift.Client.restore_from_cluster_snapshot` @@ -163,7 +163,7 @@ def create_cluster_snapshot( tags: list[Any] | None = None, ) -> str: """ - Creates a snapshot of a cluster + Creates a snapshot of a cluster. .. seealso:: - :external+boto3:py:meth:`Redshift.Client.create_cluster_snapshot` @@ -186,7 +186,7 @@ def create_cluster_snapshot( def get_cluster_snapshot_status(self, snapshot_identifier: str): """ - Return Redshift cluster snapshot status. If cluster snapshot not found return ``None`` + Return Redshift cluster snapshot status. If cluster snapshot not found return ``None``. :param snapshot_identifier: A unique identifier for the snapshot that you are requesting """ @@ -202,7 +202,7 @@ def get_cluster_snapshot_status(self, snapshot_identifier: str): class RedshiftAsyncHook(AwsBaseAsyncHook): - """Interact with AWS Redshift using aiobotocore library""" + """Interact with AWS Redshift using aiobotocore library.""" def __init__(self, *args: Any, **kwargs: Any) -> None: kwargs["client_type"] = "redshift" @@ -211,7 +211,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: async def cluster_status(self, cluster_identifier: str, delete_operation: bool = False) -> dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and get the status - and returns the status of the cluster based on the cluster_identifier passed + and returns the status of the cluster based on the cluster_identifier passed. :param cluster_identifier: unique identifier of a cluster :param delete_operation: whether the method has been called as part of delete cluster operation @@ -231,7 +231,7 @@ async def cluster_status(self, cluster_identifier: str, delete_operation: bool = async def pause_cluster(self, cluster_identifier: str, poll_interval: float = 5.0) -> dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and - pause the cluster based on the cluster_identifier passed + pause the cluster based on the cluster_identifier passed. :param cluster_identifier: unique identifier of a cluster :param poll_interval: polling period in seconds to check for the status @@ -260,7 +260,7 @@ async def resume_cluster( ) -> dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and - resume the cluster for the cluster_identifier passed + resume the cluster for the cluster_identifier passed. :param cluster_identifier: unique identifier of a cluster :param polling_period_seconds: polling period in seconds to check for the status @@ -290,7 +290,7 @@ async def get_cluster_status( delete_operation: bool = False, ) -> dict[str, Any]: """ - check for expected Redshift cluster state + check for expected Redshift cluster state. 
:param cluster_identifier: unique identifier of a cluster :param expected_state: expected_state example("available", "pausing", "paused"") diff --git a/airflow/providers/amazon/aws/hooks/redshift_data.py b/airflow/providers/amazon/aws/hooks/redshift_data.py index e033624c4c8c1..b38fc8962bc73 100644 --- a/airflow/providers/amazon/aws/hooks/redshift_data.py +++ b/airflow/providers/amazon/aws/hooks/redshift_data.py @@ -61,7 +61,7 @@ def execute_query( poll_interval: int = 10, ) -> str: """ - Execute a statement against Amazon Redshift + Execute a statement against Amazon Redshift. :param database: the name of the database :param sql: the SQL statement or list of SQL statement to run diff --git a/airflow/providers/amazon/aws/hooks/redshift_sql.py b/airflow/providers/amazon/aws/hooks/redshift_sql.py index e9c2b7fecc78b..6b41d1f696af7 100644 --- a/airflow/providers/amazon/aws/hooks/redshift_sql.py +++ b/airflow/providers/amazon/aws/hooks/redshift_sql.py @@ -33,7 +33,7 @@ class RedshiftSQLHook(DbApiHook): """ - Execute statements against Amazon Redshift, using redshift_connector + Execute statements against Amazon Redshift, using redshift_connector. This hook requires the redshift_conn_id connection. @@ -64,7 +64,7 @@ def __init__(self, *args, aws_conn_id: str = "aws_default", **kwargs) -> None: @staticmethod def get_ui_field_behaviour() -> dict: - """Returns custom field behavior""" + """Returns custom field behavior.""" return { "hidden_fields": [], "relabeling": {"login": "User", "schema": "Database"}, @@ -75,7 +75,7 @@ def conn(self): return self.get_connection(self.redshift_conn_id) # type: ignore[attr-defined] def _get_conn_params(self) -> dict[str, str | int]: - """Helper method to retrieve connection args""" + """Helper method to retrieve connection args.""" conn = self.conn conn_params: dict[str, str | int] = {} @@ -99,7 +99,7 @@ def _get_conn_params(self) -> dict[str, str | int]: def get_iam_token(self, conn: Connection) -> tuple[str, str, int]: """ Uses AWSHook to retrieve a temporary password to connect to Redshift. - Port is required. If none is provided, default is used for each service + Port is required. If none is provided, default is used for each service. """ port = conn.port or 5439 # Pull the custer-identifier from the beginning of the Redshift URL @@ -118,7 +118,7 @@ def get_iam_token(self, conn: Connection) -> tuple[str, str, int]: return login, token, port def get_uri(self) -> str: - """Overrides DbApiHook get_uri to use redshift_connector sqlalchemy dialect as driver name""" + """Overrides DbApiHook get_uri to use redshift_connector sqlalchemy dialect as driver name.""" conn_params = self._get_conn_params() if "user" in conn_params: @@ -130,7 +130,7 @@ def get_uri(self) -> str: return str(create_url(drivername="redshift+redshift_connector", **conn_params)) def get_sqlalchemy_engine(self, engine_kwargs=None): - """Overrides DbApiHook get_sqlalchemy_engine to pass redshift_connector specific kwargs""" + """Overrides DbApiHook get_sqlalchemy_engine to pass redshift_connector specific kwargs.""" conn_kwargs = self.conn.extra_dejson if engine_kwargs is None: engine_kwargs = {} @@ -147,7 +147,7 @@ def get_table_primary_key(self, table: str, schema: str | None = "public") -> li Helper method that returns the table primary key :param table: Name of the target table :param schema: Name of the target schema, public by default - :return: Primary key columns list + :return: Primary key columns list. 
""" sql = """ select kcu.column_name @@ -164,7 +164,7 @@ def get_table_primary_key(self, table: str, schema: str | None = "public") -> li return pk_columns or None def get_conn(self) -> RedshiftConnection: - """Returns a redshift_connector.Connection object""" + """Returns a redshift_connector.Connection object.""" conn_params = self._get_conn_params() conn_kwargs_dejson = self.conn.extra_dejson conn_kwargs: dict = {**conn_params, **conn_kwargs_dejson} diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py index 1cfa6b7b01eba..777839c1c3c0f 100644 --- a/airflow/providers/amazon/aws/hooks/s3.py +++ b/airflow/providers/amazon/aws/hooks/s3.py @@ -175,7 +175,7 @@ def parse_s3_url(s3url: str) -> tuple[str, str]: """ Parses the S3 Url into a bucket name and key. See https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html - for valid url formats + for valid url formats. :param s3url: The S3 Url to parse. :return: the parsed bucket name and key @@ -207,8 +207,8 @@ def get_s3_bucket_key( ) -> tuple[str, str]: """ Get the S3 bucket name and key from either: - - bucket name and key. Return the info as it is after checking `key` is a relative path - - key. Must be a full s3:// url + - bucket name and key. Return the info as it is after checking `key` is a relative path. + - key. Must be a full s3:// url. :param bucket: The S3 bucket name :param key: The S3 key @@ -261,7 +261,7 @@ def check_for_bucket(self, bucket_name: str | None = None) -> bool: @provide_bucket_name def get_bucket(self, bucket_name: str | None = None) -> object: """ - Returns a :py:class:`S3.Bucket` object + Returns a :py:class:`S3.Bucket` object. .. seealso:: - :external+boto3:py:meth:`S3.ServiceResource.Bucket` @@ -306,7 +306,7 @@ def create_bucket(self, bucket_name: str | None = None, region_name: str | None @provide_bucket_name def check_for_prefix(self, prefix: str, delimiter: str, bucket_name: str | None = None) -> bool: """ - Checks that a prefix exists in a bucket + Checks that a prefix exists in a bucket. :param bucket_name: the name of the bucket :param prefix: a key prefix @@ -329,7 +329,7 @@ def list_prefixes( max_items: int | None = None, ) -> list: """ - Lists prefixes in a bucket under prefix + Lists prefixes in a bucket under prefix. .. seealso:: - :external+boto3:py:class:`S3.Paginator.ListObjectsV2` @@ -386,7 +386,7 @@ def list_keys( object_filter: Callable[..., list] | None = None, ) -> list: """ - Lists keys in a bucket under prefix and not containing delimiter + Lists keys in a bucket under prefix and not containing delimiter. .. seealso:: - :external+boto3:py:class:`S3.Paginator.ListObjectsV2` @@ -461,7 +461,7 @@ def get_file_metadata( max_items: int | None = None, ) -> list: """ - Lists metadata objects in a bucket under prefix + Lists metadata objects in a bucket under prefix. .. seealso:: - :external+boto3:py:class:`S3.Paginator.ListObjectsV2` @@ -490,7 +490,7 @@ def get_file_metadata( @provide_bucket_name def head_object(self, key: str, bucket_name: str | None = None) -> dict | None: """ - Retrieves metadata of an object + Retrieves metadata of an object. .. seealso:: - :external+boto3:py:meth:`S3.Client.head_object` @@ -511,7 +511,7 @@ def head_object(self, key: str, bucket_name: str | None = None) -> dict | None: @provide_bucket_name def check_for_key(self, key: str, bucket_name: str | None = None) -> bool: """ - Checks if a key exists in a bucket + Checks if a key exists in a bucket. .. 
seealso:: - :external+boto3:py:meth:`S3.Client.head_object` @@ -550,7 +550,7 @@ def get_key(self, key: str, bucket_name: str | None = None) -> S3ResourceObject: @provide_bucket_name def read_key(self, key: str, bucket_name: str | None = None) -> str: """ - Reads a key from S3 + Reads a key from S3. .. seealso:: - :external+boto3:py:meth:`S3.Object.get` @@ -614,7 +614,7 @@ def check_for_wildcard_key( self, wildcard_key: str, bucket_name: str | None = None, delimiter: str = "" ) -> bool: """ - Checks that a key matching a wildcard expression exists in a bucket + Checks that a key matching a wildcard expression exists in a bucket. :param wildcard_key: the path to the key :param bucket_name: the name of the bucket @@ -632,7 +632,7 @@ def get_wildcard_key( self, wildcard_key: str, bucket_name: str | None = None, delimiter: str = "" ) -> S3ResourceObject | None: """ - Returns a boto3.s3.Object object matching the wildcard expression + Returns a boto3.s3.Object object matching the wildcard expression. :param wildcard_key: the path to the key :param bucket_name: the name of the bucket @@ -659,7 +659,7 @@ def load_file( acl_policy: str | None = None, ) -> None: """ - Loads a local file to S3 + Loads a local file to S3. .. seealso:: - :external+boto3:py:meth:`S3.Client.upload_file` @@ -709,7 +709,7 @@ def load_string( compression: str | None = None, ) -> None: """ - Loads a string to S3 + Loads a string to S3. This is provided as a convenience to drop a string in S3. It uses the boto infrastructure to ship a file to s3. @@ -760,7 +760,7 @@ def load_bytes( acl_policy: str | None = None, ) -> None: """ - Loads bytes to S3 + Loads bytes to S3. This is provided as a convenience to drop bytes data into S3. It uses the boto infrastructure to ship a file to s3. @@ -794,7 +794,7 @@ def load_file_obj( acl_policy: str | None = None, ) -> None: """ - Loads a file object to S3 + Loads a file object to S3. .. seealso:: - :external+boto3:py:meth:`S3.Client.upload_fileobj` @@ -1034,7 +1034,7 @@ def generate_presigned_url( http_method: str | None = None, ) -> str | None: """ - Generate a presigned url given a client, its method, and arguments + Generate a presigned url given a client, its method, and arguments. .. seealso:: - :external+boto3:py:meth:`S3.Client.generate_presigned_url` diff --git a/airflow/providers/amazon/aws/hooks/sagemaker.py b/airflow/providers/amazon/aws/hooks/sagemaker.py index 31c2eaaf5bd2d..b6e7dd487d222 100644 --- a/airflow/providers/amazon/aws/hooks/sagemaker.py +++ b/airflow/providers/amazon/aws/hooks/sagemaker.py @@ -41,6 +41,7 @@ class LogState: """ Enum-style class holding all possible states of CloudWatch log streams. + https://sagemaker.readthedocs.io/en/stable/session.html#sagemaker.session.LogState """ @@ -57,7 +58,7 @@ class LogState: def argmin(arr, f: Callable) -> int | None: - """Return the index, i, in arr that minimizes f(arr[i])""" + """Return the index, i, in arr that minimizes f(arr[i]).""" min_value = None min_idx = None for idx, item in enumerate(arr): @@ -158,7 +159,7 @@ def __init__(self, *args, **kwargs): def tar_and_s3_upload(self, path: str, key: str, bucket: str) -> None: """ - Tar the local file or directory and upload to s3 + Tar the local file or directory and upload to s3. :param path: local file or directory :param key: s3 key @@ -197,7 +198,7 @@ def configure_s3_resources(self, config: dict) -> None: def check_s3_url(self, s3url: str) -> bool: """ - Check if an S3 URL exists + Check if an S3 URL exists. 
:param s3url: S3 url """ @@ -219,7 +220,7 @@ def check_s3_url(self, s3url: str) -> bool: def check_training_config(self, training_config: dict) -> None: """ - Check if a training configuration is valid + Check if a training configuration is valid. :param training_config: training_config :return: None @@ -231,7 +232,7 @@ def check_training_config(self, training_config: dict) -> None: def check_tuning_config(self, tuning_config: dict) -> None: """ - Check if a tuning configuration is valid + Check if a tuning configuration is valid. :param tuning_config: tuning_config :return: None @@ -543,7 +544,7 @@ def update_endpoint( def describe_training_job(self, name: str): """ - Return the training job info associated with the name + Return the training job info associated with the name. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.describe_training_job` @@ -563,7 +564,7 @@ def describe_training_job_with_log( last_description: dict, last_describe_job_call: float, ): - """Return the training job info associated with job_name and print CloudWatch logs""" + """Return the training job info associated with job_name and print CloudWatch logs.""" log_group = "/aws/sagemaker/TrainingJobs" if len(stream_names) < instance_count: @@ -616,7 +617,7 @@ def describe_training_job_with_log( def describe_tuning_job(self, name: str) -> dict: """ - Return the tuning job info associated with the name + Return the tuning job info associated with the name. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.describe_hyper_parameter_tuning_job` @@ -628,7 +629,7 @@ def describe_tuning_job(self, name: str) -> dict: def describe_model(self, name: str) -> dict: """ - Return the SageMaker model info associated with the name + Return the SageMaker model info associated with the name. :param name: the name of the SageMaker model :return: A dict contains all the model info @@ -637,7 +638,7 @@ def describe_model(self, name: str) -> dict: def describe_transform_job(self, name: str) -> dict: """ - Return the transform job info associated with the name + Return the transform job info associated with the name. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.describe_transform_job` @@ -649,7 +650,7 @@ def describe_transform_job(self, name: str) -> dict: def describe_processing_job(self, name: str) -> dict: """ - Return the processing job info associated with the name + Return the processing job info associated with the name. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.describe_processing_job` @@ -661,7 +662,7 @@ def describe_processing_job(self, name: str) -> dict: def describe_endpoint_config(self, name: str) -> dict: """ - Return the endpoint config info associated with the name + Return the endpoint config info associated with the name. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.describe_endpoint_config` @@ -693,7 +694,7 @@ def check_status( non_terminal_states: set | None = None, ) -> dict: """ - Check status of a SageMaker resource + Check status of a SageMaker resource. :param job_name: name of the resource to check status, can be a job but also pipeline for instance. :param key: the key of the response dict that points to the state @@ -833,7 +834,7 @@ def list_training_jobs( """ This method wraps boto3's `list_training_jobs`. The training job name and max results are configurable via arguments. Other arguments are not, and should be provided via kwargs. Note boto3 expects these in - CamelCase format, for example: + CamelCase format, for example. .. 
code-block:: python @@ -861,7 +862,7 @@ def list_transform_jobs( This method wraps boto3's `list_transform_jobs`. The transform job name and max results are configurable via arguments. Other arguments are not, and should be provided via kwargs. Note boto3 expects these in - CamelCase format, for example: + CamelCase format, for example. .. code-block:: python @@ -885,7 +886,7 @@ def list_transform_jobs( def list_processing_jobs(self, **kwargs) -> list[dict]: """ This method wraps boto3's `list_processing_jobs`. All arguments should be provided via kwargs. - Note boto3 expects these in CamelCase format, for example: + Note boto3 expects these in CamelCase format, for example. .. code-block:: python @@ -1116,7 +1117,7 @@ def stop_pipeline( verbose: bool = True, fail_if_not_running: bool = False, ) -> str: - """Stop SageMaker pipeline execution + """Stop SageMaker pipeline execution. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.stop_pipeline_execution` @@ -1182,7 +1183,7 @@ def stop_pipeline( def create_model_package_group(self, package_group_name: str, package_group_desc: str = "") -> bool: """ - Creates a Model Package Group if it does not already exist + Creates a Model Package Group if it does not already exist. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.create_model_package_group` diff --git a/airflow/providers/amazon/aws/hooks/secrets_manager.py b/airflow/providers/amazon/aws/hooks/secrets_manager.py index f64b834bbedd1..c82d543b0cc68 100644 --- a/airflow/providers/amazon/aws/hooks/secrets_manager.py +++ b/airflow/providers/amazon/aws/hooks/secrets_manager.py @@ -42,7 +42,7 @@ def __init__(self, *args, **kwargs): def get_secret(self, secret_name: str) -> str | bytes: """ Retrieve secret value from AWS Secrets Manager as a str or bytes - reflecting format it stored in the AWS Secrets Manager + reflecting format it stored in the AWS Secrets Manager. .. seealso:: - :external+boto3:py:meth:`SecretsManager.Client.get_secret_value` @@ -61,7 +61,7 @@ def get_secret(self, secret_name: str) -> str | bytes: def get_secret_as_dict(self, secret_name: str) -> dict: """ - Retrieve secret value from AWS Secrets Manager in a dict representation + Retrieve secret value from AWS Secrets Manager in a dict representation. :param secret_name: name of the secrets. :return: dict with the information about the secrets diff --git a/airflow/providers/amazon/aws/hooks/ses.py b/airflow/providers/amazon/aws/hooks/ses.py index b919fcc75713c..2f5a6a91646fc 100644 --- a/airflow/providers/amazon/aws/hooks/ses.py +++ b/airflow/providers/amazon/aws/hooks/ses.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS SES Hook""" +"""This module contains AWS SES Hook.""" from __future__ import annotations from typing import Any, Iterable @@ -55,7 +55,7 @@ def send_email( custom_headers: dict[str, Any] | None = None, ) -> dict: """ - Send email using Amazon Simple Email Service + Send email using Amazon Simple Email Service. .. seealso:: - :external+boto3:py:meth:`SES.Client.send_raw_email` diff --git a/airflow/providers/amazon/aws/hooks/sns.py b/airflow/providers/amazon/aws/hooks/sns.py index ee022dd00a7d3..199376c7ffe87 100644 --- a/airflow/providers/amazon/aws/hooks/sns.py +++ b/airflow/providers/amazon/aws/hooks/sns.py @@ -15,7 +15,7 @@ # KIND, either express or implied. 
See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS SNS hook""" +"""This module contains AWS SNS hook.""" from __future__ import annotations import json diff --git a/airflow/providers/amazon/aws/hooks/sqs.py b/airflow/providers/amazon/aws/hooks/sqs.py index 43699787afbf3..c59beba9225df 100644 --- a/airflow/providers/amazon/aws/hooks/sqs.py +++ b/airflow/providers/amazon/aws/hooks/sqs.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS SQS hook""" +"""This module contains AWS SQS hook.""" from __future__ import annotations from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook diff --git a/airflow/providers/amazon/aws/hooks/step_function.py b/airflow/providers/amazon/aws/hooks/step_function.py index 7baf23eadee68..8b5df2bdba62a 100644 --- a/airflow/providers/amazon/aws/hooks/step_function.py +++ b/airflow/providers/amazon/aws/hooks/step_function.py @@ -70,7 +70,7 @@ def start_execution( def describe_execution(self, execution_arn: str) -> dict: """ - Describes a State Machine Execution + Describes a State Machine Execution. .. seealso:: - :external+boto3:py:meth:`SFN.Client.describe_execution` diff --git a/airflow/providers/amazon/aws/links/base_aws.py b/airflow/providers/amazon/aws/links/base_aws.py index 82acd337bfc41..97130fabd6cc7 100644 --- a/airflow/providers/amazon/aws/links/base_aws.py +++ b/airflow/providers/amazon/aws/links/base_aws.py @@ -31,7 +31,7 @@ class BaseAwsLink(BaseOperatorLink): - """Base Helper class for constructing AWS Console Link""" + """Base Helper class for constructing AWS Console Link.""" name: ClassVar[str] key: ClassVar[str] @@ -50,7 +50,7 @@ def get_aws_domain(aws_partition) -> str | None: def format_link(self, **kwargs) -> str: """ - Format AWS Service Link + Format AWS Service Link. Some AWS Service Link should require additional escaping in this case this method should be overridden. 
@@ -80,7 +80,7 @@ def get_link( def persist( cls, context: Context, operator: BaseOperator, region_name: str, aws_partition: str, **kwargs ) -> None: - """Store link information into XCom""" + """Store link information into XCom.""" if not operator.do_xcom_push: return diff --git a/airflow/providers/amazon/aws/links/batch.py b/airflow/providers/amazon/aws/links/batch.py index 432d129a7c328..4c5bdd8016040 100644 --- a/airflow/providers/amazon/aws/links/batch.py +++ b/airflow/providers/amazon/aws/links/batch.py @@ -20,7 +20,7 @@ class BatchJobDefinitionLink(BaseAwsLink): - """Helper class for constructing AWS Batch Job Definition Link""" + """Helper class for constructing AWS Batch Job Definition Link.""" name = "Batch Job Definition" key = "batch_job_definition" @@ -30,7 +30,7 @@ class BatchJobDefinitionLink(BaseAwsLink): class BatchJobDetailsLink(BaseAwsLink): - """Helper class for constructing AWS Batch Job Details Link""" + """Helper class for constructing AWS Batch Job Details Link.""" name = "Batch Job Details" key = "batch_job_details" @@ -38,7 +38,7 @@ class BatchJobDetailsLink(BaseAwsLink): class BatchJobQueueLink(BaseAwsLink): - """Helper class for constructing AWS Batch Job Queue Link""" + """Helper class for constructing AWS Batch Job Queue Link.""" name = "Batch Job Queue" key = "batch_job_queue" diff --git a/airflow/providers/amazon/aws/links/emr.py b/airflow/providers/amazon/aws/links/emr.py index 1739f76a1fa4c..67278ac46ab07 100644 --- a/airflow/providers/amazon/aws/links/emr.py +++ b/airflow/providers/amazon/aws/links/emr.py @@ -27,7 +27,7 @@ class EmrClusterLink(BaseAwsLink): - """Helper class for constructing AWS EMR Cluster Link""" + """Helper class for constructing AWS EMR Cluster Link.""" name = "EMR Cluster" key = "emr_cluster" @@ -35,7 +35,7 @@ class EmrClusterLink(BaseAwsLink): class EmrLogsLink(BaseAwsLink): - """Helper class for constructing AWS EMR Logs Link""" + """Helper class for constructing AWS EMR Logs Link.""" name = "EMR Cluster Logs" key = "emr_logs" diff --git a/airflow/providers/amazon/aws/links/glue.py b/airflow/providers/amazon/aws/links/glue.py index 44a7110118ee0..ad9c1765a94bd 100644 --- a/airflow/providers/amazon/aws/links/glue.py +++ b/airflow/providers/amazon/aws/links/glue.py @@ -20,7 +20,7 @@ class GlueJobRunDetailsLink(BaseAwsLink): - """Helper class for constructing AWS Glue Job Run Details Link""" + """Helper class for constructing AWS Glue Job Run Details Link.""" name = "AWS Glue Job Run Details" key = "glue_job_run_details" diff --git a/airflow/providers/amazon/aws/links/logs.py b/airflow/providers/amazon/aws/links/logs.py index 7998191d9226d..90296957ca61b 100644 --- a/airflow/providers/amazon/aws/links/logs.py +++ b/airflow/providers/amazon/aws/links/logs.py @@ -22,7 +22,7 @@ class CloudWatchEventsLink(BaseAwsLink): - """Helper class for constructing AWS CloudWatch Events Link""" + """Helper class for constructing AWS CloudWatch Events Link.""" name = "CloudWatch Events" key = "cloudwatch_events" diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py index 20754075a2dac..d1c30bf97cf46 100644 --- a/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -31,7 +31,8 @@ def get_default_delete_local_copy(): - """Load delete_local_logs conf if Airflow version > 2.6 and return False if not + """Load delete_local_logs conf if Airflow version > 2.6 and return False if not. 
+ TODO: delete this function when min airflow version >= 2.6 """ from airflow.version import version @@ -158,7 +159,7 @@ def _read(self, ti, try_number, metadata=None): def s3_log_exists(self, remote_log_location: str) -> bool: """ - Check if remote_log_location exists in remote storage + Check if remote_log_location exists in remote storage. :param remote_log_location: log's location in remote storage :return: True if location exists else False diff --git a/airflow/providers/amazon/aws/operators/athena.py b/airflow/providers/amazon/aws/operators/athena.py index 1d544985693eb..3bdf03fb03296 100644 --- a/airflow/providers/amazon/aws/operators/athena.py +++ b/airflow/providers/amazon/aws/operators/athena.py @@ -91,7 +91,7 @@ def hook(self) -> AthenaHook: return AthenaHook(self.aws_conn_id, sleep_time=self.sleep_time, log_query=self.log_query) def execute(self, context: Context) -> str | None: - """Run Presto Query on Athena""" + """Run Presto Query on Athena.""" self.query_execution_context["Database"] = self.database self.result_configuration["OutputLocation"] = self.output_location self.query_execution_id = self.hook.run_query( @@ -121,7 +121,7 @@ def execute(self, context: Context) -> str | None: return self.query_execution_id def on_kill(self) -> None: - """Cancel the submitted athena query""" + """Cancel the submitted athena query.""" if self.query_execution_id: self.log.info("Received a kill signal.") response = self.hook.stop_query(self.query_execution_id) diff --git a/airflow/providers/amazon/aws/operators/batch.py b/airflow/providers/amazon/aws/operators/batch.py index 272122d1093b9..9c3f07fb9373b 100644 --- a/airflow/providers/amazon/aws/operators/batch.py +++ b/airflow/providers/amazon/aws/operators/batch.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. """ -An Airflow operator for AWS Batch services +An Airflow operator for AWS Batch services. .. seealso:: @@ -46,7 +46,7 @@ class BatchOperator(BaseOperator): """ - Execute a job on AWS Batch + Execute a job on AWS Batch. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -193,7 +193,7 @@ def hook(self) -> BatchClientHook: def execute(self, context: Context): """ - Submit and monitor an AWS Batch job + Submit and monitor an AWS Batch job. :raises: AirflowException """ @@ -210,7 +210,7 @@ def on_kill(self): def submit_job(self, context: Context): """ - Submit an AWS Batch job + Submit an AWS Batch job. :raises: AirflowException """ @@ -265,7 +265,7 @@ def monitor_job(self, context: Context): Monitor an AWS Batch job monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout is given while creating the task. These exceptions should be handled in taskinstance.py - instead of here like it was previously done + instead of here like it was previously done. :raises: AirflowException """ @@ -331,7 +331,7 @@ def monitor_job(self, context: Context): class BatchCreateComputeEnvironmentOperator(BaseOperator): """ - Create an AWS Batch compute environment + Create an AWS Batch compute environment. .. 
seealso:: For more information on how to use this operator, take a look at the guide: @@ -406,7 +406,7 @@ def __init__( @cached_property def hook(self): - """Create and return a BatchClientHook""" + """Create and return a BatchClientHook.""" return BatchClientHook( max_retries=self.max_retries, status_retries=self.status_retries, @@ -415,7 +415,7 @@ def hook(self): ) def execute(self, context: Context): - """Create an AWS batch compute environment""" + """Create an AWS batch compute environment.""" kwargs: dict[str, Any] = { "computeEnvironmentName": self.compute_environment_name, "type": self.environment_type, diff --git a/airflow/providers/amazon/aws/operators/datasync.py b/airflow/providers/amazon/aws/operators/datasync.py index 508d87e163658..76b288a54805f 100644 --- a/airflow/providers/amazon/aws/operators/datasync.py +++ b/airflow/providers/amazon/aws/operators/datasync.py @@ -261,7 +261,7 @@ def _get_tasks_and_locations(self) -> None: self.log.info("Found candidate DataSync TaskArns %s", self.candidate_task_arns) def choose_task(self, task_arn_list: list) -> str | None: - """Select 1 DataSync TaskArn from a list""" + """Select 1 DataSync TaskArn from a list.""" if not task_arn_list: return None if len(task_arn_list) == 1: @@ -275,7 +275,7 @@ def choose_task(self, task_arn_list: list) -> str | None: raise AirflowException(f"Unable to choose a Task from {task_arn_list}") def choose_location(self, location_arn_list: list[str] | None) -> str | None: - """Select 1 DataSync LocationArn from a list""" + """Select 1 DataSync LocationArn from a list.""" if not location_arn_list: return None if len(location_arn_list) == 1: diff --git a/airflow/providers/amazon/aws/operators/dms.py b/airflow/providers/amazon/aws/operators/dms.py index 6303afcbaf318..afa81ab040bae 100644 --- a/airflow/providers/amazon/aws/operators/dms.py +++ b/airflow/providers/amazon/aws/operators/dms.py @@ -88,7 +88,7 @@ def __init__( def execute(self, context: Context): """ - Creates AWS DMS replication task from Airflow + Creates AWS DMS replication task from Airflow. :return: replication task arn """ @@ -141,7 +141,7 @@ def __init__( def execute(self, context: Context): """ - Deletes AWS DMS replication task from Airflow + Deletes AWS DMS replication task from Airflow. :return: replication task arn """ @@ -183,7 +183,7 @@ def __init__( def execute(self, context: Context) -> tuple[str | None, list]: """ - Describes AWS DMS replication tasks from Airflow + Describes AWS DMS replication tasks from Airflow. :return: Marker and list of replication tasks """ @@ -235,7 +235,7 @@ def __init__( def execute(self, context: Context): """ - Starts AWS DMS replication task from Airflow + Starts AWS DMS replication task from Airflow. :return: replication task arn """ @@ -282,7 +282,7 @@ def __init__( def execute(self, context: Context): """ - Stops AWS DMS replication task from Airflow + Stops AWS DMS replication task from Airflow. :return: replication task arn """ diff --git a/airflow/providers/amazon/aws/operators/ec2.py b/airflow/providers/amazon/aws/operators/ec2.py index 5f6b76ce154f9..b9de533378324 100644 --- a/airflow/providers/amazon/aws/operators/ec2.py +++ b/airflow/providers/amazon/aws/operators/ec2.py @@ -120,7 +120,7 @@ def execute(self, context: Context): class EC2CreateInstanceOperator(BaseOperator): """ - Create and start a specified number of EC2 Instances using boto3 + Create and start a specified number of EC2 Instances using boto3. .. 
seealso:: For more information on how to use this operator, take a look at the guide: @@ -201,7 +201,7 @@ def execute(self, context: Context): class EC2TerminateInstanceOperator(BaseOperator): """ - Terminate EC2 Instances using boto3 + Terminate EC2 Instances using boto3. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/ecs.py b/airflow/providers/amazon/aws/operators/ecs.py index eb031d24148f6..c2a065c28af28 100644 --- a/airflow/providers/amazon/aws/operators/ecs.py +++ b/airflow/providers/amazon/aws/operators/ecs.py @@ -335,7 +335,7 @@ def execute(self, context: Context): class EcsRunTaskOperator(EcsBaseOperator): """ - Execute a task on AWS ECS (Elastic Container Service) + Execute a task on AWS ECS (Elastic Container Service). .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 3e36231993c12..ba162a59e395e 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -415,7 +415,7 @@ def hook(self) -> EmrContainerHook: return EmrContainerHook(self.aws_conn_id) def execute(self, context: Context) -> str | None: - """Create EMR on EKS virtual Cluster""" + """Create EMR on EKS virtual Cluster.""" self.virtual_cluster_id = self.hook.create_emr_on_eks_cluster( self.virtual_cluster_name, self.eks_cluster_name, self.eks_namespace, self.tags ) @@ -514,7 +514,7 @@ def hook(self) -> EmrContainerHook: ) def execute(self, context: Context) -> str | None: - """Run job on EMR Containers""" + """Run job on EMR Containers.""" self.job_id = self.hook.submit_job( self.name, self.execution_role_arn, @@ -546,7 +546,7 @@ def execute(self, context: Context) -> str | None: return self.job_id def on_kill(self) -> None: - """Cancel the submitted job run""" + """Cancel the submitted job run.""" if self.job_id: self.log.info("Stopping job run with jobId - %s", self.job_id) response = self.hook.stop_query(self.job_id) @@ -839,7 +839,7 @@ def execute(self, context: Context) -> None: class EmrServerlessCreateApplicationOperator(BaseOperator): """ - Operator to create Serverless EMR Application + Operator to create Serverless EMR Application. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -1057,7 +1057,7 @@ def execute(self, context: Context) -> str | None: return self.job_id def on_kill(self) -> None: - """Cancel the submitted job run""" + """Cancel the submitted job run.""" if self.job_id: self.log.info("Stopping job run with jobId - %s", self.job_id) response = self.hook.conn.cancel_job_run(applicationId=self.application_id, jobRunId=self.job_id) @@ -1090,7 +1090,7 @@ def on_kill(self) -> None: class EmrServerlessStopApplicationOperator(BaseOperator): """ - Operator to stop an EMR Serverless application + Operator to stop an EMR Serverless application. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -1168,7 +1168,7 @@ def execute(self, context: Context) -> None: class EmrServerlessDeleteApplicationOperator(EmrServerlessStopApplicationOperator): """ - Operator to delete EMR Serverless application + Operator to delete EMR Serverless application. .. 
seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/glacier.py b/airflow/providers/amazon/aws/operators/glacier.py index 4e7c8b5e17421..54123e586dab1 100644 --- a/airflow/providers/amazon/aws/operators/glacier.py +++ b/airflow/providers/amazon/aws/operators/glacier.py @@ -28,7 +28,7 @@ class GlacierCreateJobOperator(BaseOperator): """ - Initiate an Amazon Glacier inventory-retrieval job + Initiate an Amazon Glacier inventory-retrieval job. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -58,7 +58,7 @@ def execute(self, context: Context): class GlacierUploadArchiveOperator(BaseOperator): """ - This operator add an archive to an Amazon S3 Glacier vault + This operator add an archive to an Amazon S3 Glacier vault. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py index 497df84d31418..a0c3bf560d4d9 100644 --- a/airflow/providers/amazon/aws/operators/glue.py +++ b/airflow/providers/amazon/aws/operators/glue.py @@ -34,7 +34,7 @@ class GlueJobOperator(BaseOperator): """ Creates an AWS Glue Job. AWS Glue is a serverless Spark ETL service for running Spark Jobs on the AWS cloud. - Language support: Python and Scala + Language support: Python and Scala. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -117,7 +117,7 @@ def __init__( def execute(self, context: Context): """ - Executes AWS Glue Job from Airflow + Executes AWS Glue Job from Airflow. :return: the id of the current glue job. """ diff --git a/airflow/providers/amazon/aws/operators/glue_crawler.py b/airflow/providers/amazon/aws/operators/glue_crawler.py index 59ba2031fdd7e..d689b3164518b 100644 --- a/airflow/providers/amazon/aws/operators/glue_crawler.py +++ b/airflow/providers/amazon/aws/operators/glue_crawler.py @@ -69,7 +69,7 @@ def hook(self) -> GlueCrawlerHook: def execute(self, context: Context): """ - Executes AWS Glue Crawler from Airflow + Executes AWS Glue Crawler from Airflow. :return: the name of the current glue crawler. """ diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index df010261896fd..4d9b0889789ae 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -32,7 +32,7 @@ class RdsBaseOperator(BaseOperator): - """Base operator that implements common functions for all operators""" + """Base operator that implements common functions for all operators.""" ui_color = "#eeaa88" ui_fgcolor = "#ffffff" @@ -45,11 +45,11 @@ def __init__(self, *args, aws_conn_id: str = "aws_conn_id", hook_params: dict | self._await_interval = 60 # seconds def execute(self, context: Context) -> str: - """Different implementations for snapshots, tasks and events""" + """Different implementations for snapshots, tasks and events.""" raise NotImplementedError def on_kill(self) -> None: - """Different implementations for snapshots, tasks and events""" + """Different implementations for snapshots, tasks and events.""" raise NotImplementedError @@ -124,7 +124,7 @@ def execute(self, context: Context) -> str: class RdsCopyDbSnapshotOperator(RdsBaseOperator): """ - Copies the specified DB instance or DB cluster snapshot + Copies the specified DB instance or DB cluster snapshot. .. 
seealso:: For more information on how to use this operator, take a look at the guide: @@ -230,7 +230,7 @@ def execute(self, context: Context) -> str: class RdsDeleteDbSnapshotOperator(RdsBaseOperator): """ - Deletes a DB instance or cluster snapshot or terminating the copy operation + Deletes a DB instance or cluster snapshot or terminating the copy operation. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -353,7 +353,7 @@ def execute(self, context: Context) -> str: class RdsCancelExportTaskOperator(RdsBaseOperator): """ - Cancels an export task in progress that is exporting a snapshot to Amazon S3 + Cancels an export task in progress that is exporting a snapshot to Amazon S3. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -396,7 +396,7 @@ def execute(self, context: Context) -> str: class RdsCreateEventSubscriptionOperator(RdsBaseOperator): """ - Creates an RDS event notification subscription + Creates an RDS event notification subscription. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -470,7 +470,7 @@ def execute(self, context: Context) -> str: class RdsDeleteEventSubscriptionOperator(RdsBaseOperator): """ - Deletes an RDS event notification subscription + Deletes an RDS event notification subscription. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -507,7 +507,7 @@ def execute(self, context: Context) -> str: class RdsCreateDbInstanceOperator(RdsBaseOperator): """ - Creates an RDS DB instance + Creates an RDS DB instance. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -561,7 +561,7 @@ def execute(self, context: Context) -> str: class RdsDeleteDbInstanceOperator(RdsBaseOperator): """ - Deletes an RDS DB Instance + Deletes an RDS DB Instance. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -605,7 +605,7 @@ def execute(self, context: Context) -> str: class RdsStartDbOperator(RdsBaseOperator): """ - Starts an RDS DB instance / cluster + Starts an RDS DB instance / cluster. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -658,7 +658,7 @@ def _wait_until_db_available(self): class RdsStopDbOperator(RdsBaseOperator): """ - Stops an RDS DB instance / cluster + Stops an RDS DB instance / cluster. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py index 2880240b1532d..36e7a7df07cfa 100644 --- a/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -288,7 +288,7 @@ def execute_complete(self, context, event=None): class RedshiftCreateClusterSnapshotOperator(BaseOperator): """ - Creates a manual snapshot of the specified cluster. The cluster must be in the available state + Creates a manual snapshot of the specified cluster. The cluster must be in the available state. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -361,7 +361,7 @@ def execute(self, context: Context) -> Any: class RedshiftDeleteClusterSnapshotOperator(BaseOperator): """ - Deletes the specified manual snapshot + Deletes the specified manual snapshot. .. 
seealso:: For more information on how to use this operator, take a look at the guide: @@ -416,7 +416,7 @@ def get_status(self) -> str: class RedshiftResumeClusterOperator(BaseOperator): """ - Resume a paused AWS Redshift Cluster + Resume a paused AWS Redshift Cluster. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/redshift_data.py b/airflow/providers/amazon/aws/operators/redshift_data.py index b0fad66faedfb..9bf2f34d16f09 100644 --- a/airflow/providers/amazon/aws/operators/redshift_data.py +++ b/airflow/providers/amazon/aws/operators/redshift_data.py @@ -31,7 +31,7 @@ class RedshiftDataOperator(BaseOperator): """ - Executes SQL Statements against an Amazon Redshift cluster using Redshift Data + Executes SQL Statements against an Amazon Redshift cluster using Redshift Data. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -112,7 +112,7 @@ def hook(self) -> RedshiftDataHook: return RedshiftDataHook(aws_conn_id=self.aws_conn_id, region_name=self.region) def execute(self, context: Context) -> GetStatementResultResponseTypeDef | str: - """Execute a statement against Amazon Redshift""" + """Execute a statement against Amazon Redshift.""" self.log.info("Executing statement: %s", self.sql) self.statement_id = self.hook.execute_query( @@ -136,7 +136,7 @@ def execute(self, context: Context) -> GetStatementResultResponseTypeDef | str: return self.statement_id def on_kill(self) -> None: - """Cancel the submitted redshift query""" + """Cancel the submitted redshift query.""" if self.statement_id: self.log.info("Received a kill signal.") self.log.info("Stopping Query with statementId - %s", self.statement_id) diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py index d9ab1ab75c62e..8357a21ed6fa3 100644 --- a/airflow/providers/amazon/aws/operators/s3.py +++ b/airflow/providers/amazon/aws/operators/s3.py @@ -37,7 +37,7 @@ class S3CreateBucketOperator(BaseOperator): """ - This operator creates an S3 bucket + This operator creates an S3 bucket. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -78,7 +78,7 @@ def execute(self, context: Context): class S3DeleteBucketOperator(BaseOperator): """ - This operator deletes an S3 bucket + This operator deletes an S3 bucket. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -118,7 +118,7 @@ def execute(self, context: Context): class S3GetBucketTaggingOperator(BaseOperator): """ - This operator gets tagging from an S3 bucket + This operator gets tagging from an S3 bucket. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/sns.py b/airflow/providers/amazon/aws/operators/sns.py index 2f5b9844bf7f3..6b16dc074156c 100644 --- a/airflow/providers/amazon/aws/operators/sns.py +++ b/airflow/providers/amazon/aws/operators/sns.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-"""Publish message to SNS queue""" +"""Publish message to SNS queue.""" from __future__ import annotations from typing import TYPE_CHECKING, Sequence diff --git a/airflow/providers/amazon/aws/operators/sqs.py b/airflow/providers/amazon/aws/operators/sqs.py index 0b0cfc4f16297..13bff5538f826 100644 --- a/airflow/providers/amazon/aws/operators/sqs.py +++ b/airflow/providers/amazon/aws/operators/sqs.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Publish message to SQS queue""" +"""Publish message to SQS queue.""" from __future__ import annotations from typing import TYPE_CHECKING, Sequence @@ -75,7 +75,7 @@ def __init__( def execute(self, context: Context) -> dict: """ - Publish the message to the Amazon SQS queue + Publish the message to the Amazon SQS queue. :param context: the context object :return: dict with information about the message sent diff --git a/airflow/providers/amazon/aws/secrets/secrets_manager.py b/airflow/providers/amazon/aws/secrets/secrets_manager.py index f075f6e5e9376..a970b7be65f59 100644 --- a/airflow/providers/amazon/aws/secrets/secrets_manager.py +++ b/airflow/providers/amazon/aws/secrets/secrets_manager.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Objects relating to sourcing secrets from AWS Secrets Manager""" +"""Objects relating to sourcing secrets from AWS Secrets Manager.""" from __future__ import annotations import json @@ -33,7 +33,7 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin): """ - Retrieves Connection or Variables from AWS Secrets Manager + Retrieves Connection or Variables from AWS Secrets Manager. Configurable via ``airflow.cfg`` like so: @@ -178,7 +178,7 @@ def __init__( @cached_property def client(self): - """Create a Secrets Manager client""" + """Create a Secrets Manager client.""" from airflow.providers.amazon.aws.hooks.base_aws import SessionFactory from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper @@ -198,7 +198,7 @@ def client(self): return session.client(service_name="secretsmanager", **client_kwargs) def _standardize_secret_keys(self, secret: dict[str, Any]) -> dict[str, Any]: - """Standardize the names of the keys in the dict. These keys align with""" + """Standardize the names of the keys in the dict. These keys align with.""" possible_words_for_conn_fields = { "login": ["login", "user", "username", "user_name"], "password": ["password", "pass", "key"], @@ -225,7 +225,7 @@ def _standardize_secret_keys(self, secret: dict[str, Any]) -> dict[str, Any]: return conn_d def _remove_escaping_in_secret_dict(self, secret: dict[str, Any]) -> dict[str, Any]: - """Un-escape secret values that are URL-encoded""" + """Un-escape secret values that are URL-encoded.""" for k, v in secret.copy().items(): if k == "extra" and isinstance(v, dict): # The old behavior was that extras were _not_ urlencoded inside the secret. @@ -239,7 +239,7 @@ def _remove_escaping_in_secret_dict(self, secret: dict[str, Any]) -> dict[str, A def get_conn_value(self, conn_id: str) -> str | None: """ - Get serialized representation of Connection + Get serialized representation of Connection. :param conn_id: connection id """ @@ -270,7 +270,8 @@ def get_conn_value(self, conn_id: str) -> str | None: def get_variable(self, key: str) -> str | None: """ - Get Airflow Variable + Get Airflow Variable. 
+ :param key: Variable Key :return: Variable Value """ @@ -281,7 +282,8 @@ def get_variable(self, key: str) -> str | None: def get_config(self, key: str) -> str | None: """ - Get Airflow Configuration + Get Airflow Configuration. + :param key: Configuration Option Key :return: Configuration Option Value """ @@ -292,7 +294,8 @@ def get_config(self, key: str) -> str | None: def _get_secret(self, path_prefix, secret_id: str, lookup_pattern: str | None) -> str | None: """ - Get secret value from Secrets Manager + Get secret value from Secrets Manager. + :param path_prefix: Prefix for the Path to get Secret :param secret_id: Secret Key :param lookup_pattern: If provided, `secret_id` must match this pattern to look up the secret in diff --git a/airflow/providers/amazon/aws/secrets/systems_manager.py b/airflow/providers/amazon/aws/secrets/systems_manager.py index e4ec9a391f9bf..f0367c8b36c8f 100644 --- a/airflow/providers/amazon/aws/secrets/systems_manager.py +++ b/airflow/providers/amazon/aws/secrets/systems_manager.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Objects relating to sourcing connections from AWS SSM Parameter Store""" +"""Objects relating to sourcing connections from AWS SSM Parameter Store.""" from __future__ import annotations import re @@ -28,7 +28,7 @@ class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin): """ - Retrieves Connection or Variables from AWS SSM Parameter Store + Retrieves Connection or Variables from AWS SSM Parameter Store. Configurable via ``airflow.cfg`` like so: @@ -112,7 +112,7 @@ def __init__( @cached_property def client(self): - """Create a SSM client""" + """Create a SSM client.""" from airflow.providers.amazon.aws.hooks.base_aws import SessionFactory from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper @@ -133,7 +133,7 @@ def client(self): def get_conn_value(self, conn_id: str) -> str | None: """ - Get param value + Get param value. :param conn_id: connection id """ @@ -144,7 +144,7 @@ def get_conn_value(self, conn_id: str) -> str | None: def get_variable(self, key: str) -> str | None: """ - Get Airflow Variable + Get Airflow Variable. :param key: Variable Key :return: Variable Value @@ -156,7 +156,7 @@ def get_variable(self, key: str) -> str | None: def get_config(self, key: str) -> str | None: """ - Get Airflow Configuration + Get Airflow Configuration. :param key: Configuration Option Key :return: Configuration Option Value @@ -190,7 +190,7 @@ def _get_secret(self, path_prefix: str, secret_id: str, lookup_pattern: str | No def _ensure_leading_slash(self, ssm_path: str): """ - AWS Systems Manager mandate to have a leading "/". Adding it dynamically if not there to the SSM path + AWS Systems Manager mandate to have a leading "/". Adding it dynamically if not there to the SSM path. 
:param ssm_path: SSM parameter path """ diff --git a/airflow/providers/amazon/aws/sensors/athena.py b/airflow/providers/amazon/aws/sensors/athena.py index 1954d15a8e004..7869d108d5377 100644 --- a/airflow/providers/amazon/aws/sensors/athena.py +++ b/airflow/providers/amazon/aws/sensors/athena.py @@ -87,5 +87,5 @@ def poke(self, context: Context) -> bool: @cached_property def hook(self) -> AthenaHook: - """Create and return an AthenaHook""" + """Create and return an AthenaHook.""" return AthenaHook(self.aws_conn_id, sleep_time=self.sleep_time) diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index 26a5e910a7e1a..1584c8746ea43 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/providers/amazon/aws/sensors/batch.py @@ -77,7 +77,7 @@ def poke(self, context: Context) -> bool: @deprecated(reason="use `hook` property instead.") def get_hook(self) -> BatchClientHook: - """Create and return a BatchClientHook""" + """Create and return a BatchClientHook.""" return self.hook @cached_property @@ -122,7 +122,7 @@ def __init__( @cached_property def hook(self) -> BatchClientHook: - """Create and return a BatchClientHook""" + """Create and return a BatchClientHook.""" return BatchClientHook( aws_conn_id=self.aws_conn_id, region_name=self.region_name, @@ -188,7 +188,7 @@ def __init__( @cached_property def hook(self) -> BatchClientHook: - """Create and return a BatchClientHook""" + """Create and return a BatchClientHook.""" return BatchClientHook( aws_conn_id=self.aws_conn_id, region_name=self.region_name, diff --git a/airflow/providers/amazon/aws/sensors/cloud_formation.py b/airflow/providers/amazon/aws/sensors/cloud_formation.py index d2bd45592654f..92b9f337f3994 100644 --- a/airflow/providers/amazon/aws/sensors/cloud_formation.py +++ b/airflow/providers/amazon/aws/sensors/cloud_formation.py @@ -61,7 +61,7 @@ def poke(self, context: Context): @cached_property def hook(self) -> CloudFormationHook: - """Create and return a CloudFormationHook""" + """Create and return a CloudFormationHook.""" return CloudFormationHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) @@ -105,5 +105,5 @@ def poke(self, context: Context): @cached_property def hook(self) -> CloudFormationHook: - """Create and return a CloudFormationHook""" + """Create and return a CloudFormationHook.""" return CloudFormationHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) diff --git a/airflow/providers/amazon/aws/sensors/dms.py b/airflow/providers/amazon/aws/sensors/dms.py index 9e2e9ea63cc56..c2c3edd1502f5 100644 --- a/airflow/providers/amazon/aws/sensors/dms.py +++ b/airflow/providers/amazon/aws/sensors/dms.py @@ -64,7 +64,7 @@ def __init__( @deprecated(reason="use `hook` property instead.") def get_hook(self) -> DmsHook: - """Get DmsHook""" + """Get DmsHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/dynamodb.py b/airflow/providers/amazon/aws/sensors/dynamodb.py index a63c97177edb5..6da91986d852e 100644 --- a/airflow/providers/amazon/aws/sensors/dynamodb.py +++ b/airflow/providers/amazon/aws/sensors/dynamodb.py @@ -80,7 +80,7 @@ def __init__( self.region_name = region_name def poke(self, context: Context) -> bool: - """Test DynamoDB item for matching attribute value""" + """Test DynamoDB item for matching attribute value.""" key = {self.partition_key_name: self.partition_key_value} msg = ( f"Checking table {self.table_name} for " @@ -110,5 +110,5 @@ def poke(self, context: Context) -> bool: 
@cached_property def hook(self) -> DynamoDBHook: - """Create and return a DynamoDBHook""" + """Create and return a DynamoDBHook.""" return DynamoDBHook(self.aws_conn_id, region_name=self.region_name) diff --git a/airflow/providers/amazon/aws/sensors/emr.py b/airflow/providers/amazon/aws/sensors/emr.py index 8bb97f0ac46ae..e0055007d2416 100644 --- a/airflow/providers/amazon/aws/sensors/emr.py +++ b/airflow/providers/amazon/aws/sensors/emr.py @@ -157,7 +157,7 @@ def poke(self, context: Context) -> bool: @cached_property def hook(self) -> EmrServerlessHook: - """Create and return an EmrServerlessHook""" + """Create and return an EmrServerlessHook.""" return EmrServerlessHook(aws_conn_id=self.aws_conn_id) @staticmethod @@ -213,7 +213,7 @@ def poke(self, context: Context) -> bool: @cached_property def hook(self) -> EmrServerlessHook: - """Create and return an EmrServerlessHook""" + """Create and return an EmrServerlessHook.""" return EmrServerlessHook(aws_conn_id=self.aws_conn_id) @staticmethod @@ -293,7 +293,7 @@ def poke(self, context: Context) -> bool: @cached_property def hook(self) -> EmrContainerHook: - """Create and return an EmrContainerHook""" + """Create and return an EmrContainerHook.""" return EmrContainerHook(self.aws_conn_id, virtual_cluster_id=self.virtual_cluster_id) diff --git a/airflow/providers/amazon/aws/sensors/glacier.py b/airflow/providers/amazon/aws/sensors/glacier.py index 222027b2792be..b56327264e016 100644 --- a/airflow/providers/amazon/aws/sensors/glacier.py +++ b/airflow/providers/amazon/aws/sensors/glacier.py @@ -30,7 +30,7 @@ class JobStatus(Enum): - """Glacier jobs description""" + """Glacier jobs description.""" IN_PROGRESS = "InProgress" SUCCEEDED = "Succeeded" diff --git a/airflow/providers/amazon/aws/sensors/glue.py b/airflow/providers/amazon/aws/sensors/glue.py index 761a51609bedc..f58c10bf01a21 100644 --- a/airflow/providers/amazon/aws/sensors/glue.py +++ b/airflow/providers/amazon/aws/sensors/glue.py @@ -30,7 +30,8 @@ class GlueJobSensor(BaseSensorOperator): """ - Waits for an AWS Glue Job to reach any of the status below + Waits for an AWS Glue Job to reach any of the status below. + 'FAILED', 'STOPPED', 'SUCCEEDED' .. seealso:: diff --git a/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py b/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py index d86136746687c..3b8a0c5b6b800 100644 --- a/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py +++ b/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py @@ -76,7 +76,7 @@ def __init__( self.database_name = database_name def poke(self, context: Context): - """Checks for existence of the partition in the AWS Glue Catalog table""" + """Checks for existence of the partition in the AWS Glue Catalog table.""" if "." 
in self.table_name: self.database_name, self.table_name = self.table_name.split(".") self.log.info( @@ -87,7 +87,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> GlueCatalogHook: - """Gets the GlueCatalogHook""" + """Gets the GlueCatalogHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/glue_crawler.py b/airflow/providers/amazon/aws/sensors/glue_crawler.py index 6b8b4fcaea26d..2bb57a6a14b61 100644 --- a/airflow/providers/amazon/aws/sensors/glue_crawler.py +++ b/airflow/providers/amazon/aws/sensors/glue_crawler.py @@ -32,7 +32,8 @@ class GlueCrawlerSensor(BaseSensorOperator): """ - Waits for an AWS Glue crawler to reach any of the statuses below + Waits for an AWS Glue crawler to reach any of the statuses below. + 'FAILED', 'CANCELLED', 'SUCCEEDED' .. seealso:: @@ -68,7 +69,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> GlueCrawlerHook: - """Returns a new or pre-existing GlueCrawlerHook""" + """Returns a new or pre-existing GlueCrawlerHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/rds.py b/airflow/providers/amazon/aws/sensors/rds.py index 50f197ef0c48d..91b4ad0e35c85 100644 --- a/airflow/providers/amazon/aws/sensors/rds.py +++ b/airflow/providers/amazon/aws/sensors/rds.py @@ -29,7 +29,7 @@ class RdsBaseSensor(BaseSensorOperator): - """Base operator that implements common functions for all sensors""" + """Base operator that implements common functions for all sensors.""" ui_color = "#ddbb77" ui_fgcolor = "#ffffff" @@ -140,7 +140,7 @@ def poke(self, context: Context): class RdsDbSensor(RdsBaseSensor): """ - Waits for an RDS instance or cluster to enter one of a number of states + Waits for an RDS instance or cluster to enter one of a number of states. .. seealso:: For more information on how to use this sensor, take a look at the guide: diff --git a/airflow/providers/amazon/aws/sensors/redshift_cluster.py b/airflow/providers/amazon/aws/sensors/redshift_cluster.py index 653ccaf0015ef..f9b2463c6aa72 100644 --- a/airflow/providers/amazon/aws/sensors/redshift_cluster.py +++ b/airflow/providers/amazon/aws/sensors/redshift_cluster.py @@ -67,7 +67,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> RedshiftHook: - """Create and return a RedshiftHook""" + """Create and return a RedshiftHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py index 407a054184748..29cb42295a0cf 100644 --- a/airflow/providers/amazon/aws/sensors/s3.py +++ b/airflow/providers/amazon/aws/sensors/s3.py @@ -133,7 +133,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> S3Hook: - """Create and return an S3Hook""" + """Create and return an S3Hook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/sqs.py b/airflow/providers/amazon/aws/sensors/sqs.py index 6dc032c3fe95b..f7f7a1b3bfc58 100644 --- a/airflow/providers/amazon/aws/sensors/sqs.py +++ b/airflow/providers/amazon/aws/sensors/sqs.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-"""Reads and then deletes the message from SQS queue""" +"""Reads and then deletes the message from SQS queue.""" from __future__ import annotations import json @@ -185,7 +185,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> SqsHook: - """Create and return an SqsHook""" + """Create and return an SqsHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/step_function.py b/airflow/providers/amazon/aws/sensors/step_function.py index 2a0c8b10db993..adfb66f16e3ce 100644 --- a/airflow/providers/amazon/aws/sensors/step_function.py +++ b/airflow/providers/amazon/aws/sensors/step_function.py @@ -89,7 +89,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> StepFunctionHook: - """Create and return a StepFunctionHook""" + """Create and return a StepFunctionHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/transfers/base.py b/airflow/providers/amazon/aws/transfers/base.py index b8ebfb55eb874..2a58c4844962a 100644 --- a/airflow/providers/amazon/aws/transfers/base.py +++ b/airflow/providers/amazon/aws/transfers/base.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains base AWS to AWS transfer operator""" +"""This module contains base AWS to AWS transfer operator.""" from __future__ import annotations import warnings @@ -33,7 +33,7 @@ class AwsToAwsBaseOperator(BaseOperator): """ - Base class for AWS to AWS transfer operators + Base class for AWS to AWS transfer operators. :param source_aws_conn_id: The Airflow connection used for AWS credentials to access DynamoDB. If this is None or empty then the default boto3 diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py index 48067a666fad2..1134a38e4372f 100644 --- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py @@ -41,7 +41,7 @@ class JSONEncoder(json.JSONEncoder): - """Custom json encoder implementation""" + """Custom json encoder implementation.""" def default(self, obj): """Convert decimal objects in a json serializable format.""" @@ -136,7 +136,7 @@ def __init__( @cached_property def hook(self): - """Create DynamoDBHook""" + """Create DynamoDBHook.""" return DynamoDBHook(aws_conn_id=self.source_aws_conn_id) def execute(self, context: Context) -> None: diff --git a/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py b/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py index 3814d3f44ff57..60ee579025737 100644 --- a/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py +++ b/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py @@ -30,7 +30,7 @@ class GlacierToGCSOperator(BaseOperator): """ - Transfers data from Amazon Glacier to Google Cloud Storage + Transfers data from Amazon Glacier to Google Cloud Storage. .. note:: Please be warn that GlacierToGCSOperator may depends on memory usage. 
diff --git a/airflow/providers/amazon/aws/transfers/mongo_to_s3.py b/airflow/providers/amazon/aws/transfers/mongo_to_s3.py index 64390d9e351ab..d7432c395902d 100644 --- a/airflow/providers/amazon/aws/transfers/mongo_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/mongo_to_s3.py @@ -93,7 +93,7 @@ def __init__( self.compression = compression def execute(self, context: Context): - """Is written to depend on transform method""" + """Is written to depend on transform method.""" s3_conn = S3Hook(self.aws_conn_id) # Grab collection and execute query according to whether or not it is a pipeline @@ -129,7 +129,7 @@ def execute(self, context: Context): def _stringify(iterable: Iterable, joinable: str = "\n") -> str: """ Takes an iterable (pymongo Cursor or Array) containing dictionaries and - returns a stringified version using python join + returns a stringified version using python join. """ return joinable.join([json.dumps(doc, default=json_util.default) for doc in iterable]) @@ -138,7 +138,7 @@ def transform(docs: Any) -> Any: """This method is meant to be extended by child classes to perform transformations unique to those operators needs. Processes pyMongo cursor and returns an iterable with each element being - a JSON serializable dictionary + a JSON serializable dictionary. Base transform() assumes no processing is needed ie. docs is a pyMongo cursor of documents and cursor just diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py index f573c73ca4327..0d2a059f6e727 100644 --- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py +++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py @@ -34,7 +34,7 @@ class S3ToRedshiftOperator(BaseOperator): """ - Executes an COPY command to load files from s3 to Redshift + Executes an COPY command to load files from s3 to Redshift. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/transfers/s3_to_sql.py b/airflow/providers/amazon/aws/transfers/s3_to_sql.py index 99d4fab6bd4ed..b9474f38c3251 100644 --- a/airflow/providers/amazon/aws/transfers/s3_to_sql.py +++ b/airflow/providers/amazon/aws/transfers/s3_to_sql.py @@ -33,7 +33,7 @@ class S3ToSqlOperator(BaseOperator): """ Loads Data from S3 into a SQL Database. You need to provide a parser function that takes a filename as an input - and returns an iterable of rows + and returns an iterable of rows. .. 
seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/transfers/sql_to_s3.py b/airflow/providers/amazon/aws/transfers/sql_to_s3.py index 8cee9b6cffb15..9dc9602ff1f31 100644 --- a/airflow/providers/amazon/aws/transfers/sql_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/sql_to_s3.py @@ -186,7 +186,7 @@ def execute(self, context: Context) -> None: ) def _partition_dataframe(self, df: DataFrame) -> Iterable[tuple[str, DataFrame]]: - """Partition dataframe using pandas groupby() method""" + """Partition dataframe using pandas groupby() method.""" if not self.groupby_kwargs: yield "", df else: diff --git a/airflow/providers/amazon/aws/triggers/redshift_cluster.py b/airflow/providers/amazon/aws/triggers/redshift_cluster.py index ef19d0b5a1d66..61d7f575e0629 100644 --- a/airflow/providers/amazon/aws/triggers/redshift_cluster.py +++ b/airflow/providers/amazon/aws/triggers/redshift_cluster.py @@ -24,7 +24,7 @@ class RedshiftClusterTrigger(BaseTrigger): - """AWS Redshift trigger""" + """AWS Redshift trigger.""" def __init__( self, diff --git a/airflow/providers/amazon/aws/utils/connection_wrapper.py b/airflow/providers/amazon/aws/utils/connection_wrapper.py index 9a62dc2c84ef4..3e13a25f0c65c 100644 --- a/airflow/providers/amazon/aws/utils/connection_wrapper.py +++ b/airflow/providers/amazon/aws/utils/connection_wrapper.py @@ -426,7 +426,7 @@ def _parse_s3_config( ) -> tuple[str | None, str | None]: """ Parses a config file for s3 credentials. Can currently - parse boto, s3cmd.conf and AWS SDK config formats + parse boto, s3cmd.conf and AWS SDK config formats. :param config_file_name: path to the config file :param config_format: config type. One of "boto", "s3cmd" or "aws". diff --git a/airflow/providers/amazon/aws/utils/emailer.py b/airflow/providers/amazon/aws/utils/emailer.py index 3e00abc78a08c..5a5cdd15c8bc0 100644 --- a/airflow/providers/amazon/aws/utils/emailer.py +++ b/airflow/providers/amazon/aws/utils/emailer.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Airflow module for email backend using AWS SES""" +"""Airflow module for email backend using AWS SES.""" from __future__ import annotations from typing import Any diff --git a/airflow/providers/amazon/aws/utils/rds.py b/airflow/providers/amazon/aws/utils/rds.py index 873f2cf83ecf0..1ba511c4b4e9a 100644 --- a/airflow/providers/amazon/aws/utils/rds.py +++ b/airflow/providers/amazon/aws/utils/rds.py @@ -20,7 +20,7 @@ class RdsDbType(Enum): - """Only available types for the RDS""" + """Only available types for the RDS.""" INSTANCE: str = "instance" CLUSTER: str = "cluster" diff --git a/airflow/providers/amazon/aws/utils/redshift.py b/airflow/providers/amazon/aws/utils/redshift.py index d931cb047430d..1ef490422d2d0 100644 --- a/airflow/providers/amazon/aws/utils/redshift.py +++ b/airflow/providers/amazon/aws/utils/redshift.py @@ -26,7 +26,8 @@ def build_credentials_block(credentials: ReadOnlyCredentials) -> str: """ Generate AWS credentials block for Redshift COPY and UNLOAD - commands, as noted in AWS docs + commands, as noted in AWS docs. 
+ https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html#copy-credentials :param credentials: ReadOnlyCredentials object from `botocore` From 947bdd05a4cfe517d886a858608230bed304c0fe Mon Sep 17 00:00:00 2001 From: Vincent Beck Date: Thu, 1 Jun 2023 10:30:09 -0400 Subject: [PATCH 2/2] Apply suggestions --- airflow/providers/amazon/aws/hooks/athena.py | 6 ++++-- airflow/providers/amazon/aws/hooks/base_aws.py | 1 + airflow/providers/amazon/aws/hooks/batch_client.py | 6 +++--- airflow/providers/amazon/aws/hooks/ecs.py | 12 ++++++------ airflow/providers/amazon/aws/hooks/redshift_sql.py | 5 +++-- airflow/providers/amazon/aws/hooks/s3.py | 9 ++++++--- 6 files changed, 23 insertions(+), 16 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/athena.py b/airflow/providers/amazon/aws/hooks/athena.py index 914ecbc2b03e9..c0c06a26fdc85 100644 --- a/airflow/providers/amazon/aws/hooks/athena.py +++ b/airflow/providers/amazon/aws/hooks/athena.py @@ -152,8 +152,9 @@ def get_query_results( self, query_execution_id: str, next_token_id: str | None = None, max_results: int = 1000 ) -> dict | None: """ - Fetch submitted athena query results. returns none if query is in intermediate state or - failed/cancelled state else dict of query output. + Fetch submitted athena query results. + + Returns none if query is in intermediate state or failed/cancelled state else dict of query output. .. seealso:: - :external+boto3:py:meth:`Athena.Client.get_query_results` @@ -227,6 +228,7 @@ def poll_query_status( ) -> str | None: """ Poll the status of submitted athena query until query state reaches final state. + Returns one of the final states. :param query_execution_id: Id of submitted athena query diff --git a/airflow/providers/amazon/aws/hooks/base_aws.py b/airflow/providers/amazon/aws/hooks/base_aws.py index aaecb47040bbd..ee8d849c1bbf2 100644 --- a/airflow/providers/amazon/aws/hooks/base_aws.py +++ b/airflow/providers/amazon/aws/hooks/base_aws.py @@ -583,6 +583,7 @@ def get_session(self, region_name: str | None = None, deferrable: bool = False) def _get_config(self, config: Config | None = None) -> Config: """ No AWS Operators use the config argument to this method. + Keep backward compatibility with other users who might use it. """ if config is None: diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py index 449fa6069beb9..4f6e217341a0c 100644 --- a/airflow/providers/amazon/aws/hooks/batch_client.py +++ b/airflow/providers/amazon/aws/hooks/batch_client.py @@ -230,11 +230,11 @@ def terminate_job(self, job_id: str, reason: str) -> dict: def check_job_success(self, job_id: str) -> bool: """ - Check the final status of the Batch job; return True if the job - 'SUCCEEDED', else raise an AirflowException. + Check the final status of the Batch job. - :param job_id: a Batch job ID + Return True if the job 'SUCCEEDED', else raise an AirflowException. + :param job_id: a Batch job ID :raises: AirflowException """ diff --git a/airflow/providers/amazon/aws/hooks/ecs.py b/airflow/providers/amazon/aws/hooks/ecs.py index 13178372ef0bb..5f74b4c138d35 100644 --- a/airflow/providers/amazon/aws/hooks/ecs.py +++ b/airflow/providers/amazon/aws/hooks/ecs.py @@ -226,42 +226,42 @@ class EcsProtocol(Protocol): """ def run_task(self, **kwargs) -> dict: - """Run a task + """Run a task. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task """ ... 
def get_waiter(self, x: str) -> Waiter: - """Get a waiter + """Get a waiter. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.get_waiter """ ... def describe_tasks(self, cluster: str, tasks) -> dict: - """Describe tasks + """Describe tasks. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_tasks """ ... def stop_task(self, cluster, task, reason: str) -> dict: - """Stop a task + """Stop a task. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.stop_task """ ... def describe_task_definition(self, taskDefinition: str) -> dict: - """Describe a task definition + """Describe a task definition. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_task_definition """ ... def list_tasks(self, cluster: str, launchType: str, desiredStatus: str, family: str) -> dict: - """List tasks + """List tasks. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.list_tasks """ diff --git a/airflow/providers/amazon/aws/hooks/redshift_sql.py b/airflow/providers/amazon/aws/hooks/redshift_sql.py index 1dd37251c4b3e..afc2e797c916e 100644 --- a/airflow/providers/amazon/aws/hooks/redshift_sql.py +++ b/airflow/providers/amazon/aws/hooks/redshift_sql.py @@ -150,10 +150,11 @@ def get_sqlalchemy_engine(self, engine_kwargs=None): def get_table_primary_key(self, table: str, schema: str | None = "public") -> list[str] | None: """ - Helper method that returns the table primary key + Helper method that returns the table primary key. + :param table: Name of the target table :param schema: Name of the target schema, public by default - :return: Primary key columns list. + :return: Primary key columns list """ sql = """ select kcu.column_name diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py index 777839c1c3c0f..0ed28b3960541 100644 --- a/airflow/providers/amazon/aws/hooks/s3.py +++ b/airflow/providers/amazon/aws/hooks/s3.py @@ -174,8 +174,9 @@ def extra_args(self): def parse_s3_url(s3url: str) -> tuple[str, str]: """ Parses the S3 Url into a bucket name and key. - See https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html - for valid url formats. + + See https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html + for valid url formats. :param s3url: The S3 Url to parse. :return: the parsed bucket name and key @@ -206,7 +207,9 @@ def get_s3_bucket_key( bucket: str | None, key: str, bucket_param_name: str, key_param_name: str ) -> tuple[str, str]: """ - Get the S3 bucket name and key from either: + Get the S3 bucket name and key. + + From either: - bucket name and key. Return the info as it is after checking `key` is a relative path. - key. Must be a full s3:// url.