diff --git a/airflow/providers/amazon/aws/hooks/appflow.py b/airflow/providers/amazon/aws/hooks/appflow.py index aaaf1cc5ace18..cc356880215fc 100644 --- a/airflow/providers/amazon/aws/hooks/appflow.py +++ b/airflow/providers/amazon/aws/hooks/appflow.py @@ -45,7 +45,7 @@ def __init__(self, *args, **kwargs) -> None: @cached_property def conn(self) -> AppflowClient: - """Get the underlying boto3 Appflow client (cached)""" + """Get the underlying boto3 Appflow client (cached).""" return super().conn def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: bool = True) -> str: diff --git a/airflow/providers/amazon/aws/hooks/athena.py b/airflow/providers/amazon/aws/hooks/athena.py index 88793d5165876..c0c06a26fdc85 100644 --- a/airflow/providers/amazon/aws/hooks/athena.py +++ b/airflow/providers/amazon/aws/hooks/athena.py @@ -77,7 +77,7 @@ def run_query( workgroup: str = "primary", ) -> str: """ - Run Presto query on athena with provided config and return submitted query_execution_id + Run Presto query on athena with provided config and return submitted query_execution_id. .. seealso:: - :external+boto3:py:meth:`Athena.Client.start_query_execution` @@ -152,8 +152,9 @@ def get_query_results( self, query_execution_id: str, next_token_id: str | None = None, max_results: int = 1000 ) -> dict | None: """ - Fetch submitted athena query results. returns none if query is in intermediate state or - failed/cancelled state else dict of query output + Fetch submitted athena query results. + + Returns none if query is in intermediate state or failed/cancelled state else dict of query output. .. seealso:: - :external+boto3:py:meth:`Athena.Client.get_query_results` @@ -188,7 +189,7 @@ def get_query_results_paginator( """ Fetch submitted athena query results. returns none if query is in intermediate state or failed/cancelled state else a paginator to iterate through pages of results. If you - wish to get all results at once, call build_full_result() on the returned PageIterator + wish to get all results at once, call build_full_result() on the returned PageIterator. .. seealso:: - :external+boto3:py:class:`Athena.Paginator.GetQueryResults` @@ -227,7 +228,8 @@ def poll_query_status( ) -> str | None: """ Poll the status of submitted athena query until query state reaches final state. - Returns one of the final states + + Returns one of the final states. :param query_execution_id: Id of submitted athena query :param max_polling_attempts: Number of times to poll for query state before function exits @@ -298,7 +300,7 @@ def get_output_location(self, query_execution_id: str) -> str: def stop_query(self, query_execution_id: str) -> dict: """ - Cancel the submitted athena query + Cancel the submitted athena query. .. 
seealso:: - :external+boto3:py:meth:`Athena.Client.stop_query_execution` diff --git a/airflow/providers/amazon/aws/hooks/base_aws.py b/airflow/providers/amazon/aws/hooks/base_aws.py index de52edd72fdca..120845b69ac1f 100644 --- a/airflow/providers/amazon/aws/hooks/base_aws.py +++ b/airflow/providers/amazon/aws/hooks/base_aws.py @@ -126,7 +126,7 @@ def config(self) -> Config | None: @property def role_arn(self) -> str | None: - """Assume Role ARN from AWS Connection""" + """Assume Role ARN from AWS Connection.""" return self.conn.role_arn def _apply_session_kwargs(self, session): @@ -585,7 +585,8 @@ def get_session(self, region_name: str | None = None, deferrable: bool = False) def _get_config(self, config: Config | None = None) -> Config: """ No AWS Operators use the config argument to this method. - Keep backward compatibility with other users who might use it + + Keep backward compatibility with other users who might use it. """ if config is None: config = deepcopy(self.config) @@ -605,7 +606,7 @@ def get_client_type( config: Config | None = None, deferrable: bool = False, ) -> boto3.client: - """Get the underlying boto3 client using boto3 session""" + """Get the underlying boto3 client using boto3 session.""" client_type = self.client_type session = self.get_session(region_name=region_name, deferrable=deferrable) if not isinstance(session, boto3.session.Session): @@ -628,7 +629,7 @@ def get_resource_type( region_name: str | None = None, config: Config | None = None, ) -> boto3.resource: - """Get the underlying boto3 resource using boto3 session""" + """Get the underlying boto3 resource using boto3 session.""" resource_type = self.resource_type session = self.get_session(region_name=region_name) return session.resource( @@ -641,7 +642,7 @@ def get_resource_type( @cached_property def conn(self) -> BaseAwsConnection: """ - Get the underlying boto3 client/resource (cached) + Get the underlying boto3 client/resource (cached). :return: boto3.client or boto3.resource """ @@ -683,7 +684,7 @@ def conn_partition(self) -> str: def get_conn(self) -> BaseAwsConnection: """ - Get the underlying boto3 client/resource (cached) + Get the underlying boto3 client/resource (cached). Implemented so that caching works as intended. It exists for compatibility with subclasses that rely on a super().get_conn() method. @@ -873,7 +874,7 @@ def get_waiter( @staticmethod def _apply_parameters_value(config: dict, waiter_name: str, parameters: dict[str, str] | None) -> dict: - """Replaces potential jinja templates in acceptors definition""" + """Replaces potential jinja templates in acceptors definition.""" # only process the waiter we're going to use to not raise errors for missing params for other waiters. 
acceptors = config["waiters"][waiter_name]["acceptors"] for a in acceptors: @@ -927,7 +928,7 @@ class AwsBaseHook(AwsGenericHook[Union[boto3.client, boto3.resource]]): def resolve_session_factory() -> type[BaseSessionFactory]: - """Resolves custom SessionFactory class""" + """Resolves custom SessionFactory class.""" clazz = conf.getimport("aws", "session_factory", fallback=None) if not clazz: return BaseSessionFactory @@ -943,7 +944,7 @@ def resolve_session_factory() -> type[BaseSessionFactory]: def _parse_s3_config(config_file_name: str, config_format: str | None = "boto", profile: str | None = None): - """For compatibility with airflow.contrib.hooks.aws_hook""" + """For compatibility with airflow.contrib.hooks.aws_hook.""" from airflow.providers.amazon.aws.utils.connection_wrapper import _parse_s3_config return _parse_s3_config( @@ -978,7 +979,9 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) async def get_role_credentials(self) -> dict: - """Get the role_arn, method credentials from connection details and get the role credentials detail""" + """Get the role_arn, method credentials from connection details and get the role credentials + detail. + """ async with self._basic_session.create_client("sts", region_name=self.region_name) as client: response = await client.assume_role( RoleArn=self.role_arn, @@ -1086,7 +1089,7 @@ def get_async_session(self) -> AioSession: ).create_session() async def get_client_async(self): - """Get the underlying aiobotocore client using aiobotocore session""" + """Get the underlying aiobotocore client using aiobotocore session.""" return self.get_async_session().create_client( self.client_type, region_name=self.region_name, diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py index 624869a06b128..4f6e217341a0c 100644 --- a/airflow/providers/amazon/aws/hooks/batch_client.py +++ b/airflow/providers/amazon/aws/hooks/batch_client.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. """ -A client for AWS Batch services +A client for AWS Batch services. .. seealso:: @@ -53,7 +53,7 @@ class BatchProtocol(Protocol): def describe_jobs(self, jobs: list[str]) -> dict: """ - Get job descriptions from AWS Batch + Get job descriptions from AWS Batch. :param jobs: a list of JobId to describe @@ -63,7 +63,7 @@ def describe_jobs(self, jobs: list[str]) -> dict: def get_waiter(self, waiterName: str) -> botocore.waiter.Waiter: """ - Get an AWS Batch service waiter + Get an AWS Batch service waiter. :param waiterName: The name of the waiter. The name should match the name (including the casing) of the key name in the waiter @@ -98,7 +98,7 @@ def submit_job( tags: dict, ) -> dict: """ - Submit a Batch job + Submit a Batch job. :param jobName: the name for the AWS Batch job @@ -120,7 +120,7 @@ def submit_job( def terminate_job(self, jobId: str, reason: str) -> dict: """ - Terminate a Batch job + Terminate a Batch job. :param jobId: a job ID to terminate @@ -216,7 +216,7 @@ def client(self) -> BatchProtocol | botocore.client.BaseClient: def terminate_job(self, job_id: str, reason: str) -> dict: """ - Terminate a Batch job + Terminate a Batch job. 
:param job_id: a job ID to terminate @@ -230,11 +230,11 @@ def terminate_job(self, job_id: str, reason: str) -> dict: def check_job_success(self, job_id: str) -> bool: """ - Check the final status of the Batch job; return True if the job - 'SUCCEEDED', else raise an AirflowException + Check the final status of the Batch job. - :param job_id: a Batch job ID + Return True if the job 'SUCCEEDED', else raise an AirflowException. + :param job_id: a Batch job ID :raises: AirflowException """ @@ -255,7 +255,7 @@ def check_job_success(self, job_id: str) -> bool: def wait_for_job(self, job_id: str, delay: int | float | None = None) -> None: """ - Wait for Batch job to complete + Wait for Batch job to complete. :param job_id: a Batch job ID @@ -396,7 +396,7 @@ def get_job_description(self, job_id: str) -> dict: @staticmethod def parse_job_description(job_id: str, response: dict) -> dict: """ - Parse job description to extract description for job_id + Parse job description to extract description for job_id. :param job_id: a Batch job ID @@ -488,7 +488,7 @@ def get_job_all_awslogs_info(self, job_id: str) -> list[dict[str, str]]: @staticmethod def add_jitter(delay: int | float, width: int | float = 1, minima: int | float = 0) -> float: """ - Use delay +/- width for random jitter + Use delay +/- width for random jitter. Adding jitter to status polling can help to avoid AWS Batch API limits for monitoring Batch jobs with diff --git a/airflow/providers/amazon/aws/hooks/batch_waiters.py b/airflow/providers/amazon/aws/hooks/batch_waiters.py index cb852acf9d8b8..c746798dff949 100644 --- a/airflow/providers/amazon/aws/hooks/batch_waiters.py +++ b/airflow/providers/amazon/aws/hooks/batch_waiters.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. """ -AWS Batch service waiters +AWS Batch service waiters. .. seealso:: @@ -107,7 +107,7 @@ def __init__(self, *args, waiter_config: dict | None = None, **kwargs) -> None: @property def default_config(self) -> dict: """ - An immutable default waiter configuration + An immutable default waiter configuration. :return: a waiter configuration for AWS Batch services """ diff --git a/airflow/providers/amazon/aws/hooks/cloud_formation.py b/airflow/providers/amazon/aws/hooks/cloud_formation.py index c7157169e3554..b39ee7be5601b 100644 --- a/airflow/providers/amazon/aws/hooks/cloud_formation.py +++ b/airflow/providers/amazon/aws/hooks/cloud_formation.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-"""This module contains AWS CloudFormation Hook""" +"""This module contains AWS CloudFormation Hook.""" from __future__ import annotations from boto3 import client, resource diff --git a/airflow/providers/amazon/aws/hooks/datasync.py b/airflow/providers/amazon/aws/hooks/datasync.py index 3e7c4e420aaf8..0c017a931244e 100644 --- a/airflow/providers/amazon/aws/hooks/datasync.py +++ b/airflow/providers/amazon/aws/hooks/datasync.py @@ -179,7 +179,7 @@ def delete_task(self, task_arn: str) -> None: self.get_conn().delete_task(TaskArn=task_arn) def _refresh_tasks(self) -> None: - """Refreshes the local list of Tasks""" + """Refreshes the local list of Tasks.""" self.tasks = [] next_token = None while True: diff --git a/airflow/providers/amazon/aws/hooks/dms.py b/airflow/providers/amazon/aws/hooks/dms.py index a8ab6bf0a0dd6..2d9b75bbf4baf 100644 --- a/airflow/providers/amazon/aws/hooks/dms.py +++ b/airflow/providers/amazon/aws/hooks/dms.py @@ -51,7 +51,7 @@ def __init__(self, *args, **kwargs): def describe_replication_tasks(self, **kwargs) -> tuple[str | None, list]: """ - Describe replication tasks + Describe replication tasks. .. seealso:: - :external+boto3:py:meth:`DatabaseMigrationService.Client.describe_replication_tasks` @@ -65,7 +65,7 @@ def describe_replication_tasks(self, **kwargs) -> tuple[str | None, list]: def find_replication_tasks_by_arn(self, replication_task_arn: str, without_settings: bool | None = False): """ - Find and describe replication tasks by task ARN + Find and describe replication tasks by task ARN. .. seealso:: - :external+boto3:py:meth:`DatabaseMigrationService.Client.describe_replication_tasks` diff --git a/airflow/providers/amazon/aws/hooks/dynamodb.py b/airflow/providers/amazon/aws/hooks/dynamodb.py index e3cc1a76b5bb5..8710cff31764f 100644 --- a/airflow/providers/amazon/aws/hooks/dynamodb.py +++ b/airflow/providers/amazon/aws/hooks/dynamodb.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains the Amazon DynamoDB Hook""" +"""This module contains the Amazon DynamoDB Hook.""" from __future__ import annotations from typing import Iterable diff --git a/airflow/providers/amazon/aws/hooks/ec2.py b/airflow/providers/amazon/aws/hooks/ec2.py index 91b79aa8a319c..13bcb60196603 100644 --- a/airflow/providers/amazon/aws/hooks/ec2.py +++ b/airflow/providers/amazon/aws/hooks/ec2.py @@ -91,7 +91,7 @@ def get_instance(self, instance_id: str, filters: list | None = None): @only_client_type def stop_instances(self, instance_ids: list) -> dict: """ - Stop instances with given ids + Stop instances with given ids. :param instance_ids: List of instance ids to stop :return: Dict with key `StoppingInstances` and value as list of instances being stopped @@ -103,7 +103,7 @@ def stop_instances(self, instance_ids: list) -> dict: @only_client_type def start_instances(self, instance_ids: list) -> dict: """ - Start instances with given ids + Start instances with given ids. :param instance_ids: List of instance ids to start :return: Dict with key `StartingInstances` and value as list of instances being started @@ -115,7 +115,7 @@ def start_instances(self, instance_ids: list) -> dict: @only_client_type def terminate_instances(self, instance_ids: list) -> dict: """ - Terminate instances with given ids + Terminate instances with given ids. 
:param instance_ids: List of instance ids to terminate :return: Dict with key `TerminatingInstances` and value as list of instances being terminated @@ -127,7 +127,7 @@ def terminate_instances(self, instance_ids: list) -> dict: @only_client_type def describe_instances(self, filters: list | None = None, instance_ids: list | None = None): """ - Describe EC2 instances, optionally applying filters and selective instance ids + Describe EC2 instances, optionally applying filters and selective instance ids. :param filters: List of filters to specify instances to describe :param instance_ids: List of instance IDs to describe @@ -144,7 +144,7 @@ def describe_instances(self, filters: list | None = None, instance_ids: list | N @only_client_type def get_instances(self, filters: list | None = None, instance_ids: list | None = None) -> list: """ - Get list of instance details, optionally applying filters and selective instance ids + Get list of instance details, optionally applying filters and selective instance ids. :param instance_ids: List of ids to get instances for :param filters: List of filters to specify instances to get @@ -159,7 +159,7 @@ def get_instances(self, filters: list | None = None, instance_ids: list | None = @only_client_type def get_instance_ids(self, filters: list | None = None) -> list: """ - Get list of instance ids, optionally applying filters to fetch selective instances + Get list of instance ids, optionally applying filters to fetch selective instances. :param filters: List of filters to specify instances to get :return: List of instance ids diff --git a/airflow/providers/amazon/aws/hooks/ecs.py b/airflow/providers/amazon/aws/hooks/ecs.py index f8e80f7110663..5f74b4c138d35 100644 --- a/airflow/providers/amazon/aws/hooks/ecs.py +++ b/airflow/providers/amazon/aws/hooks/ecs.py @@ -226,25 +226,43 @@ class EcsProtocol(Protocol): """ def run_task(self, **kwargs) -> dict: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task""" # noqa: E501 + """Run a task. + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task + """ ... def get_waiter(self, x: str) -> Waiter: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.get_waiter""" # noqa: E501 + """Get a waiter. + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.get_waiter + """ ... def describe_tasks(self, cluster: str, tasks) -> dict: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_tasks""" # noqa: E501 + """Describe tasks. + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_tasks + """ ... def stop_task(self, cluster, task, reason: str) -> dict: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.stop_task""" # noqa: E501 + """Stop a task. + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.stop_task + """ ... def describe_task_definition(self, taskDefinition: str) -> dict: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_task_definition""" # noqa: E501 + """Describe a task definition. + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_task_definition + """ ... 
def list_tasks(self, cluster: str, launchType: str, desiredStatus: str, family: str) -> dict: - """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.list_tasks""" # noqa: E501 + """List tasks. + + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.list_tasks + """ ... diff --git a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py index 7debbd4e4c305..44b20bcdce87a 100644 --- a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py +++ b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py @@ -109,7 +109,7 @@ def get_replication_group_status(self, replication_group_id: str) -> str: def is_replication_group_available(self, replication_group_id: str) -> bool: """ - Helper for checking if replication group is available or not + Helper for checking if replication group is available or not. :param replication_group_id: ID of replication group to check for availability :return: True if available else False @@ -124,7 +124,7 @@ def wait_for_availability( max_retries: int | None = None, ) -> bool: """ - Check if replication group is available or not by performing a describe over it + Check if replication group is available or not by performing a describe over it. :param replication_group_id: ID of replication group to check for availability :param initial_sleep_time: Initial sleep time in seconds @@ -178,7 +178,7 @@ def wait_for_deletion( max_retries: int | None = None, ): """ - Helper for deleting a replication group ensuring it is either deleted or can't be deleted + Helper for deleting a replication group ensuring it is either deleted or can't be deleted. :param replication_group_id: ID of replication to delete :param initial_sleep_time: Initial sleep time in second @@ -253,7 +253,7 @@ def ensure_delete_replication_group( max_retries: int | None = None, ) -> dict: """ - Delete a replication group ensuring it is either deleted or can't be deleted + Delete a replication group ensuring it is either deleted or can't be deleted. :param replication_group_id: ID of replication to delete :param initial_sleep_time: Initial sleep time in second diff --git a/airflow/providers/amazon/aws/hooks/emr.py b/airflow/providers/amazon/aws/hooks/emr.py index 412e7c2e91f8c..4effe18ba2a03 100644 --- a/airflow/providers/amazon/aws/hooks/emr.py +++ b/airflow/providers/amazon/aws/hooks/emr.py @@ -495,7 +495,7 @@ def poll_query_status( def stop_query(self, job_id: str) -> dict: """ - Cancel the submitted job_run + Cancel the submitted job_run. .. seealso:: - :external+boto3:py:meth:`EMRContainers.Client.cancel_job_run` diff --git a/airflow/providers/amazon/aws/hooks/glacier.py b/airflow/providers/amazon/aws/hooks/glacier.py index 835e5f48e513a..cc886a64b72dd 100644 --- a/airflow/providers/amazon/aws/hooks/glacier.py +++ b/airflow/providers/amazon/aws/hooks/glacier.py @@ -40,7 +40,7 @@ def __init__(self, aws_conn_id: str = "aws_default") -> None: def retrieve_inventory(self, vault_name: str) -> dict[str, Any]: """ - Initiate an Amazon Glacier inventory-retrieval job + Initiate an Amazon Glacier inventory-retrieval job. .. 
seealso:: - :external+boto3:py:meth:`Glacier.Client.initiate_job` @@ -56,7 +56,7 @@ def retrieve_inventory(self, vault_name: str) -> dict[str, Any]: def retrieve_inventory_results(self, vault_name: str, job_id: str) -> dict[str, Any]: """ - Retrieve the results of an Amazon Glacier inventory-retrieval job + Retrieve the results of an Amazon Glacier inventory-retrieval job. .. seealso:: - :external+boto3:py:meth:`Glacier.Client.get_job_output` @@ -71,7 +71,7 @@ def retrieve_inventory_results(self, vault_name: str, job_id: str) -> dict[str, def describe_job(self, vault_name: str, job_id: str) -> dict[str, Any]: """ Retrieve the status of an Amazon S3 Glacier job, such as an - inventory-retrieval job + inventory-retrieval job. .. seealso:: - :external+boto3:py:meth:`Glacier.Client.describe_job` diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py index 753d43dce6d95..e313b95f93315 100644 --- a/airflow/providers/amazon/aws/hooks/glue.py +++ b/airflow/providers/amazon/aws/hooks/glue.py @@ -217,7 +217,7 @@ def print_job_logs( paginator = log_client.get_paginator("filter_log_events") def display_logs_from(log_group: str, continuation_token: str | None) -> str | None: - """Internal method to mutualize iteration over the 2 different log streams glue jobs write to""" + """Internal method to mutualize iteration over the 2 different log streams glue jobs write to.""" fetched_logs = [] next_token = continuation_token try: diff --git a/airflow/providers/amazon/aws/hooks/glue_catalog.py b/airflow/providers/amazon/aws/hooks/glue_catalog.py index ce94ac18d78d6..81c452cc0e9f2 100644 --- a/airflow/providers/amazon/aws/hooks/glue_catalog.py +++ b/airflow/providers/amazon/aws/hooks/glue_catalog.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS Glue Catalog Hook""" +"""This module contains AWS Glue Catalog Hook.""" from __future__ import annotations from botocore.exceptions import ClientError @@ -85,7 +85,7 @@ def get_partitions( def check_for_partition(self, database_name: str, table_name: str, expression: str) -> bool: """ - Checks whether a partition exists + Checks whether a partition exists. .. code-block:: python @@ -103,7 +103,7 @@ def check_for_partition(self, database_name: str, table_name: str, expression: s def get_table(self, database_name: str, table_name: str) -> dict: """ - Get the information of the table + Get the information of the table. .. seealso:: - :external+boto3:py:meth:`Glue.Client.get_table` @@ -137,7 +137,7 @@ def get_table_location(self, database_name: str, table_name: str) -> str: def get_partition(self, database_name: str, table_name: str, partition_values: list[str]) -> dict: """ - Gets a Partition + Gets a Partition. .. seealso:: - :external+boto3:py:meth:`Glue.Client.get_partition` diff --git a/airflow/providers/amazon/aws/hooks/kinesis.py b/airflow/providers/amazon/aws/hooks/kinesis.py index 6f6b850fe5dcb..e527aa6aaccb6 100644 --- a/airflow/providers/amazon/aws/hooks/kinesis.py +++ b/airflow/providers/amazon/aws/hooks/kinesis.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-"""This module contains AWS Firehose hook""" +"""This module contains AWS Firehose hook.""" from __future__ import annotations from typing import Iterable @@ -43,7 +43,7 @@ def __init__(self, delivery_stream: str, *args, **kwargs) -> None: super().__init__(*args, **kwargs) def put_records(self, records: Iterable): - """Write batch records to Kinesis Firehose + """Write batch records to Kinesis Firehose. .. seealso:: - :external+boto3:py:meth:`Firehose.Client.put_record_batch` diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py index c8d0327be9b73..a3e82688aeed1 100644 --- a/airflow/providers/amazon/aws/hooks/lambda_function.py +++ b/airflow/providers/amazon/aws/hooks/lambda_function.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS Lambda hook""" +"""This module contains AWS Lambda hook.""" from __future__ import annotations from typing import Any diff --git a/airflow/providers/amazon/aws/hooks/quicksight.py b/airflow/providers/amazon/aws/hooks/quicksight.py index 74c77652a5ed9..9ba840ea0262b 100644 --- a/airflow/providers/amazon/aws/hooks/quicksight.py +++ b/airflow/providers/amazon/aws/hooks/quicksight.py @@ -58,7 +58,7 @@ def create_ingestion( check_interval: int = 30, ) -> dict: """ - Creates and starts a new SPICE ingestion for a dataset. Refreshes the SPICE datasets + Creates and starts a new SPICE ingestion for a dataset. Refreshes the SPICE datasets. .. seealso:: - :external+boto3:py:meth:`QuickSight.Client.create_ingestion` @@ -143,7 +143,7 @@ def wait_for_state( check_interval: int, ): """ - Check status of a QuickSight Create Ingestion API + Check status of a QuickSight Create Ingestion API. :param aws_account_id: An AWS Account ID :param data_set_id: QuickSight Data Set ID diff --git a/airflow/providers/amazon/aws/hooks/redshift_cluster.py b/airflow/providers/amazon/aws/hooks/redshift_cluster.py index 56954d6a984da..872c69273285d 100644 --- a/airflow/providers/amazon/aws/hooks/redshift_cluster.py +++ b/airflow/providers/amazon/aws/hooks/redshift_cluster.py @@ -54,7 +54,7 @@ def create_cluster( params: dict[str, Any], ) -> dict[str, Any]: """ - Creates a new cluster with the specified parameters + Creates a new cluster with the specified parameters. .. seealso:: - :external+boto3:py:meth:`Redshift.Client.create_cluster` @@ -85,7 +85,7 @@ def create_cluster( # TODO: Wrap create_cluster_snapshot def cluster_status(self, cluster_identifier: str) -> str: """ - Return status of a cluster + Return status of a cluster. .. seealso:: - :external+boto3:py:meth:`Redshift.Client.describe_clusters` @@ -107,7 +107,7 @@ def delete_cluster( final_cluster_snapshot_identifier: str | None = None, ): """ - Delete a cluster and optionally create a snapshot + Delete a cluster and optionally create a snapshot. .. seealso:: - :external+boto3:py:meth:`Redshift.Client.delete_cluster` @@ -127,7 +127,7 @@ def delete_cluster( def describe_cluster_snapshots(self, cluster_identifier: str) -> list[str] | None: """ - Gets a list of snapshots for a cluster + Gets a list of snapshots for a cluster. .. 
seealso:: - :external+boto3:py:meth:`Redshift.Client.describe_cluster_snapshots` @@ -144,7 +144,7 @@ def describe_cluster_snapshots(self, cluster_identifier: str) -> list[str] | Non def restore_from_cluster_snapshot(self, cluster_identifier: str, snapshot_identifier: str) -> str: """ - Restores a cluster from its snapshot + Restores a cluster from its snapshot. .. seealso:: - :external+boto3:py:meth:`Redshift.Client.restore_from_cluster_snapshot` @@ -165,7 +165,7 @@ def create_cluster_snapshot( tags: list[Any] | None = None, ) -> str: """ - Creates a snapshot of a cluster + Creates a snapshot of a cluster. .. seealso:: - :external+boto3:py:meth:`Redshift.Client.create_cluster_snapshot` @@ -188,7 +188,7 @@ def create_cluster_snapshot( def get_cluster_snapshot_status(self, snapshot_identifier: str): """ - Return Redshift cluster snapshot status. If cluster snapshot not found return ``None`` + Return Redshift cluster snapshot status. If cluster snapshot not found return ``None``. :param snapshot_identifier: A unique identifier for the snapshot that you are requesting """ @@ -204,7 +204,7 @@ def get_cluster_snapshot_status(self, snapshot_identifier: str): class RedshiftAsyncHook(AwsBaseAsyncHook): - """Interact with AWS Redshift using aiobotocore library""" + """Interact with AWS Redshift using aiobotocore library.""" def __init__(self, *args, **kwargs): warnings.warn( @@ -219,7 +219,7 @@ def __init__(self, *args, **kwargs): async def cluster_status(self, cluster_identifier: str, delete_operation: bool = False) -> dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and get the status - and returns the status of the cluster based on the cluster_identifier passed + and returns the status of the cluster based on the cluster_identifier passed. :param cluster_identifier: unique identifier of a cluster :param delete_operation: whether the method has been called as part of delete cluster operation @@ -239,7 +239,7 @@ async def cluster_status(self, cluster_identifier: str, delete_operation: bool = async def pause_cluster(self, cluster_identifier: str, poll_interval: float = 5.0) -> dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and - pause the cluster based on the cluster_identifier passed + pause the cluster based on the cluster_identifier passed. :param cluster_identifier: unique identifier of a cluster :param poll_interval: polling period in seconds to check for the status @@ -268,7 +268,7 @@ async def resume_cluster( ) -> dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and - resume the cluster for the cluster_identifier passed + resume the cluster for the cluster_identifier passed. :param cluster_identifier: unique identifier of a cluster :param polling_period_seconds: polling period in seconds to check for the status @@ -298,7 +298,7 @@ async def get_cluster_status( delete_operation: bool = False, ) -> dict[str, Any]: """ - check for expected Redshift cluster state + check for expected Redshift cluster state. 
:param cluster_identifier: unique identifier of a cluster :param expected_state: expected_state example("available", "pausing", "paused"") diff --git a/airflow/providers/amazon/aws/hooks/redshift_data.py b/airflow/providers/amazon/aws/hooks/redshift_data.py index e033624c4c8c1..b38fc8962bc73 100644 --- a/airflow/providers/amazon/aws/hooks/redshift_data.py +++ b/airflow/providers/amazon/aws/hooks/redshift_data.py @@ -61,7 +61,7 @@ def execute_query( poll_interval: int = 10, ) -> str: """ - Execute a statement against Amazon Redshift + Execute a statement against Amazon Redshift. :param database: the name of the database :param sql: the SQL statement or list of SQL statement to run diff --git a/airflow/providers/amazon/aws/hooks/redshift_sql.py b/airflow/providers/amazon/aws/hooks/redshift_sql.py index 61832e67aaf89..afc2e797c916e 100644 --- a/airflow/providers/amazon/aws/hooks/redshift_sql.py +++ b/airflow/providers/amazon/aws/hooks/redshift_sql.py @@ -34,7 +34,7 @@ class RedshiftSQLHook(DbApiHook): """ - Execute statements against Amazon Redshift, using redshift_connector + Execute statements against Amazon Redshift, using redshift_connector. This hook requires the redshift_conn_id connection. @@ -65,7 +65,7 @@ def __init__(self, *args, aws_conn_id: str = "aws_default", **kwargs) -> None: @staticmethod def get_ui_field_behaviour() -> dict: - """Returns custom field behavior""" + """Returns custom field behavior.""" return { "hidden_fields": [], "relabeling": {"login": "User", "schema": "Database"}, @@ -76,7 +76,7 @@ def conn(self): return self.get_connection(self.redshift_conn_id) # type: ignore[attr-defined] def _get_conn_params(self) -> dict[str, str | int]: - """Helper method to retrieve connection args""" + """Helper method to retrieve connection args.""" conn = self.conn conn_params: dict[str, str | int] = {} @@ -100,7 +100,7 @@ def _get_conn_params(self) -> dict[str, str | int]: def get_iam_token(self, conn: Connection) -> tuple[str, str, int]: """ Uses AWSHook to retrieve a temporary password to connect to Redshift. - Port is required. If none is provided, default is used for each service + Port is required. If none is provided, default is used for each service. """ port = conn.port or 5439 # Pull the custer-identifier from the beginning of the Redshift URL @@ -124,7 +124,7 @@ def get_iam_token(self, conn: Connection) -> tuple[str, str, int]: return login, token, port def get_uri(self) -> str: - """Overrides DbApiHook get_uri to use redshift_connector sqlalchemy dialect as driver name""" + """Overrides DbApiHook get_uri to use redshift_connector sqlalchemy dialect as driver name.""" conn_params = self._get_conn_params() if "user" in conn_params: @@ -136,7 +136,7 @@ def get_uri(self) -> str: return str(create_url(drivername="redshift+redshift_connector", **conn_params)) def get_sqlalchemy_engine(self, engine_kwargs=None): - """Overrides DbApiHook get_sqlalchemy_engine to pass redshift_connector specific kwargs""" + """Overrides DbApiHook get_sqlalchemy_engine to pass redshift_connector specific kwargs.""" conn_kwargs = self.conn.extra_dejson if engine_kwargs is None: engine_kwargs = {} @@ -150,7 +150,8 @@ def get_sqlalchemy_engine(self, engine_kwargs=None): def get_table_primary_key(self, table: str, schema: str | None = "public") -> list[str] | None: """ - Helper method that returns the table primary key + Helper method that returns the table primary key. 
+ :param table: Name of the target table :param schema: Name of the target schema, public by default :return: Primary key columns list @@ -170,7 +171,7 @@ def get_table_primary_key(self, table: str, schema: str | None = "public") -> li return pk_columns or None def get_conn(self) -> RedshiftConnection: - """Returns a redshift_connector.Connection object""" + """Returns a redshift_connector.Connection object.""" conn_params = self._get_conn_params() conn_kwargs_dejson = self.conn.extra_dejson conn_kwargs: dict = {**conn_params, **conn_kwargs_dejson} diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py index 1cfa6b7b01eba..0ed28b3960541 100644 --- a/airflow/providers/amazon/aws/hooks/s3.py +++ b/airflow/providers/amazon/aws/hooks/s3.py @@ -174,8 +174,9 @@ def extra_args(self): def parse_s3_url(s3url: str) -> tuple[str, str]: """ Parses the S3 Url into a bucket name and key. - See https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html - for valid url formats + + See https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html + for valid url formats. :param s3url: The S3 Url to parse. :return: the parsed bucket name and key @@ -206,9 +207,11 @@ def get_s3_bucket_key( bucket: str | None, key: str, bucket_param_name: str, key_param_name: str ) -> tuple[str, str]: """ - Get the S3 bucket name and key from either: - - bucket name and key. Return the info as it is after checking `key` is a relative path - - key. Must be a full s3:// url + Get the S3 bucket name and key. + + From either: + - bucket name and key. Return the info as it is after checking `key` is a relative path. + - key. Must be a full s3:// url. :param bucket: The S3 bucket name :param key: The S3 key @@ -261,7 +264,7 @@ def check_for_bucket(self, bucket_name: str | None = None) -> bool: @provide_bucket_name def get_bucket(self, bucket_name: str | None = None) -> object: """ - Returns a :py:class:`S3.Bucket` object + Returns a :py:class:`S3.Bucket` object. .. seealso:: - :external+boto3:py:meth:`S3.ServiceResource.Bucket` @@ -306,7 +309,7 @@ def create_bucket(self, bucket_name: str | None = None, region_name: str | None @provide_bucket_name def check_for_prefix(self, prefix: str, delimiter: str, bucket_name: str | None = None) -> bool: """ - Checks that a prefix exists in a bucket + Checks that a prefix exists in a bucket. :param bucket_name: the name of the bucket :param prefix: a key prefix @@ -329,7 +332,7 @@ def list_prefixes( max_items: int | None = None, ) -> list: """ - Lists prefixes in a bucket under prefix + Lists prefixes in a bucket under prefix. .. seealso:: - :external+boto3:py:class:`S3.Paginator.ListObjectsV2` @@ -386,7 +389,7 @@ def list_keys( object_filter: Callable[..., list] | None = None, ) -> list: """ - Lists keys in a bucket under prefix and not containing delimiter + Lists keys in a bucket under prefix and not containing delimiter. .. seealso:: - :external+boto3:py:class:`S3.Paginator.ListObjectsV2` @@ -461,7 +464,7 @@ def get_file_metadata( max_items: int | None = None, ) -> list: """ - Lists metadata objects in a bucket under prefix + Lists metadata objects in a bucket under prefix. .. seealso:: - :external+boto3:py:class:`S3.Paginator.ListObjectsV2` @@ -490,7 +493,7 @@ def get_file_metadata( @provide_bucket_name def head_object(self, key: str, bucket_name: str | None = None) -> dict | None: """ - Retrieves metadata of an object + Retrieves metadata of an object. .. 
seealso:: - :external+boto3:py:meth:`S3.Client.head_object` @@ -511,7 +514,7 @@ def head_object(self, key: str, bucket_name: str | None = None) -> dict | None: @provide_bucket_name def check_for_key(self, key: str, bucket_name: str | None = None) -> bool: """ - Checks if a key exists in a bucket + Checks if a key exists in a bucket. .. seealso:: - :external+boto3:py:meth:`S3.Client.head_object` @@ -550,7 +553,7 @@ def get_key(self, key: str, bucket_name: str | None = None) -> S3ResourceObject: @provide_bucket_name def read_key(self, key: str, bucket_name: str | None = None) -> str: """ - Reads a key from S3 + Reads a key from S3. .. seealso:: - :external+boto3:py:meth:`S3.Object.get` @@ -614,7 +617,7 @@ def check_for_wildcard_key( self, wildcard_key: str, bucket_name: str | None = None, delimiter: str = "" ) -> bool: """ - Checks that a key matching a wildcard expression exists in a bucket + Checks that a key matching a wildcard expression exists in a bucket. :param wildcard_key: the path to the key :param bucket_name: the name of the bucket @@ -632,7 +635,7 @@ def get_wildcard_key( self, wildcard_key: str, bucket_name: str | None = None, delimiter: str = "" ) -> S3ResourceObject | None: """ - Returns a boto3.s3.Object object matching the wildcard expression + Returns a boto3.s3.Object object matching the wildcard expression. :param wildcard_key: the path to the key :param bucket_name: the name of the bucket @@ -659,7 +662,7 @@ def load_file( acl_policy: str | None = None, ) -> None: """ - Loads a local file to S3 + Loads a local file to S3. .. seealso:: - :external+boto3:py:meth:`S3.Client.upload_file` @@ -709,7 +712,7 @@ def load_string( compression: str | None = None, ) -> None: """ - Loads a string to S3 + Loads a string to S3. This is provided as a convenience to drop a string in S3. It uses the boto infrastructure to ship a file to s3. @@ -760,7 +763,7 @@ def load_bytes( acl_policy: str | None = None, ) -> None: """ - Loads bytes to S3 + Loads bytes to S3. This is provided as a convenience to drop bytes data into S3. It uses the boto infrastructure to ship a file to s3. @@ -794,7 +797,7 @@ def load_file_obj( acl_policy: str | None = None, ) -> None: """ - Loads a file object to S3 + Loads a file object to S3. .. seealso:: - :external+boto3:py:meth:`S3.Client.upload_fileobj` @@ -1034,7 +1037,7 @@ def generate_presigned_url( http_method: str | None = None, ) -> str | None: """ - Generate a presigned url given a client, its method, and arguments + Generate a presigned url given a client, its method, and arguments. .. seealso:: - :external+boto3:py:meth:`S3.Client.generate_presigned_url` diff --git a/airflow/providers/amazon/aws/hooks/sagemaker.py b/airflow/providers/amazon/aws/hooks/sagemaker.py index 31c2eaaf5bd2d..b6e7dd487d222 100644 --- a/airflow/providers/amazon/aws/hooks/sagemaker.py +++ b/airflow/providers/amazon/aws/hooks/sagemaker.py @@ -41,6 +41,7 @@ class LogState: """ Enum-style class holding all possible states of CloudWatch log streams. 
+ https://sagemaker.readthedocs.io/en/stable/session.html#sagemaker.session.LogState """ @@ -57,7 +58,7 @@ class LogState: def argmin(arr, f: Callable) -> int | None: - """Return the index, i, in arr that minimizes f(arr[i])""" + """Return the index, i, in arr that minimizes f(arr[i]).""" min_value = None min_idx = None for idx, item in enumerate(arr): @@ -158,7 +159,7 @@ def __init__(self, *args, **kwargs): def tar_and_s3_upload(self, path: str, key: str, bucket: str) -> None: """ - Tar the local file or directory and upload to s3 + Tar the local file or directory and upload to s3. :param path: local file or directory :param key: s3 key @@ -197,7 +198,7 @@ def configure_s3_resources(self, config: dict) -> None: def check_s3_url(self, s3url: str) -> bool: """ - Check if an S3 URL exists + Check if an S3 URL exists. :param s3url: S3 url """ @@ -219,7 +220,7 @@ def check_s3_url(self, s3url: str) -> bool: def check_training_config(self, training_config: dict) -> None: """ - Check if a training configuration is valid + Check if a training configuration is valid. :param training_config: training_config :return: None @@ -231,7 +232,7 @@ def check_training_config(self, training_config: dict) -> None: def check_tuning_config(self, tuning_config: dict) -> None: """ - Check if a tuning configuration is valid + Check if a tuning configuration is valid. :param tuning_config: tuning_config :return: None @@ -543,7 +544,7 @@ def update_endpoint( def describe_training_job(self, name: str): """ - Return the training job info associated with the name + Return the training job info associated with the name. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.describe_training_job` @@ -563,7 +564,7 @@ def describe_training_job_with_log( last_description: dict, last_describe_job_call: float, ): - """Return the training job info associated with job_name and print CloudWatch logs""" + """Return the training job info associated with job_name and print CloudWatch logs.""" log_group = "/aws/sagemaker/TrainingJobs" if len(stream_names) < instance_count: @@ -616,7 +617,7 @@ def describe_training_job_with_log( def describe_tuning_job(self, name: str) -> dict: """ - Return the tuning job info associated with the name + Return the tuning job info associated with the name. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.describe_hyper_parameter_tuning_job` @@ -628,7 +629,7 @@ def describe_tuning_job(self, name: str) -> dict: def describe_model(self, name: str) -> dict: """ - Return the SageMaker model info associated with the name + Return the SageMaker model info associated with the name. :param name: the name of the SageMaker model :return: A dict contains all the model info @@ -637,7 +638,7 @@ def describe_model(self, name: str) -> dict: def describe_transform_job(self, name: str) -> dict: """ - Return the transform job info associated with the name + Return the transform job info associated with the name. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.describe_transform_job` @@ -649,7 +650,7 @@ def describe_transform_job(self, name: str) -> dict: def describe_processing_job(self, name: str) -> dict: """ - Return the processing job info associated with the name + Return the processing job info associated with the name. .. 
seealso:: - :external+boto3:py:meth:`SageMaker.Client.describe_processing_job` @@ -661,7 +662,7 @@ def describe_processing_job(self, name: str) -> dict: def describe_endpoint_config(self, name: str) -> dict: """ - Return the endpoint config info associated with the name + Return the endpoint config info associated with the name. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.describe_endpoint_config` @@ -693,7 +694,7 @@ def check_status( non_terminal_states: set | None = None, ) -> dict: """ - Check status of a SageMaker resource + Check status of a SageMaker resource. :param job_name: name of the resource to check status, can be a job but also pipeline for instance. :param key: the key of the response dict that points to the state @@ -833,7 +834,7 @@ def list_training_jobs( """ This method wraps boto3's `list_training_jobs`. The training job name and max results are configurable via arguments. Other arguments are not, and should be provided via kwargs. Note boto3 expects these in - CamelCase format, for example: + CamelCase format, for example. .. code-block:: python @@ -861,7 +862,7 @@ def list_transform_jobs( This method wraps boto3's `list_transform_jobs`. The transform job name and max results are configurable via arguments. Other arguments are not, and should be provided via kwargs. Note boto3 expects these in - CamelCase format, for example: + CamelCase format, for example. .. code-block:: python @@ -885,7 +886,7 @@ def list_transform_jobs( def list_processing_jobs(self, **kwargs) -> list[dict]: """ This method wraps boto3's `list_processing_jobs`. All arguments should be provided via kwargs. - Note boto3 expects these in CamelCase format, for example: + Note boto3 expects these in CamelCase format, for example. .. code-block:: python @@ -1116,7 +1117,7 @@ def stop_pipeline( verbose: bool = True, fail_if_not_running: bool = False, ) -> str: - """Stop SageMaker pipeline execution + """Stop SageMaker pipeline execution. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.stop_pipeline_execution` @@ -1182,7 +1183,7 @@ def stop_pipeline( def create_model_package_group(self, package_group_name: str, package_group_desc: str = "") -> bool: """ - Creates a Model Package Group if it does not already exist + Creates a Model Package Group if it does not already exist. .. seealso:: - :external+boto3:py:meth:`SageMaker.Client.create_model_package_group` diff --git a/airflow/providers/amazon/aws/hooks/secrets_manager.py b/airflow/providers/amazon/aws/hooks/secrets_manager.py index f64b834bbedd1..c82d543b0cc68 100644 --- a/airflow/providers/amazon/aws/hooks/secrets_manager.py +++ b/airflow/providers/amazon/aws/hooks/secrets_manager.py @@ -42,7 +42,7 @@ def __init__(self, *args, **kwargs): def get_secret(self, secret_name: str) -> str | bytes: """ Retrieve secret value from AWS Secrets Manager as a str or bytes - reflecting format it stored in the AWS Secrets Manager + reflecting format it stored in the AWS Secrets Manager. .. seealso:: - :external+boto3:py:meth:`SecretsManager.Client.get_secret_value` @@ -61,7 +61,7 @@ def get_secret(self, secret_name: str) -> str | bytes: def get_secret_as_dict(self, secret_name: str) -> dict: """ - Retrieve secret value from AWS Secrets Manager in a dict representation + Retrieve secret value from AWS Secrets Manager in a dict representation. :param secret_name: name of the secrets. 
:return: dict with the information about the secrets diff --git a/airflow/providers/amazon/aws/hooks/ses.py b/airflow/providers/amazon/aws/hooks/ses.py index b919fcc75713c..2f5a6a91646fc 100644 --- a/airflow/providers/amazon/aws/hooks/ses.py +++ b/airflow/providers/amazon/aws/hooks/ses.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS SES Hook""" +"""This module contains AWS SES Hook.""" from __future__ import annotations from typing import Any, Iterable @@ -55,7 +55,7 @@ def send_email( custom_headers: dict[str, Any] | None = None, ) -> dict: """ - Send email using Amazon Simple Email Service + Send email using Amazon Simple Email Service. .. seealso:: - :external+boto3:py:meth:`SES.Client.send_raw_email` diff --git a/airflow/providers/amazon/aws/hooks/sns.py b/airflow/providers/amazon/aws/hooks/sns.py index ee022dd00a7d3..199376c7ffe87 100644 --- a/airflow/providers/amazon/aws/hooks/sns.py +++ b/airflow/providers/amazon/aws/hooks/sns.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS SNS hook""" +"""This module contains AWS SNS hook.""" from __future__ import annotations import json diff --git a/airflow/providers/amazon/aws/hooks/sqs.py b/airflow/providers/amazon/aws/hooks/sqs.py index 43699787afbf3..c59beba9225df 100644 --- a/airflow/providers/amazon/aws/hooks/sqs.py +++ b/airflow/providers/amazon/aws/hooks/sqs.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS SQS hook""" +"""This module contains AWS SQS hook.""" from __future__ import annotations from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook diff --git a/airflow/providers/amazon/aws/hooks/step_function.py b/airflow/providers/amazon/aws/hooks/step_function.py index 7baf23eadee68..8b5df2bdba62a 100644 --- a/airflow/providers/amazon/aws/hooks/step_function.py +++ b/airflow/providers/amazon/aws/hooks/step_function.py @@ -70,7 +70,7 @@ def start_execution( def describe_execution(self, execution_arn: str) -> dict: """ - Describes a State Machine Execution + Describes a State Machine Execution. .. seealso:: - :external+boto3:py:meth:`SFN.Client.describe_execution` diff --git a/airflow/providers/amazon/aws/links/base_aws.py b/airflow/providers/amazon/aws/links/base_aws.py index 82acd337bfc41..97130fabd6cc7 100644 --- a/airflow/providers/amazon/aws/links/base_aws.py +++ b/airflow/providers/amazon/aws/links/base_aws.py @@ -31,7 +31,7 @@ class BaseAwsLink(BaseOperatorLink): - """Base Helper class for constructing AWS Console Link""" + """Base Helper class for constructing AWS Console Link.""" name: ClassVar[str] key: ClassVar[str] @@ -50,7 +50,7 @@ def get_aws_domain(aws_partition) -> str | None: def format_link(self, **kwargs) -> str: """ - Format AWS Service Link + Format AWS Service Link. Some AWS Service Link should require additional escaping in this case this method should be overridden. 
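For readers of this patch who are unfamiliar with these link helpers, a minimal sketch of a console-link subclass follows. Only the ``name`` and ``key`` class attributes appear in the hunks here; the ``format_str`` attribute, the ``BASE_AWS_CONSOLE_LINK`` constant, and the example service URL are assumptions added for illustration, not part of the change.

.. code-block:: python

    from airflow.providers.amazon.aws.links.base_aws import BASE_AWS_CONSOLE_LINK, BaseAwsLink


    class MyServiceDetailsLink(BaseAwsLink):
        """Helper class for constructing a hypothetical My Service details link."""

        name = "My Service Details"
        key = "my_service_details"
        # Assumed attribute: concrete provider links declare a console URL template
        # which format_link() fills in with the kwargs stored via persist().
        format_str = BASE_AWS_CONSOLE_LINK + "/myservice/home?region={region_name}#/details/{resource_id}"

Inside an operator's ``execute``, the link data would then be stored with
``MyServiceDetailsLink.persist(context=context, operator=self, region_name=..., aws_partition=..., resource_id=...)``,
matching the ``persist`` signature shown in the next hunk.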
@@ -80,7 +80,7 @@ def get_link( def persist( cls, context: Context, operator: BaseOperator, region_name: str, aws_partition: str, **kwargs ) -> None: - """Store link information into XCom""" + """Store link information into XCom.""" if not operator.do_xcom_push: return diff --git a/airflow/providers/amazon/aws/links/batch.py b/airflow/providers/amazon/aws/links/batch.py index 432d129a7c328..4c5bdd8016040 100644 --- a/airflow/providers/amazon/aws/links/batch.py +++ b/airflow/providers/amazon/aws/links/batch.py @@ -20,7 +20,7 @@ class BatchJobDefinitionLink(BaseAwsLink): - """Helper class for constructing AWS Batch Job Definition Link""" + """Helper class for constructing AWS Batch Job Definition Link.""" name = "Batch Job Definition" key = "batch_job_definition" @@ -30,7 +30,7 @@ class BatchJobDefinitionLink(BaseAwsLink): class BatchJobDetailsLink(BaseAwsLink): - """Helper class for constructing AWS Batch Job Details Link""" + """Helper class for constructing AWS Batch Job Details Link.""" name = "Batch Job Details" key = "batch_job_details" @@ -38,7 +38,7 @@ class BatchJobDetailsLink(BaseAwsLink): class BatchJobQueueLink(BaseAwsLink): - """Helper class for constructing AWS Batch Job Queue Link""" + """Helper class for constructing AWS Batch Job Queue Link.""" name = "Batch Job Queue" key = "batch_job_queue" diff --git a/airflow/providers/amazon/aws/links/emr.py b/airflow/providers/amazon/aws/links/emr.py index 22f952e763f15..358b8b302fd54 100644 --- a/airflow/providers/amazon/aws/links/emr.py +++ b/airflow/providers/amazon/aws/links/emr.py @@ -27,7 +27,7 @@ class EmrClusterLink(BaseAwsLink): - """Helper class for constructing AWS EMR Cluster Link""" + """Helper class for constructing AWS EMR Cluster Link.""" name = "EMR Cluster" key = "emr_cluster" @@ -35,7 +35,7 @@ class EmrClusterLink(BaseAwsLink): class EmrLogsLink(BaseAwsLink): - """Helper class for constructing AWS EMR Logs Link""" + """Helper class for constructing AWS EMR Logs Link.""" name = "EMR Cluster Logs" key = "emr_logs" diff --git a/airflow/providers/amazon/aws/links/glue.py b/airflow/providers/amazon/aws/links/glue.py index 44a7110118ee0..ad9c1765a94bd 100644 --- a/airflow/providers/amazon/aws/links/glue.py +++ b/airflow/providers/amazon/aws/links/glue.py @@ -20,7 +20,7 @@ class GlueJobRunDetailsLink(BaseAwsLink): - """Helper class for constructing AWS Glue Job Run Details Link""" + """Helper class for constructing AWS Glue Job Run Details Link.""" name = "AWS Glue Job Run Details" key = "glue_job_run_details" diff --git a/airflow/providers/amazon/aws/links/logs.py b/airflow/providers/amazon/aws/links/logs.py index 7998191d9226d..90296957ca61b 100644 --- a/airflow/providers/amazon/aws/links/logs.py +++ b/airflow/providers/amazon/aws/links/logs.py @@ -22,7 +22,7 @@ class CloudWatchEventsLink(BaseAwsLink): - """Helper class for constructing AWS CloudWatch Events Link""" + """Helper class for constructing AWS CloudWatch Events Link.""" name = "CloudWatch Events" key = "cloudwatch_events" diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py index f35cad065c2af..5ec02b44d32a5 100644 --- a/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -31,7 +31,8 @@ def get_default_delete_local_copy(): - """Load delete_local_logs conf if Airflow version > 2.6 and return False if not + """Load delete_local_logs conf if Airflow version > 2.6 and return False if not. 
+ TODO: delete this function when min airflow version >= 2.6 """ from airflow.version import version @@ -158,7 +159,7 @@ def _read(self, ti, try_number, metadata=None): def s3_log_exists(self, remote_log_location: str) -> bool: """ - Check if remote_log_location exists in remote storage + Check if remote_log_location exists in remote storage. :param remote_log_location: log's location in remote storage :return: True if location exists else False diff --git a/airflow/providers/amazon/aws/operators/athena.py b/airflow/providers/amazon/aws/operators/athena.py index 5525641f23935..1bd1a97be220a 100644 --- a/airflow/providers/amazon/aws/operators/athena.py +++ b/airflow/providers/amazon/aws/operators/athena.py @@ -91,7 +91,7 @@ def hook(self) -> AthenaHook: return AthenaHook(self.aws_conn_id, sleep_time=self.sleep_time, log_query=self.log_query) def execute(self, context: Context) -> str | None: - """Run Presto Query on Athena""" + """Run Presto Query on Athena.""" self.query_execution_context["Database"] = self.database self.result_configuration["OutputLocation"] = self.output_location self.query_execution_id = self.hook.run_query( @@ -121,7 +121,7 @@ def execute(self, context: Context) -> str | None: return self.query_execution_id def on_kill(self) -> None: - """Cancel the submitted athena query""" + """Cancel the submitted athena query.""" if self.query_execution_id: self.log.info("Received a kill signal.") response = self.hook.stop_query(self.query_execution_id) diff --git a/airflow/providers/amazon/aws/operators/batch.py b/airflow/providers/amazon/aws/operators/batch.py index e6cb549871276..8a127dbd67cce 100644 --- a/airflow/providers/amazon/aws/operators/batch.py +++ b/airflow/providers/amazon/aws/operators/batch.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. """ -An Airflow operator for AWS Batch services +An Airflow operator for AWS Batch services. .. seealso:: @@ -47,7 +47,7 @@ class BatchOperator(BaseOperator): """ - Execute a job on AWS Batch + Execute a job on AWS Batch. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -200,7 +200,7 @@ def hook(self) -> BatchClientHook: def execute(self, context: Context): """ - Submit and monitor an AWS Batch job + Submit and monitor an AWS Batch job. :raises: AirflowException """ @@ -237,7 +237,7 @@ def on_kill(self): def submit_job(self, context: Context): """ - Submit an AWS Batch job + Submit an AWS Batch job. :raises: AirflowException """ @@ -292,7 +292,7 @@ def monitor_job(self, context: Context): Monitor an AWS Batch job monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout is given while creating the task. These exceptions should be handled in taskinstance.py - instead of here like it was previously done + instead of here like it was previously done. :raises: AirflowException """ @@ -358,7 +358,7 @@ def monitor_job(self, context: Context): class BatchCreateComputeEnvironmentOperator(BaseOperator): """ - Create an AWS Batch compute environment + Create an AWS Batch compute environment. .. 
seealso:: For more information on how to use this operator, take a look at the guide: @@ -433,7 +433,7 @@ def __init__( @cached_property def hook(self): - """Create and return a BatchClientHook""" + """Create and return a BatchClientHook.""" return BatchClientHook( max_retries=self.max_retries, status_retries=self.status_retries, @@ -442,7 +442,7 @@ def hook(self): ) def execute(self, context: Context): - """Create an AWS batch compute environment""" + """Create an AWS batch compute environment.""" kwargs: dict[str, Any] = { "computeEnvironmentName": self.compute_environment_name, "type": self.environment_type, diff --git a/airflow/providers/amazon/aws/operators/datasync.py b/airflow/providers/amazon/aws/operators/datasync.py index 508d87e163658..76b288a54805f 100644 --- a/airflow/providers/amazon/aws/operators/datasync.py +++ b/airflow/providers/amazon/aws/operators/datasync.py @@ -261,7 +261,7 @@ def _get_tasks_and_locations(self) -> None: self.log.info("Found candidate DataSync TaskArns %s", self.candidate_task_arns) def choose_task(self, task_arn_list: list) -> str | None: - """Select 1 DataSync TaskArn from a list""" + """Select 1 DataSync TaskArn from a list.""" if not task_arn_list: return None if len(task_arn_list) == 1: @@ -275,7 +275,7 @@ def choose_task(self, task_arn_list: list) -> str | None: raise AirflowException(f"Unable to choose a Task from {task_arn_list}") def choose_location(self, location_arn_list: list[str] | None) -> str | None: - """Select 1 DataSync LocationArn from a list""" + """Select 1 DataSync LocationArn from a list.""" if not location_arn_list: return None if len(location_arn_list) == 1: diff --git a/airflow/providers/amazon/aws/operators/dms.py b/airflow/providers/amazon/aws/operators/dms.py index 6303afcbaf318..afa81ab040bae 100644 --- a/airflow/providers/amazon/aws/operators/dms.py +++ b/airflow/providers/amazon/aws/operators/dms.py @@ -88,7 +88,7 @@ def __init__( def execute(self, context: Context): """ - Creates AWS DMS replication task from Airflow + Creates AWS DMS replication task from Airflow. :return: replication task arn """ @@ -141,7 +141,7 @@ def __init__( def execute(self, context: Context): """ - Deletes AWS DMS replication task from Airflow + Deletes AWS DMS replication task from Airflow. :return: replication task arn """ @@ -183,7 +183,7 @@ def __init__( def execute(self, context: Context) -> tuple[str | None, list]: """ - Describes AWS DMS replication tasks from Airflow + Describes AWS DMS replication tasks from Airflow. :return: Marker and list of replication tasks """ @@ -235,7 +235,7 @@ def __init__( def execute(self, context: Context): """ - Starts AWS DMS replication task from Airflow + Starts AWS DMS replication task from Airflow. :return: replication task arn """ @@ -282,7 +282,7 @@ def __init__( def execute(self, context: Context): """ - Stops AWS DMS replication task from Airflow + Stops AWS DMS replication task from Airflow. :return: replication task arn """ diff --git a/airflow/providers/amazon/aws/operators/ec2.py b/airflow/providers/amazon/aws/operators/ec2.py index 5f6b76ce154f9..b9de533378324 100644 --- a/airflow/providers/amazon/aws/operators/ec2.py +++ b/airflow/providers/amazon/aws/operators/ec2.py @@ -120,7 +120,7 @@ def execute(self, context: Context): class EC2CreateInstanceOperator(BaseOperator): """ - Create and start a specified number of EC2 Instances using boto3 + Create and start a specified number of EC2 Instances using boto3. .. 
seealso:: For more information on how to use this operator, take a look at the guide: @@ -201,7 +201,7 @@ def execute(self, context: Context): class EC2TerminateInstanceOperator(BaseOperator): """ - Terminate EC2 Instances using boto3 + Terminate EC2 Instances using boto3. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/ecs.py b/airflow/providers/amazon/aws/operators/ecs.py index d4f0bdf045c76..149ae2c11c15f 100644 --- a/airflow/providers/amazon/aws/operators/ecs.py +++ b/airflow/providers/amazon/aws/operators/ecs.py @@ -335,7 +335,7 @@ def execute(self, context: Context): class EcsRunTaskOperator(EcsBaseOperator): """ - Execute a task on AWS ECS (Elastic Container Service) + Execute a task on AWS ECS (Elastic Container Service). .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 55ad270d27cc6..1490775fe61a7 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -414,7 +414,7 @@ def hook(self) -> EmrContainerHook: return EmrContainerHook(self.aws_conn_id) def execute(self, context: Context) -> str | None: - """Create EMR on EKS virtual Cluster""" + """Create EMR on EKS virtual Cluster.""" self.virtual_cluster_id = self.hook.create_emr_on_eks_cluster( self.virtual_cluster_name, self.eks_cluster_name, self.eks_namespace, self.tags ) @@ -513,7 +513,7 @@ def hook(self) -> EmrContainerHook: ) def execute(self, context: Context) -> str | None: - """Run job on EMR Containers""" + """Run job on EMR Containers.""" self.job_id = self.hook.submit_job( self.name, self.execution_role_arn, @@ -545,7 +545,7 @@ def execute(self, context: Context) -> str | None: return self.job_id def on_kill(self) -> None: - """Cancel the submitted job run""" + """Cancel the submitted job run.""" if self.job_id: self.log.info("Stopping job run with jobId - %s", self.job_id) response = self.hook.stop_query(self.job_id) @@ -838,7 +838,7 @@ def execute(self, context: Context) -> None: class EmrServerlessCreateApplicationOperator(BaseOperator): """ - Operator to create Serverless EMR Application + Operator to create Serverless EMR Application. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -1056,7 +1056,7 @@ def execute(self, context: Context) -> str | None: return self.job_id def on_kill(self) -> None: - """Cancel the submitted job run""" + """Cancel the submitted job run.""" if self.job_id: self.log.info("Stopping job run with jobId - %s", self.job_id) response = self.hook.conn.cancel_job_run(applicationId=self.application_id, jobRunId=self.job_id) @@ -1089,7 +1089,7 @@ def on_kill(self) -> None: class EmrServerlessStopApplicationOperator(BaseOperator): """ - Operator to stop an EMR Serverless application + Operator to stop an EMR Serverless application. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -1167,7 +1167,7 @@ def execute(self, context: Context) -> None: class EmrServerlessDeleteApplicationOperator(EmrServerlessStopApplicationOperator): """ - Operator to delete EMR Serverless application + Operator to delete EMR Serverless application. .. 
seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/glacier.py b/airflow/providers/amazon/aws/operators/glacier.py index 4e7c8b5e17421..54123e586dab1 100644 --- a/airflow/providers/amazon/aws/operators/glacier.py +++ b/airflow/providers/amazon/aws/operators/glacier.py @@ -28,7 +28,7 @@ class GlacierCreateJobOperator(BaseOperator): """ - Initiate an Amazon Glacier inventory-retrieval job + Initiate an Amazon Glacier inventory-retrieval job. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -58,7 +58,7 @@ def execute(self, context: Context): class GlacierUploadArchiveOperator(BaseOperator): """ - This operator add an archive to an Amazon S3 Glacier vault + This operator add an archive to an Amazon S3 Glacier vault. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py index 053e530c72674..88950a2d01963 100644 --- a/airflow/providers/amazon/aws/operators/glue.py +++ b/airflow/providers/amazon/aws/operators/glue.py @@ -36,7 +36,7 @@ class GlueJobOperator(BaseOperator): """ Creates an AWS Glue Job. AWS Glue is a serverless Spark ETL service for running Spark Jobs on the AWS cloud. - Language support: Python and Scala + Language support: Python and Scala. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -124,7 +124,7 @@ def __init__( def execute(self, context: Context): """ - Executes AWS Glue Job from Airflow + Executes AWS Glue Job from Airflow. :return: the id of the current glue job. """ diff --git a/airflow/providers/amazon/aws/operators/glue_crawler.py b/airflow/providers/amazon/aws/operators/glue_crawler.py index c7ac25f1f2e30..2d6f4f47c04c7 100644 --- a/airflow/providers/amazon/aws/operators/glue_crawler.py +++ b/airflow/providers/amazon/aws/operators/glue_crawler.py @@ -77,7 +77,7 @@ def hook(self) -> GlueCrawlerHook: def execute(self, context: Context): """ - Executes AWS Glue Crawler from Airflow + Executes AWS Glue Crawler from Airflow. :return: the name of the current glue crawler. """ diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index df010261896fd..4d9b0889789ae 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -32,7 +32,7 @@ class RdsBaseOperator(BaseOperator): - """Base operator that implements common functions for all operators""" + """Base operator that implements common functions for all operators.""" ui_color = "#eeaa88" ui_fgcolor = "#ffffff" @@ -45,11 +45,11 @@ def __init__(self, *args, aws_conn_id: str = "aws_conn_id", hook_params: dict | self._await_interval = 60 # seconds def execute(self, context: Context) -> str: - """Different implementations for snapshots, tasks and events""" + """Different implementations for snapshots, tasks and events.""" raise NotImplementedError def on_kill(self) -> None: - """Different implementations for snapshots, tasks and events""" + """Different implementations for snapshots, tasks and events.""" raise NotImplementedError @@ -124,7 +124,7 @@ def execute(self, context: Context) -> str: class RdsCopyDbSnapshotOperator(RdsBaseOperator): """ - Copies the specified DB instance or DB cluster snapshot + Copies the specified DB instance or DB cluster snapshot. .. 
seealso:: For more information on how to use this operator, take a look at the guide: @@ -230,7 +230,7 @@ def execute(self, context: Context) -> str: class RdsDeleteDbSnapshotOperator(RdsBaseOperator): """ - Deletes a DB instance or cluster snapshot or terminating the copy operation + Deletes a DB instance or cluster snapshot or terminating the copy operation. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -353,7 +353,7 @@ def execute(self, context: Context) -> str: class RdsCancelExportTaskOperator(RdsBaseOperator): """ - Cancels an export task in progress that is exporting a snapshot to Amazon S3 + Cancels an export task in progress that is exporting a snapshot to Amazon S3. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -396,7 +396,7 @@ def execute(self, context: Context) -> str: class RdsCreateEventSubscriptionOperator(RdsBaseOperator): """ - Creates an RDS event notification subscription + Creates an RDS event notification subscription. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -470,7 +470,7 @@ def execute(self, context: Context) -> str: class RdsDeleteEventSubscriptionOperator(RdsBaseOperator): """ - Deletes an RDS event notification subscription + Deletes an RDS event notification subscription. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -507,7 +507,7 @@ def execute(self, context: Context) -> str: class RdsCreateDbInstanceOperator(RdsBaseOperator): """ - Creates an RDS DB instance + Creates an RDS DB instance. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -561,7 +561,7 @@ def execute(self, context: Context) -> str: class RdsDeleteDbInstanceOperator(RdsBaseOperator): """ - Deletes an RDS DB Instance + Deletes an RDS DB Instance. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -605,7 +605,7 @@ def execute(self, context: Context) -> str: class RdsStartDbOperator(RdsBaseOperator): """ - Starts an RDS DB instance / cluster + Starts an RDS DB instance / cluster. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -658,7 +658,7 @@ def _wait_until_db_available(self): class RdsStopDbOperator(RdsBaseOperator): """ - Stops an RDS DB instance / cluster + Stops an RDS DB instance / cluster. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py index 5b90ed3cdd756..905c34ff3ac56 100644 --- a/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -292,7 +292,7 @@ def execute_complete(self, context, event=None): class RedshiftCreateClusterSnapshotOperator(BaseOperator): """ - Creates a manual snapshot of the specified cluster. The cluster must be in the available state + Creates a manual snapshot of the specified cluster. The cluster must be in the available state. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -389,7 +389,7 @@ def execute_complete(self, context, event=None): class RedshiftDeleteClusterSnapshotOperator(BaseOperator): """ - Deletes the specified manual snapshot + Deletes the specified manual snapshot. .. 
seealso:: For more information on how to use this operator, take a look at the guide: @@ -444,7 +444,7 @@ def get_status(self) -> str: class RedshiftResumeClusterOperator(BaseOperator): """ - Resume a paused AWS Redshift Cluster + Resume a paused AWS Redshift Cluster. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/redshift_data.py b/airflow/providers/amazon/aws/operators/redshift_data.py index 6d6ef9d103b77..bf560c9973cec 100644 --- a/airflow/providers/amazon/aws/operators/redshift_data.py +++ b/airflow/providers/amazon/aws/operators/redshift_data.py @@ -31,7 +31,7 @@ class RedshiftDataOperator(BaseOperator): """ - Executes SQL Statements against an Amazon Redshift cluster using Redshift Data + Executes SQL Statements against an Amazon Redshift cluster using Redshift Data. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -112,7 +112,7 @@ def hook(self) -> RedshiftDataHook: return RedshiftDataHook(aws_conn_id=self.aws_conn_id, region_name=self.region) def execute(self, context: Context) -> GetStatementResultResponseTypeDef | str: - """Execute a statement against Amazon Redshift""" + """Execute a statement against Amazon Redshift.""" self.log.info("Executing statement: %s", self.sql) self.statement_id = self.hook.execute_query( @@ -136,7 +136,7 @@ def execute(self, context: Context) -> GetStatementResultResponseTypeDef | str: return self.statement_id def on_kill(self) -> None: - """Cancel the submitted redshift query""" + """Cancel the submitted redshift query.""" if self.statement_id: self.log.info("Received a kill signal.") self.log.info("Stopping Query with statementId - %s", self.statement_id) diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py index d9ab1ab75c62e..8357a21ed6fa3 100644 --- a/airflow/providers/amazon/aws/operators/s3.py +++ b/airflow/providers/amazon/aws/operators/s3.py @@ -37,7 +37,7 @@ class S3CreateBucketOperator(BaseOperator): """ - This operator creates an S3 bucket + This operator creates an S3 bucket. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -78,7 +78,7 @@ def execute(self, context: Context): class S3DeleteBucketOperator(BaseOperator): """ - This operator deletes an S3 bucket + This operator deletes an S3 bucket. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -118,7 +118,7 @@ def execute(self, context: Context): class S3GetBucketTaggingOperator(BaseOperator): """ - This operator gets tagging from an S3 bucket + This operator gets tagging from an S3 bucket. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/sns.py b/airflow/providers/amazon/aws/operators/sns.py index 2f5b9844bf7f3..6b16dc074156c 100644 --- a/airflow/providers/amazon/aws/operators/sns.py +++ b/airflow/providers/amazon/aws/operators/sns.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
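For orientation, the S3 bucket operators whose docstrings are touched above are usually wired together in a DAG. The sketch below is illustrative only and is not part of this change; the dag_id, bucket name, and scheduling arguments are placeholders.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator

with DAG(dag_id="example_s3_bucket", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    create_bucket = S3CreateBucketOperator(
        task_id="create_bucket",
        bucket_name="example-bucket",  # placeholder name
    )
    delete_bucket = S3DeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name="example-bucket",  # placeholder name
        force_delete=True,  # remove any remaining keys before deleting the bucket
    )
    create_bucket >> delete_bucket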
-"""Publish message to SNS queue""" +"""Publish message to SNS queue.""" from __future__ import annotations from typing import TYPE_CHECKING, Sequence diff --git a/airflow/providers/amazon/aws/operators/sqs.py b/airflow/providers/amazon/aws/operators/sqs.py index 0b0cfc4f16297..13bff5538f826 100644 --- a/airflow/providers/amazon/aws/operators/sqs.py +++ b/airflow/providers/amazon/aws/operators/sqs.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Publish message to SQS queue""" +"""Publish message to SQS queue.""" from __future__ import annotations from typing import TYPE_CHECKING, Sequence @@ -75,7 +75,7 @@ def __init__( def execute(self, context: Context) -> dict: """ - Publish the message to the Amazon SQS queue + Publish the message to the Amazon SQS queue. :param context: the context object :return: dict with information about the message sent diff --git a/airflow/providers/amazon/aws/secrets/secrets_manager.py b/airflow/providers/amazon/aws/secrets/secrets_manager.py index 1ccc95a21b4be..a5acf19a372e8 100644 --- a/airflow/providers/amazon/aws/secrets/secrets_manager.py +++ b/airflow/providers/amazon/aws/secrets/secrets_manager.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Objects relating to sourcing secrets from AWS Secrets Manager""" +"""Objects relating to sourcing secrets from AWS Secrets Manager.""" from __future__ import annotations import json @@ -33,7 +33,7 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin): """ - Retrieves Connection or Variables from AWS Secrets Manager + Retrieves Connection or Variables from AWS Secrets Manager. Configurable via ``airflow.cfg`` like so: @@ -178,7 +178,7 @@ def __init__( @cached_property def client(self): - """Create a Secrets Manager client""" + """Create a Secrets Manager client.""" from airflow.providers.amazon.aws.hooks.base_aws import SessionFactory from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper @@ -198,7 +198,7 @@ def client(self): return session.client(service_name="secretsmanager", **client_kwargs) def _standardize_secret_keys(self, secret: dict[str, Any]) -> dict[str, Any]: - """Standardize the names of the keys in the dict. These keys align with""" + """Standardize the names of the keys in the dict. These keys align with.""" possible_words_for_conn_fields = { "login": ["login", "user", "username", "user_name"], "password": ["password", "pass", "key"], @@ -225,7 +225,7 @@ def _standardize_secret_keys(self, secret: dict[str, Any]) -> dict[str, Any]: return conn_d def _remove_escaping_in_secret_dict(self, secret: dict[str, Any]) -> dict[str, Any]: - """Un-escape secret values that are URL-encoded""" + """Un-escape secret values that are URL-encoded.""" for k, v in secret.copy().items(): if k == "extra" and isinstance(v, dict): # The old behavior was that extras were _not_ urlencoded inside the secret. @@ -239,7 +239,7 @@ def _remove_escaping_in_secret_dict(self, secret: dict[str, Any]) -> dict[str, A def get_conn_value(self, conn_id: str) -> str | None: """ - Get serialized representation of Connection + Get serialized representation of Connection. :param conn_id: connection id """ @@ -270,7 +270,8 @@ def get_conn_value(self, conn_id: str) -> str | None: def get_variable(self, key: str) -> str | None: """ - Get Airflow Variable + Get Airflow Variable. 
+ :param key: Variable Key :return: Variable Value """ @@ -281,7 +282,8 @@ def get_variable(self, key: str) -> str | None: def get_config(self, key: str) -> str | None: """ - Get Airflow Configuration + Get Airflow Configuration. + :param key: Configuration Option Key :return: Configuration Option Value """ @@ -292,7 +294,8 @@ def get_config(self, key: str) -> str | None: def _get_secret(self, path_prefix, secret_id: str, lookup_pattern: str | None) -> str | None: """ - Get secret value from Secrets Manager + Get secret value from Secrets Manager. + :param path_prefix: Prefix for the Path to get Secret :param secret_id: Secret Key :param lookup_pattern: If provided, `secret_id` must match this pattern to look up the secret in diff --git a/airflow/providers/amazon/aws/secrets/systems_manager.py b/airflow/providers/amazon/aws/secrets/systems_manager.py index f15ee384aaacb..8b1daca1f7dc1 100644 --- a/airflow/providers/amazon/aws/secrets/systems_manager.py +++ b/airflow/providers/amazon/aws/secrets/systems_manager.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Objects relating to sourcing connections from AWS SSM Parameter Store""" +"""Objects relating to sourcing connections from AWS SSM Parameter Store.""" from __future__ import annotations import re @@ -28,7 +28,7 @@ class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin): """ - Retrieves Connection or Variables from AWS SSM Parameter Store + Retrieves Connection or Variables from AWS SSM Parameter Store. Configurable via ``airflow.cfg`` like so: @@ -112,7 +112,7 @@ def __init__( @cached_property def client(self): - """Create a SSM client""" + """Create a SSM client.""" from airflow.providers.amazon.aws.hooks.base_aws import SessionFactory from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper @@ -133,7 +133,7 @@ def client(self): def get_conn_value(self, conn_id: str) -> str | None: """ - Get param value + Get param value. :param conn_id: connection id """ @@ -144,7 +144,7 @@ def get_conn_value(self, conn_id: str) -> str | None: def get_variable(self, key: str) -> str | None: """ - Get Airflow Variable + Get Airflow Variable. :param key: Variable Key :return: Variable Value @@ -156,7 +156,7 @@ def get_variable(self, key: str) -> str | None: def get_config(self, key: str) -> str | None: """ - Get Airflow Configuration + Get Airflow Configuration. :param key: Configuration Option Key :return: Configuration Option Value @@ -190,7 +190,7 @@ def _get_secret(self, path_prefix: str, secret_id: str, lookup_pattern: str | No def _ensure_leading_slash(self, ssm_path: str): """ - AWS Systems Manager mandate to have a leading "/". Adding it dynamically if not there to the SSM path + AWS Systems Manager mandate to have a leading "/". Adding it dynamically if not there to the SSM path. 
:param ssm_path: SSM parameter path """ diff --git a/airflow/providers/amazon/aws/sensors/athena.py b/airflow/providers/amazon/aws/sensors/athena.py index 40dc80a924bfb..f67fb3ff9ab02 100644 --- a/airflow/providers/amazon/aws/sensors/athena.py +++ b/airflow/providers/amazon/aws/sensors/athena.py @@ -87,5 +87,5 @@ def poke(self, context: Context) -> bool: @cached_property def hook(self) -> AthenaHook: - """Create and return an AthenaHook""" + """Create and return an AthenaHook.""" return AthenaHook(self.aws_conn_id, sleep_time=self.sleep_time) diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index 594be1e55ed93..c93fc3d8b3140 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/providers/amazon/aws/sensors/batch.py @@ -77,7 +77,7 @@ def poke(self, context: Context) -> bool: @deprecated(reason="use `hook` property instead.") def get_hook(self) -> BatchClientHook: - """Create and return a BatchClientHook""" + """Create and return a BatchClientHook.""" return self.hook @cached_property @@ -122,7 +122,7 @@ def __init__( @cached_property def hook(self) -> BatchClientHook: - """Create and return a BatchClientHook""" + """Create and return a BatchClientHook.""" return BatchClientHook( aws_conn_id=self.aws_conn_id, region_name=self.region_name, @@ -188,7 +188,7 @@ def __init__( @cached_property def hook(self) -> BatchClientHook: - """Create and return a BatchClientHook""" + """Create and return a BatchClientHook.""" return BatchClientHook( aws_conn_id=self.aws_conn_id, region_name=self.region_name, diff --git a/airflow/providers/amazon/aws/sensors/cloud_formation.py b/airflow/providers/amazon/aws/sensors/cloud_formation.py index df383cf2407a1..942735c4bcd31 100644 --- a/airflow/providers/amazon/aws/sensors/cloud_formation.py +++ b/airflow/providers/amazon/aws/sensors/cloud_formation.py @@ -61,7 +61,7 @@ def poke(self, context: Context): @cached_property def hook(self) -> CloudFormationHook: - """Create and return a CloudFormationHook""" + """Create and return a CloudFormationHook.""" return CloudFormationHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) @@ -105,5 +105,5 @@ def poke(self, context: Context): @cached_property def hook(self) -> CloudFormationHook: - """Create and return a CloudFormationHook""" + """Create and return a CloudFormationHook.""" return CloudFormationHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) diff --git a/airflow/providers/amazon/aws/sensors/dms.py b/airflow/providers/amazon/aws/sensors/dms.py index 35563d4346292..ec642e5df807b 100644 --- a/airflow/providers/amazon/aws/sensors/dms.py +++ b/airflow/providers/amazon/aws/sensors/dms.py @@ -64,7 +64,7 @@ def __init__( @deprecated(reason="use `hook` property instead.") def get_hook(self) -> DmsHook: - """Get DmsHook""" + """Get DmsHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/dynamodb.py b/airflow/providers/amazon/aws/sensors/dynamodb.py index 26a38c0bbdadd..298667e9996fc 100644 --- a/airflow/providers/amazon/aws/sensors/dynamodb.py +++ b/airflow/providers/amazon/aws/sensors/dynamodb.py @@ -80,7 +80,7 @@ def __init__( self.region_name = region_name def poke(self, context: Context) -> bool: - """Test DynamoDB item for matching attribute value""" + """Test DynamoDB item for matching attribute value.""" key = {self.partition_key_name: self.partition_key_value} msg = ( f"Checking table {self.table_name} for " @@ -110,5 +110,5 @@ def poke(self, context: Context) -> bool: 
@cached_property def hook(self) -> DynamoDBHook: - """Create and return a DynamoDBHook""" + """Create and return a DynamoDBHook.""" return DynamoDBHook(self.aws_conn_id, region_name=self.region_name) diff --git a/airflow/providers/amazon/aws/sensors/emr.py b/airflow/providers/amazon/aws/sensors/emr.py index ff134dc33d867..6272c048c9ef3 100644 --- a/airflow/providers/amazon/aws/sensors/emr.py +++ b/airflow/providers/amazon/aws/sensors/emr.py @@ -156,7 +156,7 @@ def poke(self, context: Context) -> bool: @cached_property def hook(self) -> EmrServerlessHook: - """Create and return an EmrServerlessHook""" + """Create and return an EmrServerlessHook.""" return EmrServerlessHook(aws_conn_id=self.aws_conn_id) @staticmethod @@ -212,7 +212,7 @@ def poke(self, context: Context) -> bool: @cached_property def hook(self) -> EmrServerlessHook: - """Create and return an EmrServerlessHook""" + """Create and return an EmrServerlessHook.""" return EmrServerlessHook(aws_conn_id=self.aws_conn_id) @staticmethod @@ -292,7 +292,7 @@ def poke(self, context: Context) -> bool: @cached_property def hook(self) -> EmrContainerHook: - """Create and return an EmrContainerHook""" + """Create and return an EmrContainerHook.""" return EmrContainerHook(self.aws_conn_id, virtual_cluster_id=self.virtual_cluster_id) diff --git a/airflow/providers/amazon/aws/sensors/glacier.py b/airflow/providers/amazon/aws/sensors/glacier.py index 0ae22fbce61c2..0ba2c6153487f 100644 --- a/airflow/providers/amazon/aws/sensors/glacier.py +++ b/airflow/providers/amazon/aws/sensors/glacier.py @@ -30,7 +30,7 @@ class JobStatus(Enum): - """Glacier jobs description""" + """Glacier jobs description.""" IN_PROGRESS = "InProgress" SUCCEEDED = "Succeeded" diff --git a/airflow/providers/amazon/aws/sensors/glue.py b/airflow/providers/amazon/aws/sensors/glue.py index 30e44a9f8e585..781853d6d222d 100644 --- a/airflow/providers/amazon/aws/sensors/glue.py +++ b/airflow/providers/amazon/aws/sensors/glue.py @@ -30,7 +30,8 @@ class GlueJobSensor(BaseSensorOperator): """ - Waits for an AWS Glue Job to reach any of the status below + Waits for an AWS Glue Job to reach any of the status below. + 'FAILED', 'STOPPED', 'SUCCEEDED' .. seealso:: diff --git a/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py b/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py index 6a4856b6c950b..2f5a00208422c 100644 --- a/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py +++ b/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py @@ -76,7 +76,7 @@ def __init__( self.database_name = database_name def poke(self, context: Context): - """Checks for existence of the partition in the AWS Glue Catalog table""" + """Checks for existence of the partition in the AWS Glue Catalog table.""" if "." 
in self.table_name: self.database_name, self.table_name = self.table_name.split(".") self.log.info( @@ -87,7 +87,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> GlueCatalogHook: - """Gets the GlueCatalogHook""" + """Gets the GlueCatalogHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/glue_crawler.py b/airflow/providers/amazon/aws/sensors/glue_crawler.py index b830fbffa16ec..4b3320b0f44d7 100644 --- a/airflow/providers/amazon/aws/sensors/glue_crawler.py +++ b/airflow/providers/amazon/aws/sensors/glue_crawler.py @@ -32,7 +32,8 @@ class GlueCrawlerSensor(BaseSensorOperator): """ - Waits for an AWS Glue crawler to reach any of the statuses below + Waits for an AWS Glue crawler to reach any of the statuses below. + 'FAILED', 'CANCELLED', 'SUCCEEDED' .. seealso:: @@ -68,7 +69,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> GlueCrawlerHook: - """Returns a new or pre-existing GlueCrawlerHook""" + """Returns a new or pre-existing GlueCrawlerHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/rds.py b/airflow/providers/amazon/aws/sensors/rds.py index 45a48e965a35b..00d055eaf60c3 100644 --- a/airflow/providers/amazon/aws/sensors/rds.py +++ b/airflow/providers/amazon/aws/sensors/rds.py @@ -29,7 +29,7 @@ class RdsBaseSensor(BaseSensorOperator): - """Base operator that implements common functions for all sensors""" + """Base operator that implements common functions for all sensors.""" ui_color = "#ddbb77" ui_fgcolor = "#ffffff" @@ -140,7 +140,7 @@ def poke(self, context: Context): class RdsDbSensor(RdsBaseSensor): """ - Waits for an RDS instance or cluster to enter one of a number of states + Waits for an RDS instance or cluster to enter one of a number of states. .. seealso:: For more information on how to use this sensor, take a look at the guide: diff --git a/airflow/providers/amazon/aws/sensors/redshift_cluster.py b/airflow/providers/amazon/aws/sensors/redshift_cluster.py index 9734e98bc75b3..2faa10e3bd15b 100644 --- a/airflow/providers/amazon/aws/sensors/redshift_cluster.py +++ b/airflow/providers/amazon/aws/sensors/redshift_cluster.py @@ -67,7 +67,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> RedshiftHook: - """Create and return a RedshiftHook""" + """Create and return a RedshiftHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py index 5d18b4619baee..7e46b6c91a8f4 100644 --- a/airflow/providers/amazon/aws/sensors/s3.py +++ b/airflow/providers/amazon/aws/sensors/s3.py @@ -133,7 +133,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> S3Hook: - """Create and return an S3Hook""" + """Create and return an S3Hook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/sqs.py b/airflow/providers/amazon/aws/sensors/sqs.py index be6698dcac334..f5bd0b385ca7a 100644 --- a/airflow/providers/amazon/aws/sensors/sqs.py +++ b/airflow/providers/amazon/aws/sensors/sqs.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
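A recurring pattern in the sensor hunks above and below: the old get_hook() helpers are deprecated in favour of a cached hook property, with get_hook() kept only as a thin proxy for backwards compatibility. A minimal sketch of that pattern follows; the class name is a stand-in, not a real sensor.

from functools import cached_property

from deprecated import deprecated

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class ExampleS3Sensor:
    """Stand-in class showing the deprecated get_hook -> cached hook property pattern."""

    aws_conn_id = "aws_default"

    @deprecated(reason="use `hook` property instead.")
    def get_hook(self) -> S3Hook:
        """Create and return an S3Hook."""
        return self.hook

    @cached_property
    def hook(self) -> S3Hook:
        """Create and return an S3Hook, cached after the first access."""
        return S3Hook(aws_conn_id=self.aws_conn_id)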
-"""Reads and then deletes the message from SQS queue""" +"""Reads and then deletes the message from SQS queue.""" from __future__ import annotations import json @@ -184,7 +184,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> SqsHook: - """Create and return an SqsHook""" + """Create and return an SqsHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/sensors/step_function.py b/airflow/providers/amazon/aws/sensors/step_function.py index e3345b4337312..ec6ee2b42271a 100644 --- a/airflow/providers/amazon/aws/sensors/step_function.py +++ b/airflow/providers/amazon/aws/sensors/step_function.py @@ -89,7 +89,7 @@ def poke(self, context: Context): @deprecated(reason="use `hook` property instead.") def get_hook(self) -> StepFunctionHook: - """Create and return a StepFunctionHook""" + """Create and return a StepFunctionHook.""" return self.hook @cached_property diff --git a/airflow/providers/amazon/aws/transfers/base.py b/airflow/providers/amazon/aws/transfers/base.py index b8ebfb55eb874..2a58c4844962a 100644 --- a/airflow/providers/amazon/aws/transfers/base.py +++ b/airflow/providers/amazon/aws/transfers/base.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains base AWS to AWS transfer operator""" +"""This module contains base AWS to AWS transfer operator.""" from __future__ import annotations import warnings @@ -33,7 +33,7 @@ class AwsToAwsBaseOperator(BaseOperator): """ - Base class for AWS to AWS transfer operators + Base class for AWS to AWS transfer operators. :param source_aws_conn_id: The Airflow connection used for AWS credentials to access DynamoDB. If this is None or empty then the default boto3 diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py index d7930222155c1..d2967235c1d17 100644 --- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py @@ -41,7 +41,7 @@ class JSONEncoder(json.JSONEncoder): - """Custom json encoder implementation""" + """Custom json encoder implementation.""" def default(self, obj): """Convert decimal objects in a json serializable format.""" @@ -136,7 +136,7 @@ def __init__( @cached_property def hook(self): - """Create DynamoDBHook""" + """Create DynamoDBHook.""" return DynamoDBHook(aws_conn_id=self.source_aws_conn_id) def execute(self, context: Context) -> None: diff --git a/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py b/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py index 3814d3f44ff57..60ee579025737 100644 --- a/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py +++ b/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py @@ -30,7 +30,7 @@ class GlacierToGCSOperator(BaseOperator): """ - Transfers data from Amazon Glacier to Google Cloud Storage + Transfers data from Amazon Glacier to Google Cloud Storage. .. note:: Please be warn that GlacierToGCSOperator may depends on memory usage. 
diff --git a/airflow/providers/amazon/aws/transfers/mongo_to_s3.py b/airflow/providers/amazon/aws/transfers/mongo_to_s3.py index 64390d9e351ab..d7432c395902d 100644 --- a/airflow/providers/amazon/aws/transfers/mongo_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/mongo_to_s3.py @@ -93,7 +93,7 @@ def __init__( self.compression = compression def execute(self, context: Context): - """Is written to depend on transform method""" + """Execute the transfer, relying on the transform method.""" s3_conn = S3Hook(self.aws_conn_id) # Grab collection and execute query according to whether or not it is a pipeline @@ -129,7 +129,7 @@ def execute(self, context: Context): def _stringify(iterable: Iterable, joinable: str = "\n") -> str: """ Takes an iterable (pymongo Cursor or Array) containing dictionaries and - returns a stringified version using python join + returns a stringified version using python join. """ return joinable.join([json.dumps(doc, default=json_util.default) for doc in iterable]) @@ -138,7 +138,7 @@ def transform(docs: Any) -> Any: """This method is meant to be extended by child classes to perform transformations unique to those operators needs. Processes pyMongo cursor and returns an iterable with each element being - a JSON serializable dictionary + a JSON serializable dictionary. Base transform() assumes no processing is needed ie. docs is a pyMongo cursor of documents and cursor just diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py index f573c73ca4327..0d2a059f6e727 100644 --- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py +++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py @@ -34,7 +34,7 @@ class S3ToRedshiftOperator(BaseOperator): """ - Executes an COPY command to load files from s3 to Redshift + Executes a COPY command to load files from S3 to Redshift. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/transfers/s3_to_sql.py b/airflow/providers/amazon/aws/transfers/s3_to_sql.py index e52ebbaa1bc68..916ba183d7d1b 100644 --- a/airflow/providers/amazon/aws/transfers/s3_to_sql.py +++ b/airflow/providers/amazon/aws/transfers/s3_to_sql.py @@ -33,7 +33,7 @@ class S3ToSqlOperator(BaseOperator): """ Loads Data from S3 into a SQL Database. You need to provide a parser function that takes a filename as an input - and returns an iterable of rows + and returns an iterable of rows. ..
seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/transfers/sql_to_s3.py b/airflow/providers/amazon/aws/transfers/sql_to_s3.py index 04aafdd67c0f5..aa8382541cb61 100644 --- a/airflow/providers/amazon/aws/transfers/sql_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/sql_to_s3.py @@ -188,7 +188,7 @@ def execute(self, context: Context) -> None: ) def _partition_dataframe(self, df: DataFrame) -> Iterable[tuple[str, DataFrame]]: - """Partition dataframe using pandas groupby() method""" + """Partition dataframe using pandas groupby() method.""" if not self.groupby_kwargs: yield "", df else: diff --git a/airflow/providers/amazon/aws/triggers/redshift_cluster.py b/airflow/providers/amazon/aws/triggers/redshift_cluster.py index fecfda91da497..b3770bd395c6c 100644 --- a/airflow/providers/amazon/aws/triggers/redshift_cluster.py +++ b/airflow/providers/amazon/aws/triggers/redshift_cluster.py @@ -29,6 +29,7 @@ class RedshiftCreateClusterTrigger(BaseTrigger): """ Trigger for RedshiftCreateClusterOperator. + The trigger will asynchronously poll the boto3 API and wait for the Redshift cluster to be in the `available` state. @@ -80,6 +81,7 @@ async def run(self): class RedshiftPauseClusterTrigger(BaseTrigger): """ Trigger for RedshiftPauseClusterOperator. + The trigger will asynchronously poll the boto3 API and wait for the Redshift cluster to be in the `paused` state. @@ -150,6 +152,7 @@ async def run(self): class RedshiftCreateClusterSnapshotTrigger(BaseTrigger): """ Trigger for RedshiftCreateClusterSnapshotOperator. + The trigger will asynchronously poll the boto3 API and wait for the Redshift cluster snapshot to be in the `available` state. @@ -225,6 +228,7 @@ async def run(self): class RedshiftResumeClusterTrigger(BaseTrigger): """ Trigger for RedshiftResumeClusterOperator. + The trigger will asynchronously poll the boto3 API and wait for the Redshift cluster to be in the `available` state. @@ -296,7 +300,7 @@ async def run(self): class RedshiftDeleteClusterTrigger(BaseTrigger): """ - Trigger for RedshiftDeleteClusterOperator + Trigger for RedshiftDeleteClusterOperator. :param cluster_identifier: A unique identifier for the cluster. :param max_attempts: The maximum number of attempts to be made. diff --git a/airflow/providers/amazon/aws/utils/connection_wrapper.py b/airflow/providers/amazon/aws/utils/connection_wrapper.py index bf5c43b8049c5..1bbc873e9c646 100644 --- a/airflow/providers/amazon/aws/utils/connection_wrapper.py +++ b/airflow/providers/amazon/aws/utils/connection_wrapper.py @@ -429,7 +429,7 @@ def _parse_s3_config( ) -> tuple[str | None, str | None]: """ Parses a config file for s3 credentials. Can currently - parse boto, s3cmd.conf and AWS SDK config formats + parse boto, s3cmd.conf and AWS SDK config formats. :param config_file_name: path to the config file :param config_format: config type. One of "boto", "s3cmd" or "aws". diff --git a/airflow/providers/amazon/aws/utils/emailer.py b/airflow/providers/amazon/aws/utils/emailer.py index 3e00abc78a08c..5a5cdd15c8bc0 100644 --- a/airflow/providers/amazon/aws/utils/emailer.py +++ b/airflow/providers/amazon/aws/utils/emailer.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
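The sql_to_s3 hunk above documents _partition_dataframe, which partitions the query result with pandas groupby() and yields the whole frame when no grouping is configured. A rough, self-contained sketch of that behaviour; the function and column names here are illustrative, not the provider's code.

from __future__ import annotations

from typing import Iterable

import pandas as pd


def partition_dataframe(df: pd.DataFrame, groupby_kwargs: dict | None) -> Iterable[tuple[str, pd.DataFrame]]:
    """Yield (label, frame) pairs: the whole frame, or one frame per group."""
    if not groupby_kwargs:
        yield "", df
        return
    for label, group in df.groupby(**groupby_kwargs):
        yield str(label), group.reset_index(drop=True)


df = pd.DataFrame({"customer": ["a", "a", "b"], "amount": [1, 2, 3]})
for label, chunk in partition_dataframe(df, {"by": "customer"}):
    print(label, len(chunk))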
-"""Airflow module for email backend using AWS SES""" +"""Airflow module for email backend using AWS SES.""" from __future__ import annotations from typing import Any diff --git a/airflow/providers/amazon/aws/utils/rds.py b/airflow/providers/amazon/aws/utils/rds.py index 873f2cf83ecf0..1ba511c4b4e9a 100644 --- a/airflow/providers/amazon/aws/utils/rds.py +++ b/airflow/providers/amazon/aws/utils/rds.py @@ -20,7 +20,7 @@ class RdsDbType(Enum): - """Only available types for the RDS""" + """Only available types for the RDS.""" INSTANCE: str = "instance" CLUSTER: str = "cluster" diff --git a/airflow/providers/amazon/aws/utils/redshift.py b/airflow/providers/amazon/aws/utils/redshift.py index d931cb047430d..1ef490422d2d0 100644 --- a/airflow/providers/amazon/aws/utils/redshift.py +++ b/airflow/providers/amazon/aws/utils/redshift.py @@ -26,7 +26,8 @@ def build_credentials_block(credentials: ReadOnlyCredentials) -> str: """ Generate AWS credentials block for Redshift COPY and UNLOAD - commands, as noted in AWS docs + commands, as noted in AWS docs. + https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html#copy-credentials :param credentials: ReadOnlyCredentials object from `botocore`