From 5134237172b9fca7a0bf29dded9938a28c05949a Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Sat, 12 Apr 2025 01:37:51 +0800 Subject: [PATCH 01/11] feat: `add get_df` and `get_polar_df` --- docs/spelling_wordlist.txt | 1 + providers/common/sql/pyproject.toml | 4 + .../airflow/providers/common/sql/hooks/sql.py | 85 +++++++++++++++++++ 3 files changed, 90 insertions(+) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index afc4a159ce315..ddb7fdfdb51ba 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1305,6 +1305,7 @@ podName PodSpec podSpec podspec +polars poller polyfill pooler diff --git a/providers/common/sql/pyproject.toml b/providers/common/sql/pyproject.toml index 1e458271ab386..78df7c0eac931 100644 --- a/providers/common/sql/pyproject.toml +++ b/providers/common/sql/pyproject.toml @@ -78,6 +78,9 @@ dependencies = [ "openlineage" = [ "apache-airflow-providers-openlineage" ] +"polars" = [ + "polars>=1.26.0" +] [dependency-groups] dev = [ @@ -87,6 +90,7 @@ dev = [ "apache-airflow-providers-openlineage", # Additional devel dependencies (do not remove this line and add extra development dependencies) "apache-airflow-providers-common-sql[pandas]", + "apache-airflow-providers-common-sql[polars]", "apache-airflow-providers-mysql", "apache-airflow-providers-postgres", "apache-airflow-providers-odbc", diff --git a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py index 2804038e87b42..5f34c157f6d51 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py @@ -34,16 +34,19 @@ from urllib.parse import urlparse import sqlparse +from deprecated import deprecated from methodtools import lru_cache from more_itertools import chunked from sqlalchemy import create_engine from sqlalchemy.engine import Inspector, make_url from sqlalchemy.exc import 
ArgumentError, NoSuchModuleError +from typing_extensions import Literal from airflow.configuration import conf from airflow.exceptions import ( AirflowException, AirflowOptionalProviderFeatureException, + AirflowProviderDeprecationWarning, ) from airflow.hooks.base import BaseHook from airflow.providers.common.sql.dialects.dialect import Dialect @@ -52,6 +55,7 @@ if TYPE_CHECKING: from pandas import DataFrame + from polars import DataFrame as PolarsDataFrame from sqlalchemy.engine import URL from airflow.models import Connection @@ -375,6 +379,56 @@ def get_reserved_words(self, dialect_name: str) -> set[str]: self.log.debug("reserved words for '%s': %s", dialect_name, result) return result + def get_df( + self, + sql, + parameters: list | tuple | Mapping[str, Any] | None = None, + *, + df_type: Literal["pandas", "polars"] = "pandas", + **kwargs, + ) -> DataFrame | PolarsDataFrame: + """ + Execute the sql and returns a dataframe. + + :param sql: the sql statement to be executed (str) or a list of sql statements to execute + :param parameters: The parameters to render the SQL query with. + :param df_type: Type of dataframe to return, either "pandas" or "polars" + :param kwargs: (optional) passed into `pandas.io.sql.read_sql` or `polars.read_database` method + """ + if df_type == "pandas": + return self._get_pandas_df(sql, parameters, **kwargs) + elif df_type == "polars": + return self._get_polars_df(sql, parameters, **kwargs) + + def _get_pandas_df( + self, + sql, + parameters: list | tuple | Mapping[str, Any] | None = None, + **kwargs, + ) -> DataFrame: + """ + Execute the sql and returns a pandas dataframe. + + :param sql: the sql statement to be executed (str) or a list of sql statements to execute + :param parameters: The parameters to render the SQL query with. 
+ :param kwargs: (optional) passed into pandas.io.sql.read_sql method + """ + try: + from pandas.io import sql as psql + except ImportError: + raise AirflowOptionalProviderFeatureException( + "pandas library not installed, run: pip install " + "'apache-airflow-providers-common-sql[pandas]'." + ) + + with closing(self.get_conn()) as conn: + return psql.read_sql(sql, con=conn, params=parameters, **kwargs) + + @deprecated( + reason="Replaced by function `get_df`.", + category=AirflowProviderDeprecationWarning, + action="ignore", + ) def get_pandas_df( self, sql, @@ -426,6 +480,37 @@ def get_pandas_df_by_chunks( with closing(self.get_conn()) as conn: yield from psql.read_sql(sql, con=conn, params=parameters, chunksize=chunksize, **kwargs) + def _get_polars_df( + self, + sql, + parameters: list | tuple | Mapping[str, Any] | None = None, + **kwargs, + ) -> PolarsDataFrame: + """ + Execute the sql and returns a polars dataframe. + + :param sql: the sql statement to be executed (str) or a list of sql statements to execute + :param parameters: The parameters to render the SQL query with. + :param kwargs: (optional) passed into polars.read_database method + """ + try: + import polars as pl + except ImportError: + raise AirflowOptionalProviderFeatureException( + "polars library not installed, run: pip install " + "'apache-airflow-providers-common-sql[polars]'." 
+ ) + + with closing(self.get_conn()) as conn: + execute_options: dict[str, Any] | None = None + if parameters is not None: + if isinstance(parameters, Mapping): + execute_options = dict(parameters) + else: + execute_options = {} + + return pl.read_database(sql, connection=conn, execute_options=execute_options, **kwargs) + def get_records( self, sql: str | list[str], From 2c9ce3a0d4b9f6c19164708f0ecc1e5e251f247a Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Sat, 12 Apr 2025 03:07:55 +0800 Subject: [PATCH 02/11] refactor: prevent code duplication Co-Authored-By: Elad Kalif <45845474+eladkal@users.noreply.github.com> --- .../sql/src/airflow/providers/common/sql/hooks/sql.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py index 5f34c157f6d51..24b76502a6b90 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py @@ -442,16 +442,7 @@ def get_pandas_df( :param parameters: The parameters to render the SQL query with. :param kwargs: (optional) passed into pandas.io.sql.read_sql method """ - try: - from pandas.io import sql as psql - except ImportError: - raise AirflowOptionalProviderFeatureException( - "pandas library not installed, run: pip install " - "'apache-airflow-providers-common-sql[pandas]'." 
- ) - - with closing(self.get_conn()) as conn: - return psql.read_sql(sql, con=conn, params=parameters, **kwargs) + return self.get_df(sql, parameters, df_type="pandas", **kwargs) def get_pandas_df_by_chunks( self, From ea79bf7f01b0f0338b1b14cfe6a80f8afd9cc9c0 Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Sat, 12 Apr 2025 16:39:47 +0800 Subject: [PATCH 03/11] type: use _get_pandas_df --- .../common/sql/src/airflow/providers/common/sql/hooks/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py index 24b76502a6b90..eecb37e31dee2 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py @@ -442,7 +442,7 @@ def get_pandas_df( :param parameters: The parameters to render the SQL query with. :param kwargs: (optional) passed into pandas.io.sql.read_sql method """ - return self.get_df(sql, parameters, df_type="pandas", **kwargs) + return self._get_pandas_df(sql, parameters, **kwargs) def get_pandas_df_by_chunks( self, From 0f44019d800c2efca92fe5eceb0a6eae8ab0d3ec Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Mon, 14 Apr 2025 23:44:27 +0800 Subject: [PATCH 04/11] feat: `add get_df_by_chunks` --- .../airflow/providers/common/sql/hooks/sql.py | 119 ++++++++++++++---- 1 file changed, 95 insertions(+), 24 deletions(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py index eecb37e31dee2..0ed736c283f57 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py @@ -379,6 +379,41 @@ def get_reserved_words(self, dialect_name: str) -> set[str]: self.log.debug("reserved words for '%s': %s", dialect_name, result) return result + @deprecated( + 
reason="Replaced by function `get_df`.", + category=AirflowProviderDeprecationWarning, + action="ignore", + ) + def get_pandas_df( + self, + sql, + parameters: list | tuple | Mapping[str, Any] | None = None, + **kwargs, + ) -> DataFrame: + """ + Execute the sql and returns a pandas dataframe. + + :param sql: the sql statement to be executed (str) or a list of sql statements to execute + :param parameters: The parameters to render the SQL query with. + :param kwargs: (optional) passed into pandas.io.sql.read_sql method + """ + return self._get_pandas_df(sql, parameters, **kwargs) + + @deprecated( + reason="Replaced by function `get_df_by_chunks`.", + category=AirflowProviderDeprecationWarning, + action="ignore", + ) + def get_pandas_df_by_chunks( + self, + sql, + parameters: list | tuple | Mapping[str, Any] | None = None, + *, + chunksize: int, + **kwargs, + ) -> Generator[DataFrame, None, None]: + return self._get_pandas_df_by_chunks(sql, parameters, chunksize=chunksize, **kwargs) + def get_df( self, sql, @@ -424,27 +459,61 @@ def _get_pandas_df( with closing(self.get_conn()) as conn: return psql.read_sql(sql, con=conn, params=parameters, **kwargs) - @deprecated( - reason="Replaced by function `get_df`.", - category=AirflowProviderDeprecationWarning, - action="ignore", - ) - def get_pandas_df( + def _get_polars_df( self, sql, parameters: list | tuple | Mapping[str, Any] | None = None, **kwargs, - ) -> DataFrame: + ) -> PolarsDataFrame: """ - Execute the sql and returns a pandas dataframe. + Execute the sql and returns a polars dataframe. :param sql: the sql statement to be executed (str) or a list of sql statements to execute :param parameters: The parameters to render the SQL query with. 
- :param kwargs: (optional) passed into pandas.io.sql.read_sql method + :param kwargs: (optional) passed into polars.read_database method """ - return self._get_pandas_df(sql, parameters, **kwargs) + try: + import polars as pl + except ImportError: + raise AirflowOptionalProviderFeatureException( + "polars library not installed, run: pip install " + "'apache-airflow-providers-common-sql[polars]'." + ) - def get_pandas_df_by_chunks( + with closing(self.get_conn()) as conn: + execute_options: dict[str, Any] | None = None + if parameters is not None: + if isinstance(parameters, Mapping): + execute_options = dict(parameters) + else: + execute_options = {} + + return pl.read_database(sql, connection=conn, execute_options=execute_options, **kwargs) + + def get_df_by_chunks( + self, + sql, + parameters: list | tuple | Mapping[str, Any] | None = None, + *, + chunksize: int, + df_type: Literal["pandas", "polars"] = "pandas", + **kwargs, + ) -> Generator[DataFrame | PolarsDataFrame, None, None]: + """ + Execute the sql and return a generator. + + :param sql: the sql statement to be executed (str) or a list of sql statements to execute + :param parameters: The parameters to render the SQL query with + :param chunksize: number of rows to include in each chunk + :param df_type: Type of dataframe to return, either "pandas" or "polars" + :param kwargs: (optional) passed into `pandas.io.sql.read_sql` or `polars.read_database` method + """ + if df_type == "pandas": + return self._get_pandas_df_by_chunks(sql, parameters, chunksize=chunksize, **kwargs) + elif df_type == "polars": + return self._get_polars_df_by_chunks(sql, parameters, chunksize=chunksize, **kwargs) + + def _get_pandas_df_by_chunks( self, sql, parameters: list | tuple | Mapping[str, Any] | None = None, @@ -467,22 +536,24 @@ def get_pandas_df_by_chunks( "pandas library not installed, run: pip install " "'apache-airflow-providers-common-sql[pandas]'." 
) - with closing(self.get_conn()) as conn: yield from psql.read_sql(sql, con=conn, params=parameters, chunksize=chunksize, **kwargs) - def _get_polars_df( + def _get_polars_df_by_chunks( self, sql, parameters: list | tuple | Mapping[str, Any] | None = None, + *, + chunksize: int, **kwargs, - ) -> PolarsDataFrame: + ) -> Generator[PolarsDataFrame, None, None]: """ - Execute the sql and returns a polars dataframe. + Execute the sql and return a generator. :param sql: the sql statement to be executed (str) or a list of sql statements to execute :param parameters: The parameters to render the SQL query with. - :param kwargs: (optional) passed into polars.read_database method + :param chunksize: number of rows to include in each chunk + :param kwargs: (optional) passed into pandas.io.sql.read_sql method """ try: import polars as pl @@ -493,14 +564,14 @@ def _get_polars_df( ) with closing(self.get_conn()) as conn: - execute_options: dict[str, Any] | None = None - if parameters is not None: - if isinstance(parameters, Mapping): - execute_options = dict(parameters) - else: - execute_options = {} - - return pl.read_database(sql, connection=conn, execute_options=execute_options, **kwargs) + yield from pl.read_database( + sql, + connection=conn, + execute_options=parameters, + iter_batches=True, + batch_size=chunksize, + **kwargs, + ) def get_records( self, From c9b44f680fda50957f0176e30eebeb14ba3a2a6e Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Tue, 15 Apr 2025 02:27:46 +0800 Subject: [PATCH 05/11] fix: mypy error in ci --- .../airflow/providers/common/sql/hooks/sql.py | 18 ++++++++---------- .../google/cloud/life_sciences/__init__.py | 16 ++++++++++++++++ .../cloud/life_sciences/resources/__init__.py | 16 ++++++++++++++++ 3 files changed, 40 insertions(+), 10 deletions(-) create mode 100644 providers/google/tests/system/google/cloud/life_sciences/__init__.py create mode 100644 providers/google/tests/system/google/cloud/life_sciences/resources/__init__.py diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py index 0ed736c283f57..9917592c90115 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py @@ -432,7 +432,7 @@ def get_df( """ if df_type == "pandas": return self._get_pandas_df(sql, parameters, **kwargs) - elif df_type == "polars": + if df_type == "polars": return self._get_polars_df(sql, parameters, **kwargs) def _get_pandas_df( @@ -510,7 +510,7 @@ def get_df_by_chunks( """ if df_type == "pandas": return self._get_pandas_df_by_chunks(sql, parameters, chunksize=chunksize, **kwargs) - elif df_type == "polars": + if df_type == "polars": return self._get_polars_df_by_chunks(sql, parameters, chunksize=chunksize, **kwargs) def _get_pandas_df_by_chunks( @@ -564,14 +564,12 @@ def _get_polars_df_by_chunks( ) with closing(self.get_conn()) as conn: - yield from pl.read_database( - sql, - connection=conn, - execute_options=parameters, - iter_batches=True, - batch_size=chunksize, - **kwargs, - ) + execute_options = None + if parameters is not None: + if isinstance(parameters, Mapping): + execute_options = dict(parameters) + + yield from pl.read_database(sql, connection=conn, execute_options=execute_options, batch_size=chunksize, **kwargs) def get_records( self, diff --git a/providers/google/tests/system/google/cloud/life_sciences/__init__.py b/providers/google/tests/system/google/cloud/life_sciences/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/google/tests/system/google/cloud/life_sciences/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/google/tests/system/google/cloud/life_sciences/resources/__init__.py b/providers/google/tests/system/google/cloud/life_sciences/resources/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/google/tests/system/google/cloud/life_sciences/resources/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
From 838f8b02f450ee96053d4a97ebd95d49432bb741 Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Tue, 15 Apr 2025 11:24:03 +0800 Subject: [PATCH 06/11] fix: lint error in ci --- .../common/sql/src/airflow/providers/common/sql/hooks/sql.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py index 9917592c90115..739ed677085e6 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py @@ -569,7 +569,9 @@ def _get_polars_df_by_chunks( if isinstance(parameters, Mapping): execute_options = dict(parameters) - yield from pl.read_database(sql, connection=conn, execute_options=execute_options, batch_size=chunksize, **kwargs) + yield from pl.read_database( + sql, connection=conn, execute_options=execute_options, batch_size=chunksize, **kwargs + ) def get_records( self, From 151798ec3878b9682c39fb79d5315ed68e546c3a Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Tue, 15 Apr 2025 14:53:29 +0800 Subject: [PATCH 07/11] remove: unused file --- .../google/cloud/life_sciences/__init__.py | 16 ---------------- .../cloud/life_sciences/resources/__init__.py | 16 ---------------- 2 files changed, 32 deletions(-) delete mode 100644 providers/google/tests/system/google/cloud/life_sciences/__init__.py delete mode 100644 providers/google/tests/system/google/cloud/life_sciences/resources/__init__.py diff --git a/providers/google/tests/system/google/cloud/life_sciences/__init__.py b/providers/google/tests/system/google/cloud/life_sciences/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/providers/google/tests/system/google/cloud/life_sciences/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/providers/google/tests/system/google/cloud/life_sciences/resources/__init__.py b/providers/google/tests/system/google/cloud/life_sciences/resources/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/providers/google/tests/system/google/cloud/life_sciences/resources/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
From 4f095caaa6ecf0e920db3e51abfa59baf93887ff Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Tue, 15 Apr 2025 15:22:48 +0800 Subject: [PATCH 08/11] docs: add docs in reference --- providers/common/sql/docs/dataframes.rst | 67 ++++++++++++++++++++++++ providers/common/sql/docs/index.rst | 1 + 2 files changed, 68 insertions(+) create mode 100644 providers/common/sql/docs/dataframes.rst diff --git a/providers/common/sql/docs/dataframes.rst b/providers/common/sql/docs/dataframes.rst new file mode 100644 index 0000000000000..e8a8c2b08d98d --- /dev/null +++ b/providers/common/sql/docs/dataframes.rst @@ -0,0 +1,67 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +SQL Data Frames Integration +============================== + +The :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` provides built-in integration with popular data analysis frameworks, allowing you to directly query databases and retrieve results as either Pandas or Polars dataframes. This integration simplifies data workflows by eliminating the need for manual conversion between SQL query results and data frames. + +Pandas Integration +---------------- + +`Pandas <https://pandas.pydata.org/>`_ is a widely used data analysis and manipulation library.
The SQL hook allows you to directly retrieve query results as Pandas DataFrames, which is particularly useful for further data transformation, analysis, or visualization within your Airflow tasks. + +.. code-block:: python + + # Get complete DataFrame in a single operation + df = hook.get_df( + sql="SELECT * FROM my_table WHERE date_column >= %s", parameters=["2023-01-01"], df_type="pandas" + ) + + # Get DataFrame in chunks for memory-efficient processing of large results + for chunk_df in hook.get_df_by_chunks(sql="SELECT * FROM large_table", chunksize=10000, df_type="pandas"): + process_chunk(chunk_df) + +To use this feature, install the necessary dependencies: + +.. code-block:: bash + + pip install 'apache-airflow-providers-common-sql[pandas]' + +Polars Integration +---------------- + +`Polars <https://pola.rs/>`_ is a modern, high-performance DataFrame library implemented in Rust with Python bindings. It's designed for speed and efficiency when working with large datasets. The SQL hook supports retrieving data directly as Polars DataFrames, which can be particularly beneficial for performance-critical data processing tasks. + +.. code-block:: python + + # Get complete DataFrame in a single operation + df = hook.get_df( + sql="SELECT * FROM my_table WHERE date_column >= %s", + parameters={"date_column": "2023-01-01"}, + df_type="polars", + ) + + # Get DataFrame in chunks for memory-efficient processing of large results + for chunk_df in hook.get_df_by_chunks(sql="SELECT * FROM large_table", chunksize=10000, df_type="polars"): + process_chunk(chunk_df) + +To use this feature, install the necessary dependencies: + +..
code-block:: bash + + pip install 'apache-airflow-providers-common-sql[polars]' diff --git a/providers/common/sql/docs/index.rst b/providers/common/sql/docs/index.rst index 5fce8437a02c8..e4debfecbd477 100644 --- a/providers/common/sql/docs/index.rst +++ b/providers/common/sql/docs/index.rst @@ -44,6 +44,7 @@ Python API <_api/airflow/providers/common/sql/index> Supported Database Types <supported-database-types> Dialects <dialects> + Data Frames <dataframes> .. toctree:: :hidden: From 419f8ae833ed2fc70618e89ae1bc2bf2aa8023a5 Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Tue, 15 Apr 2025 15:38:05 +0800 Subject: [PATCH 09/11] docs: redirect extra installation to core --- providers/common/sql/docs/dataframes.rst | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/providers/common/sql/docs/dataframes.rst b/providers/common/sql/docs/dataframes.rst index e8a8c2b08d98d..3c7e6c9a276ce 100644 --- a/providers/common/sql/docs/dataframes.rst +++ b/providers/common/sql/docs/dataframes.rst @@ -36,11 +36,7 @@ Pandas Integration for chunk_df in hook.get_df_by_chunks(sql="SELECT * FROM large_table", chunksize=10000, df_type="pandas"): process_chunk(chunk_df) -To use this feature, install the necessary dependencies: - -.. code-block:: bash - - pip install 'apache-airflow-providers-common-sql[pandas]' +To use this feature, install the ``pandas`` extra when installing this provider package. For installation instructions, see :ref:`installing-from-pypi-managing-providers-separately-from-airflow-core`. Polars Integration ---------------- @@ -60,8 +56,4 @@ Polars Integration for chunk_df in hook.get_df_by_chunks(sql="SELECT * FROM large_table", chunksize=10000, df_type="polars"): process_chunk(chunk_df) -To use this feature, install the necessary dependencies: - -.. code-block:: bash - - pip install 'apache-airflow-providers-common-sql[polars]' +To use this feature, install the ``polars`` extra when installing this provider package.
For installation instructions, see :ref:`installing-from-pypi-managing-providers-separately-from-airflow-core`. From bb7405fdb7202c3eef6c69f2e812ac26ef4570a0 Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Tue, 15 Apr 2025 16:11:43 +0800 Subject: [PATCH 10/11] ci: fix underline length --- providers/common/sql/docs/dataframes.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/common/sql/docs/dataframes.rst b/providers/common/sql/docs/dataframes.rst index 3c7e6c9a276ce..81d6870ac171f 100644 --- a/providers/common/sql/docs/dataframes.rst +++ b/providers/common/sql/docs/dataframes.rst @@ -21,7 +21,7 @@ SQL Data Frames Integration The :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` provides built-in integration with popular data analysis frameworks, allowing you to directly query databases and retrieve results as either Pandas or Polars dataframes. This integration simplifies data workflows by eliminating the need for manual conversion between SQL query results and data frames. Pandas Integration ----------------- +-------------------------- `Pandas <https://pandas.pydata.org/>`_ is a widely used data analysis and manipulation library. The SQL hook allows you to directly retrieve query results as Pandas DataFrames, which is particularly useful for further data transformation, analysis, or visualization within your Airflow tasks. @@ -39,7 +39,7 @@ Pandas Integration To use this feature, install the ``pandas`` extra when installing this provider package. For installation instructions, see :ref:`installing-from-pypi-managing-providers-separately-from-airflow-core`. Polars Integration ----------------- +-------------------------- `Polars <https://pola.rs/>`_ is a modern, high-performance DataFrame library implemented in Rust with Python bindings. It's designed for speed and efficiency when working with large datasets. The SQL hook supports retrieving data directly as Polars DataFrames, which can be particularly beneficial for performance-critical data processing tasks.
From ae651a84e958b0d0ae63ccc8e6fad2b67c69975f Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Tue, 15 Apr 2025 17:09:19 +0800 Subject: [PATCH 11/11] docs: update link --- providers/common/sql/docs/dataframes.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/common/sql/docs/dataframes.rst b/providers/common/sql/docs/dataframes.rst index 81d6870ac171f..8033bf2d9c007 100644 --- a/providers/common/sql/docs/dataframes.rst +++ b/providers/common/sql/docs/dataframes.rst @@ -36,7 +36,7 @@ Pandas Integration for chunk_df in hook.get_df_by_chunks(sql="SELECT * FROM large_table", chunksize=10000, df_type="pandas"): process_chunk(chunk_df) -To use this feature, install the ``pandas`` extra when installing this provider package. For installation instructions, see :ref:`installing-from-pypi-managing-providers-separately-from-airflow-core`. +To use this feature, install the ``pandas`` extra when installing this provider package. For installation instructions, see :doc:`index`. Polars Integration -------------------------- @@ -56,4 +56,4 @@ Polars Integration for chunk_df in hook.get_df_by_chunks(sql="SELECT * FROM large_table", chunksize=10000, df_type="polars"): process_chunk(chunk_df) -To use this feature, install the ``polars`` extra when installing this provider package. For installation instructions, see :ref:`installing-from-pypi-managing-providers-separately-from-airflow-core`. +To use this feature, install the ``polars`` extra when installing this provider package. For installation instructions, see :doc:`index`.