Merged
2 changes: 1 addition & 1 deletion  providers/amazon/README.rst

@@ -55,7 +55,7 @@ PIP package                                 Version required
 ==========================================  =====================
 ``apache-airflow``                          ``>=2.10.0``
 ``apache-airflow-providers-common-compat``  ``>=1.6.1``
-``apache-airflow-providers-common-sql``     ``>=1.20.0``
+``apache-airflow-providers-common-sql``     ``>=1.27.0``
 ``apache-airflow-providers-http``
 ``boto3``                                   ``>=1.37.0``
 ``botocore``                                ``>=1.37.0``
10 changes: 2 additions & 8 deletions  providers/amazon/pyproject.toml

@@ -59,7 +59,7 @@ requires-python = "~=3.9"
 dependencies = [
     "apache-airflow>=2.10.0",
     "apache-airflow-providers-common-compat>=1.6.1",
-    "apache-airflow-providers-common-sql>=1.20.0",
+    "apache-airflow-providers-common-sql>=1.27.0",
     "apache-airflow-providers-http",
     # We should update the minimum version of boto3 here regularly to avoid `pip` backtracking with the number
     # of candidates to consider. Make sure to configure the boto3 version here as well as in all the tools below
@@ -88,13 +88,6 @@ dependencies = [
 # The optional dependencies should be modified in place in the generated file
 # Any change in the dependencies is preserved when the file is regenerated
 [project.optional-dependencies]
-"pandas" = [
-    # In pandas 2.2 the minimal version of sqlalchemy is 2.0
-    # https://pandas.pydata.org/docs/whatsnew/v2.2.0.html#increased-minimum-versions-for-dependencies
-    # However, Airflow does not fully support it yet: https://github.com/apache/airflow/issues/28723
-    # In addition, FAB also limits sqlalchemy to < 2.0
-    "pandas>=2.1.2,<2.2",
-]
 # There is a conflict between boto3's and aiobotocore's botocore dependency.
 # TODO: We can remove it once boto3 and aiobotocore both have a compatible botocore version, or
 # boto3 has native async support and we move away from aiobotocore
@@ -187,6 +180,7 @@ dev = [
     "openapi-spec-validator>=0.7.1",
     "opensearch-py>=2.2.0",
     "responses>=0.25.0",
+    "apache-airflow-providers-common-sql[pandas,polars]",
 ]

 # To build docs:
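Aside, not part of the diff: dropping the provider-level "pandas" extra means the Amazon provider no longer pins pandas itself; judging by the dev dependency added above, DataFrame support is now expected to arrive through the common-sql provider's own extras, e.g. apache-airflow-providers-common-sql[pandas,polars] (an inference from this diff, not a documented guarantee). A small Python sketch to check at runtime which optional backends are importable:

    import importlib.util

    # "pandas" and "polars" mirror the extras on apache-airflow-providers-common-sql[pandas,polars]
    for backend in ("pandas", "polars"):
        found = importlib.util.find_spec(backend) is not None
        print(f"{backend}: {'available' if found else 'missing'}")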
@@ -93,7 +93,7 @@ def execute(self, context: Context):
         self.log.info("Extracting data from Hive")
         self.log.info(self.sql)

-        data = hive.get_pandas_df(self.sql, schema=self.schema)
+        data = hive.get_df(self.sql, schema=self.schema, df_type="pandas")
         dynamodb = DynamoDBHook(
             aws_conn_id=self.aws_conn_id,
             table_name=self.table_name,
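Aside: the change above switches the Hive-to-DynamoDB transfer from the pandas-only get_pandas_df to the generic get_df API, which is why the common-sql floor is raised to >=1.27.0 earlier in this PR. A minimal usage sketch, assuming a configured HiveServer2 connection; the connection id and query are placeholders:

    from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook

    hook = HiveServer2Hook(hiveserver2_conn_id="hiveserver2_default")  # placeholder connection id

    # df_type selects the DataFrame backend; "pandas" preserves the old get_pandas_df behavior.
    df = hook.get_df("SELECT id, name FROM users", schema="default", df_type="pandas")  # placeholder query
    print(df.head())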
@@ -187,7 +187,7 @@ def _fix_dtypes(df: pd.DataFrame, file_format: FILE_FORMAT) -> None:
     def execute(self, context: Context) -> None:
         sql_hook = self._get_hook()
         s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        data_df = sql_hook.get_pandas_df(sql=self.query, parameters=self.parameters)
+        data_df = sql_hook.get_df(sql=self.query, parameters=self.parameters, df_type="pandas")
         self.log.info("Data from SQL obtained")
         self._fix_dtypes(data_df, self.file_format)
         file_options = FILE_OPTIONS_MAP[self.file_format]
@@ -233,8 +233,6 @@ def _get_hook(self) -> DbApiHook:
         self.log.debug("Get connection for %s", self.sql_conn_id)
         conn = BaseHook.get_connection(self.sql_conn_id)
         hook = conn.get_hook(hook_params=self.sql_hook_params)
-        if not callable(getattr(hook, "get_pandas_df", None)):
-            raise AirflowException(
-                "This hook is not supported. The hook class must have get_pandas_df method."
-            )
+        if not callable(getattr(hook, "get_df", None)):
+            raise AirflowException("This hook is not supported. The hook class must have get_df method.")
         return hook
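Aside: _get_hook duck-types the resolved hook rather than isinstance-checking a concrete class, and the rename updates that guard accordingly. A self-contained sketch of the same pattern; the class and function names here are illustrative, not from the provider:

    from unittest import mock

    class LegacyHook:
        """Hypothetical hook predating the get_df API."""

    def supports_dataframes(hook) -> bool:
        # Same check as above: the attribute must exist and be callable.
        return callable(getattr(hook, "get_df", None))

    assert supports_dataframes(mock.Mock())       # a Mock fabricates get_df on attribute access
    assert not supports_dataframes(LegacyHook())  # no get_df attribute -> rejected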
@@ -51,11 +51,11 @@ def test_get_conn_returns_a_boto3_connection(self):
         assert hook.get_conn() is not None

     @mock.patch(
-        "airflow.providers.apache.hive.hooks.hive.HiveServer2Hook.get_pandas_df",
+        "airflow.providers.apache.hive.hooks.hive.HiveServer2Hook.get_df",
         return_value=pd.DataFrame(data=[("1", "sid")], columns=["id", "name"]),
     )
     @mock_aws
-    def test_get_records_with_schema(self, mock_get_pandas_df):
+    def test_get_records_with_schema(self, mock_get_df):
         # this table needs to be created in production
         self.hook.get_conn().create_table(
             TableName="test_airflow",
@@ -81,11 +81,11 @@ def test_get_records_with_schema(self, mock_get_pandas_df):
         assert table.item_count == 1

     @mock.patch(
-        "airflow.providers.apache.hive.hooks.hive.HiveServer2Hook.get_pandas_df",
+        "airflow.providers.apache.hive.hooks.hive.HiveServer2Hook.get_df",
         return_value=pd.DataFrame(data=[("1", "sid"), ("1", "gupta")], columns=["id", "name"]),
     )
     @mock_aws
-    def test_pre_process_records_with_schema(self, mock_get_pandas_df):
+    def test_pre_process_records_with_schema(self, mock_get_df):
         # this table needs to be created in production
         self.hook.get_conn().create_table(
             TableName="test_airflow",
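Aside: these tests patch the renamed method at its import location so no live Hive connection is needed. A condensed, runnable sketch of the same pattern (the canned rows mirror the test data above):

    from unittest import mock

    import pandas as pd

    with mock.patch(
        "airflow.providers.apache.hive.hooks.hive.HiveServer2Hook.get_df",
        return_value=pd.DataFrame(data=[("1", "sid")], columns=["id", "name"]),
    ) as mock_get_df:
        from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook

        df = HiveServer2Hook().get_df("SELECT 1")  # returns the canned frame, no Hive needed
        assert list(df.columns) == ["id", "name"]
        mock_get_df.assert_called_once_with("SELECT 1")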
@@ -38,8 +38,8 @@ def test_execute_csv(self, mock_s3_hook, temp_mock):

         mock_dbapi_hook = mock.Mock()
         test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
-        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
-        get_pandas_df_mock.return_value = test_df
+        get_df_mock = mock_dbapi_hook.return_value.get_df
+        get_df_mock.return_value = test_df
         with NamedTemporaryFile() as f:
             temp_mock.return_value.__enter__.return_value.name = f.name

@@ -58,7 +58,7 @@ def test_execute_csv(self, mock_s3_hook, temp_mock):
             op.execute(None)
         mock_s3_hook.assert_called_once_with(aws_conn_id="aws_conn_id", verify=None)

-        get_pandas_df_mock.assert_called_once_with(sql=query, parameters=None)
+        get_df_mock.assert_called_once_with(sql=query, parameters=None, df_type="pandas")

         temp_mock.assert_called_once_with(mode="r+", suffix=".csv")
         mock_s3_hook.return_value.load_file.assert_called_once_with(
@@ -78,8 +78,8 @@ def test_execute_parquet(self, mock_s3_hook, temp_mock):
         mock_dbapi_hook = mock.Mock()

         test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
-        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
-        get_pandas_df_mock.return_value = test_df
+        get_df_mock = mock_dbapi_hook.return_value.get_df
+        get_df_mock.return_value = test_df
         with NamedTemporaryFile() as f:
             temp_mock.return_value.__enter__.return_value.name = f.name

@@ -98,7 +98,7 @@ def test_execute_parquet(self, mock_s3_hook, temp_mock):
             op.execute(None)
         mock_s3_hook.assert_called_once_with(aws_conn_id="aws_conn_id", verify=None)

-        get_pandas_df_mock.assert_called_once_with(sql=query, parameters=None)
+        get_df_mock.assert_called_once_with(sql=query, parameters=None, df_type="pandas")

         temp_mock.assert_called_once_with(mode="rb+", suffix=".parquet")
         mock_s3_hook.return_value.load_file.assert_called_once_with(
@@ -114,8 +114,8 @@ def test_execute_json(self, mock_s3_hook, temp_mock):

         mock_dbapi_hook = mock.Mock()
         test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
-        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
-        get_pandas_df_mock.return_value = test_df
+        get_df_mock = mock_dbapi_hook.return_value.get_df
+        get_df_mock.return_value = test_df
         with NamedTemporaryFile() as f:
             temp_mock.return_value.__enter__.return_value.name = f.name

@@ -135,7 +135,7 @@ def test_execute_json(self, mock_s3_hook, temp_mock):
             op.execute(None)
         mock_s3_hook.assert_called_once_with(aws_conn_id="aws_conn_id", verify=None)

-        get_pandas_df_mock.assert_called_once_with(sql=query, parameters=None)
+        get_df_mock.assert_called_once_with(sql=query, parameters=None, df_type="pandas")

         temp_mock.assert_called_once_with(mode="r+", suffix=".json")
         mock_s3_hook.return_value.load_file.assert_called_once_with(
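Aside: for orientation, a hedged usage sketch of the operator these tests exercise; the connection ids, query, and bucket are placeholders, and file_format selects the csv/parquet/json branches asserted above:

    from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator

    transfer = SqlToS3Operator(
        task_id="sql_to_s3",
        sql_conn_id="my_sql_conn",       # placeholder: any connection whose hook exposes get_df
        aws_conn_id="aws_default",
        query="SELECT * FROM my_table",  # placeholder query
        s3_bucket="my-bucket",           # placeholder bucket
        s3_key="exports/my_table.csv",
        file_format="csv",
    )
    # At execute() time the operator fetches rows via hook.get_df(sql=..., parameters=None,
    # df_type="pandas") and uploads the temp file with S3Hook.load_file, per the assertions above.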