diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index afc4a159ce315..ddb7fdfdb51ba 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1305,6 +1305,7 @@ podName
PodSpec
podSpec
podspec
+polars
poller
polyfill
pooler
diff --git a/providers/common/sql/docs/dataframes.rst b/providers/common/sql/docs/dataframes.rst
new file mode 100644
index 0000000000000..8033bf2d9c007
--- /dev/null
+++ b/providers/common/sql/docs/dataframes.rst
@@ -0,0 +1,59 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+SQL Data Frames Integration
+==============================
+
+The :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` provides built-in integration with popular data analysis frameworks, allowing you to directly query databases and retrieve results as either Pandas or Polars dataframes. This integration simplifies data workflows by eliminating the need for manual conversion between SQL query results and data frames.
+
+Pandas Integration
+--------------------------
+
+`Pandas <https://pandas.pydata.org/>`_ is a widely used data analysis and manipulation library. The SQL hook allows you to directly retrieve query results as Pandas DataFrames, which is particularly useful for further data transformation, analysis, or visualization within your Airflow tasks.
+
+.. code-block:: python
+
+ # Get complete DataFrame in a single operation
+ df = hook.get_df(
+ sql="SELECT * FROM my_table WHERE date_column >= %s", parameters=["2023-01-01"], df_type="pandas"
+ )
+
+ # Get DataFrame in chunks for memory-efficient processing of large results
+ for chunk_df in hook.get_df_by_chunks(sql="SELECT * FROM large_table", chunksize=10000, df_type="pandas"):
+ process_chunk(chunk_df)
+
+To use this feature, install the ``pandas`` extra when installing this provider package. For installation instructions, see :doc:`index`.
+
+Polars Integration
+--------------------------
+
+`Polars <https://pola.rs/>`_ is a modern, high-performance DataFrame library implemented in Rust with Python bindings. It's designed for speed and efficiency when working with large datasets. The SQL hook supports retrieving data directly as Polars DataFrames, which can be particularly beneficial for performance-critical data processing tasks.
+
+.. code-block:: python
+
+ # Get complete DataFrame in a single operation
+ df = hook.get_df(
+ sql="SELECT * FROM my_table WHERE date_column >= %(date_column)s",
+ parameters={"date_column": "2023-01-01"},
+ df_type="polars",
+ )
+
+ # Get DataFrame in chunks for memory-efficient processing of large results
+ for chunk_df in hook.get_df_by_chunks(sql="SELECT * FROM large_table", chunksize=10000, df_type="polars"):
+ process_chunk(chunk_df)
+
+To use this feature, install the ``polars`` extra when installing this provider package. For installation instructions, see :doc:`index`.
diff --git a/providers/common/sql/docs/index.rst b/providers/common/sql/docs/index.rst
index 5fce8437a02c8..e4debfecbd477 100644
--- a/providers/common/sql/docs/index.rst
+++ b/providers/common/sql/docs/index.rst
@@ -44,6 +44,7 @@
Python API <_api/airflow/providers/common/sql/index>
Supported Database Types
Dialects
+ Data Frames <dataframes>
.. toctree::
:hidden:
diff --git a/providers/common/sql/pyproject.toml b/providers/common/sql/pyproject.toml
index 1e458271ab386..78df7c0eac931 100644
--- a/providers/common/sql/pyproject.toml
+++ b/providers/common/sql/pyproject.toml
@@ -78,6 +78,9 @@ dependencies = [
"openlineage" = [
"apache-airflow-providers-openlineage"
]
+"polars" = [
+ "polars>=1.26.0"
+]
[dependency-groups]
dev = [
@@ -87,6 +90,7 @@ dev = [
"apache-airflow-providers-openlineage",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
"apache-airflow-providers-common-sql[pandas]",
+ "apache-airflow-providers-common-sql[polars]",
"apache-airflow-providers-mysql",
"apache-airflow-providers-postgres",
"apache-airflow-providers-odbc",
diff --git a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
index 2804038e87b42..739ed677085e6 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
@@ -34,16 +34,19 @@
from urllib.parse import urlparse
import sqlparse
+from deprecated import deprecated
from methodtools import lru_cache
from more_itertools import chunked
from sqlalchemy import create_engine
from sqlalchemy.engine import Inspector, make_url
from sqlalchemy.exc import ArgumentError, NoSuchModuleError
+from typing_extensions import Literal
from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
AirflowOptionalProviderFeatureException,
+ AirflowProviderDeprecationWarning,
)
from airflow.hooks.base import BaseHook
from airflow.providers.common.sql.dialects.dialect import Dialect
@@ -52,6 +55,7 @@
if TYPE_CHECKING:
from pandas import DataFrame
+ from polars import DataFrame as PolarsDataFrame
from sqlalchemy.engine import URL
from airflow.models import Connection
@@ -375,6 +379,11 @@ def get_reserved_words(self, dialect_name: str) -> set[str]:
self.log.debug("reserved words for '%s': %s", dialect_name, result)
return result
+ @deprecated(
+ reason="Replaced by function `get_df`.",
+ category=AirflowProviderDeprecationWarning,
+ action="ignore",
+ )
def get_pandas_df(
self,
sql,
@@ -384,6 +393,57 @@ def get_pandas_df(
"""
Execute the sql and returns a pandas dataframe.
+ :param sql: the sql statement to be executed (str) or a list of sql statements to execute
+ :param parameters: The parameters to render the SQL query with.
+ :param kwargs: (optional) passed into pandas.io.sql.read_sql method
+ """
+ return self._get_pandas_df(sql, parameters, **kwargs)
+
+ @deprecated(
+ reason="Replaced by function `get_df_by_chunks`.",
+ category=AirflowProviderDeprecationWarning,
+ action="ignore",
+ )
+ def get_pandas_df_by_chunks(
+ self,
+ sql,
+ parameters: list | tuple | Mapping[str, Any] | None = None,
+ *,
+ chunksize: int,
+ **kwargs,
+ ) -> Generator[DataFrame, None, None]:
+ return self._get_pandas_df_by_chunks(sql, parameters, chunksize=chunksize, **kwargs)
+
+ def get_df(
+ self,
+ sql,
+ parameters: list | tuple | Mapping[str, Any] | None = None,
+ *,
+ df_type: Literal["pandas", "polars"] = "pandas",
+ **kwargs,
+ ) -> DataFrame | PolarsDataFrame:
+ """
+ Execute the sql and returns a dataframe.
+
+ :param sql: the sql statement to be executed (str) or a list of sql statements to execute
+ :param parameters: The parameters to render the SQL query with.
+ :param df_type: Type of dataframe to return, either "pandas" or "polars"
+ :param kwargs: (optional) passed into `pandas.io.sql.read_sql` or `polars.read_database` method
+ """
+ if df_type == "pandas":
+ return self._get_pandas_df(sql, parameters, **kwargs)
+ if df_type == "polars":
+ return self._get_polars_df(sql, parameters, **kwargs)
+
+ def _get_pandas_df(
+ self,
+ sql,
+ parameters: list | tuple | Mapping[str, Any] | None = None,
+ **kwargs,
+ ) -> DataFrame:
+ """
+ Execute the sql and returns a pandas dataframe.
+
:param sql: the sql statement to be executed (str) or a list of sql statements to execute
:param parameters: The parameters to render the SQL query with.
:param kwargs: (optional) passed into pandas.io.sql.read_sql method
@@ -399,7 +459,61 @@ def get_pandas_df(
with closing(self.get_conn()) as conn:
return psql.read_sql(sql, con=conn, params=parameters, **kwargs)
- def get_pandas_df_by_chunks(
+ def _get_polars_df(
+ self,
+ sql,
+ parameters: list | tuple | Mapping[str, Any] | None = None,
+ **kwargs,
+ ) -> PolarsDataFrame:
+ """
+ Execute the sql and returns a polars dataframe.
+
+ :param sql: the sql statement to be executed (str) or a list of sql statements to execute
+ :param parameters: The parameters to render the SQL query with.
+ :param kwargs: (optional) passed into polars.read_database method
+ """
+ try:
+ import polars as pl
+ except ImportError:
+ raise AirflowOptionalProviderFeatureException(
+ "polars library not installed, run: pip install "
+ "'apache-airflow-providers-common-sql[polars]'."
+ )
+
+ with closing(self.get_conn()) as conn:
+ execute_options: dict[str, Any] | None = None
+ if parameters is not None:
+ if isinstance(parameters, Mapping):
+ execute_options = dict(parameters)
+ else:
+ execute_options = {}
+
+ return pl.read_database(sql, connection=conn, execute_options=execute_options, **kwargs)
+
+ def get_df_by_chunks(
+ self,
+ sql,
+ parameters: list | tuple | Mapping[str, Any] | None = None,
+ *,
+ chunksize: int,
+ df_type: Literal["pandas", "polars"] = "pandas",
+ **kwargs,
+ ) -> Generator[DataFrame | PolarsDataFrame, None, None]:
+ """
+ Execute the sql and return a generator.
+
+ :param sql: the sql statement to be executed (str) or a list of sql statements to execute
+ :param parameters: The parameters to render the SQL query with
+ :param chunksize: number of rows to include in each chunk
+ :param df_type: Type of dataframe to return, either "pandas" or "polars"
+ :param kwargs: (optional) passed into `pandas.io.sql.read_sql` or `polars.read_database` method
+ """
+ if df_type == "pandas":
+ return self._get_pandas_df_by_chunks(sql, parameters, chunksize=chunksize, **kwargs)
+ if df_type == "polars":
+ return self._get_polars_df_by_chunks(sql, parameters, chunksize=chunksize, **kwargs)
+
+ def _get_pandas_df_by_chunks(
self,
sql,
parameters: list | tuple | Mapping[str, Any] | None = None,
@@ -422,10 +536,43 @@ def get_pandas_df_by_chunks(
"pandas library not installed, run: pip install "
"'apache-airflow-providers-common-sql[pandas]'."
)
-
with closing(self.get_conn()) as conn:
yield from psql.read_sql(sql, con=conn, params=parameters, chunksize=chunksize, **kwargs)
+ def _get_polars_df_by_chunks(
+ self,
+ sql,
+ parameters: list | tuple | Mapping[str, Any] | None = None,
+ *,
+ chunksize: int,
+ **kwargs,
+ ) -> Generator[PolarsDataFrame, None, None]:
+ """
+ Execute the sql and return a generator.
+
+ :param sql: the sql statement to be executed (str) or a list of sql statements to execute
+ :param parameters: The parameters to render the SQL query with.
+ :param chunksize: number of rows to include in each chunk
+ :param kwargs: (optional) passed into pandas.io.sql.read_sql method
+ """
+ try:
+ import polars as pl
+ except ImportError:
+ raise AirflowOptionalProviderFeatureException(
+ "polars library not installed, run: pip install "
+ "'apache-airflow-providers-common-sql[polars]'."
+ )
+
+ with closing(self.get_conn()) as conn:
+ execute_options = None
+ if parameters is not None:
+ if isinstance(parameters, Mapping):
+ execute_options = dict(parameters)
+
+ yield from pl.read_database(
+ sql, connection=conn, execute_options=execute_options, batch_size=chunksize, **kwargs
+ )
+
def get_records(
self,
sql: str | list[str],