Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions providers/amazon/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ dependencies = [
"common.messaging" = [
"apache-airflow-providers-common-messaging>=2.0.0"
]
"sqlalchemy" = [
"sqlalchemy>=1.4.49",
]

[dependency-groups]
dev = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
from typing import TYPE_CHECKING, Any

import pyathena
from sqlalchemy.engine.url import URL

try:
from sqlalchemy.engine.url import URL
except ImportError:
URL = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@potiuk Please note before approving similar PRs that leaving URL untyped (i.e. URL: Any before the try block) or unignored (# type: ignore[assignment,misc]) leads to mypy violations on SQLA2:

providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py:30: error: Cannot assign to a type  [misc]
        URL = create_engine = None
        ^~~
providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py:30: error: Incompatible types in assignment (expression has
type "None", variable has type "type[URL]")  [assignment]
        URL = create_engine = None
        ^
providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py:30: error: Incompatible types in assignment (expression has
type "None", variable has type overloaded function)  [assignment]
        URL = create_engine = None
                              ^~~~
providers/amazon/src/airflow/providers/amazon/aws/hooks/athena_sql.py:28: error: Cannot assign to a type  [misc]
        URL = None
        ^~~
providers/amazon/src/airflow/providers/amazon/aws/hooks/athena_sql.py:28: error: Incompatible types in assignment (expression has type
"None", variable has type "type[URL]")  [assignment]
        URL = None
              ^~~~

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. That's unfortunate indeed


from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper
Expand Down Expand Up @@ -152,9 +156,15 @@ def _get_conn_params(self) -> dict[str, str | None]:

def get_uri(self) -> str:
"""Overridden to use the Athena dialect as driver name."""
from airflow.exceptions import AirflowOptionalProviderFeatureException

if URL is None:
raise AirflowOptionalProviderFeatureException(
"sqlalchemy is required to generate the connection URI. "
"Install it with: pip install 'apache-airflow-providers-amazon[sqlalchemy]'"
)
conn_params = self._get_conn_params()
creds = self.get_credentials(region_name=conn_params["region_name"])

return URL.create(
f"awsathena+{conn_params['driver']}",
username=creds.access_key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@
import redshift_connector
import tenacity
from redshift_connector import Connection as RedshiftConnection, InterfaceError, OperationalError
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL

try:
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
except ImportError:
URL = create_engine = None

from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.common.compat.sdk import AirflowException
from airflow.providers.common.sql.hooks.sql import DbApiHook
Expand Down Expand Up @@ -151,6 +156,11 @@ def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:

def get_uri(self) -> str:
"""Overridden to use the Redshift dialect as driver name."""
if URL is None:
raise AirflowOptionalProviderFeatureException(
"sqlalchemy is required to generate the connection URI. "
"Install it with: pip install 'apache-airflow-providers-amazon[sqlalchemy]'"
)
conn_params = self._get_conn_params()

if "user" in conn_params:
Expand All @@ -174,6 +184,11 @@ def get_uri(self) -> str:

def get_sqlalchemy_engine(self, engine_kwargs=None):
"""Overridden to pass Redshift-specific arguments."""
if create_engine is None:
raise AirflowOptionalProviderFeatureException(
"sqlalchemy is required for creating the engine. Install it with"
": pip install 'apache-airflow-providers-amazon[sqlalchemy]'"
)
conn_kwargs = self.conn.extra_dejson
if engine_kwargs is None:
engine_kwargs = {}
Expand Down