diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py index 6a80e20b7a81e..3f151318a3ff0 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py @@ -523,7 +523,7 @@ def _list_all(self, api_call: Callable, response_key: str, verbose: bool) -> lis :return: A List of the combined results of the provided API call. """ name_collection: list = [] - token = DEFAULT_PAGINATION_TOKEN + token: str | None = DEFAULT_PAGINATION_TOKEN while token is not None: response = api_call(nextToken=token) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/athena.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/athena.py index 44eb6590aeb83..537446d8dde6e 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/athena.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/athena.py @@ -239,7 +239,7 @@ def get_openlineage_facets_on_complete(self, _) -> OperatorLineage: run_facets: dict[str, BaseFacet] = {} if parse_result.errors: run_facets["extractionError"] = ExtractionErrorRunFacet( - totalTasks=len(self.query) if isinstance(self.query, list) else 1, + totalTasks=1, failedTasks=len(parse_result.errors), errors=[ Error( diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py index 23bcb0e17b28a..9fcf45fc30810 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py @@ -86,7 +86,7 @@ def __init__(self, *, config: dict, **kwargs): super().__init__(**kwargs) self.config = config - def parse_integer(self, config: dict, field: list[str] | str) -> None: + def parse_integer(self, config: dict | list, field: list[str] | str) -> None: """Recursive method for parsing string fields holding integer values to integers.""" if len(field) == 1: if isinstance(config, list): @@ -993,7 +993,7 @@ def execute(self, context: Context) -> dict: ) if response["ResponseMetadata"]["HTTPStatusCode"] != 200: raise AirflowException(f"Sagemaker Tuning Job creation failed: {response}") - + description: dict = {} if self.deferrable: self.defer( trigger=SageMakerTrigger( @@ -1009,7 +1009,6 @@ def execute(self, context: Context) -> dict: else None ), ) - description = {} # never executed but makes static checkers happy elif self.wait_for_completion: description = self.hook.check_status( self.config["HyperParameterTuningJobName"], diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py index 1f3f79d86f0ce..e31652af4d4aa 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py @@ -161,7 +161,7 @@ def _fix_dtypes(df: pd.DataFrame, file_format: FILE_FORMAT) -> None: raise AirflowOptionalProviderFeatureException(e) for col in df: - if df[col].dtype.name == "object" and file_format == "parquet": + if df[col].dtype.name == "object" and file_format == FILE_FORMAT.PARQUET: # if the type wasn't identified or converted, change it to a string so if can still be # processed. df[col] = df[col].astype(str) diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_sql_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_sql_to_s3.py index cb1d5828df4e1..6174582915700 100644 --- a/providers/amazon/tests/unit/amazon/aws/transfers/test_sql_to_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_sql_to_s3.py @@ -185,9 +185,10 @@ def test_fix_dtypes(self, params): s3_key="s3_key", task_id="task_id", sql_conn_id="mysql_conn_id", + file_format=params["file_format"], ) dirty_df = pd.DataFrame({"strings": ["a", "b", None], "ints": [1, 2, None]}) - op._fix_dtypes(df=dirty_df, file_format=params["file_format"]) + op._fix_dtypes(df=dirty_df, file_format=op.file_format) assert dirty_df["strings"].values[2] == params["null_string_result"] assert dirty_df["ints"].dtype.kind == "i"