1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
@@ -1305,6 +1305,7 @@ podName
PodSpec
podSpec
podspec
polars
poller
polyfill
pooler
59 changes: 59 additions & 0 deletions providers/common/sql/docs/dataframes.rst
@@ -0,0 +1,59 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

SQL Data Frames Integration
==============================

The :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` provides built-in integration with popular data analysis frameworks, allowing you to directly query databases and retrieve results as either Pandas or Polars dataframes. This integration simplifies data workflows by eliminating the need for manual conversion between SQL query results and data frames.
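
The examples below assume a ``hook`` object is already available. As a minimal sketch (the ``my_postgres`` connection id is hypothetical), any :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` subclass exposes these methods:

.. code-block:: python

    # Minimal sketch: any DbApiHook subclass works the same way; "my_postgres"
    # is a placeholder for a connection defined in your environment.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="my_postgres")
    df = hook.get_df("SELECT 1 AS value", df_type="pandas")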

Pandas Integration
--------------------------

`Pandas <https://pandas.pydata.org/>`_ is a widely used data analysis and manipulation library. The SQL hook allows you to directly retrieve query results as Pandas DataFrames, which is particularly useful for further data transformation, analysis, or visualization within your Airflow tasks.

.. code-block:: python

# Get complete DataFrame in a single operation
df = hook.get_df(
sql="SELECT * FROM my_table WHERE date_column >= %s", parameters=["2023-01-01"], df_type="pandas"
)

# Get DataFrame in chunks for memory-efficient processing of large results
for chunk_df in hook.get_df_by_chunks(sql="SELECT * FROM large_table", chunksize=10000, df_type="pandas"):
process_chunk(chunk_df)
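
The ``process_chunk`` callable above is only a placeholder. A minimal sketch of what it might do, assuming you simply want to append each chunk to a CSV file rather than hold the full result set in memory:

.. code-block:: python

    def process_chunk(chunk_df):
        # Hypothetical example: append each chunk to a CSV file on disk.
        chunk_df.to_csv("/tmp/large_table.csv", mode="a", header=False, index=False)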

To use this feature, install the ``pandas`` extra when installing this provider package. For installation instructions, see :doc:`index`.
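
A minimal install command, the same one the hook suggests when pandas is missing, is ``pip install 'apache-airflow-providers-common-sql[pandas]'`` (adjust to your Airflow installation method).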

Polars Integration
--------------------------

`Polars <https://pola.rs/>`_ is a modern, high-performance DataFrame library implemented in Rust with Python bindings. It's designed for speed and efficiency when working with large datasets. The SQL hook supports retrieving data directly as Polars DataFrames, which can be particularly beneficial for performance-critical data processing tasks.

.. code-block:: python

# Get complete DataFrame in a single operation
df = hook.get_df(
sql="SELECT * FROM my_table WHERE date_column >= %s",
parameters={"date_column": "2023-01-01"},
df_type="polars",
)

# Get DataFrame in chunks for memory-efficient processing of large results
for chunk_df in hook.get_df_by_chunks(sql="SELECT * FROM large_table", chunksize=10000, df_type="polars"):
process_chunk(chunk_df)

To use this feature, install the ``polars`` extra when installing this provider package. For installation instructions, see :doc:`index`.
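
A minimal install command, mirroring the pandas example above, is ``pip install 'apache-airflow-providers-common-sql[polars]'`` (adjust to your Airflow installation method).
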
1 change: 1 addition & 0 deletions providers/common/sql/docs/index.rst
@@ -44,6 +44,7 @@
Python API <_api/airflow/providers/common/sql/index>
Supported Database Types </supported-database-types>
Dialects <dialects>
Data Frames <dataframes>

.. toctree::
:hidden:
4 changes: 4 additions & 0 deletions providers/common/sql/pyproject.toml
@@ -78,6 +78,9 @@ dependencies = [
"openlineage" = [
"apache-airflow-providers-openlineage"
]
"polars" = [
"polars>=1.26.0"
]

[dependency-groups]
dev = [
@@ -87,6 +90,7 @@ dev = [
"apache-airflow-providers-openlineage",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
"apache-airflow-providers-common-sql[pandas]",
"apache-airflow-providers-common-sql[polars]",
"apache-airflow-providers-mysql",
"apache-airflow-providers-postgres",
"apache-airflow-providers-odbc",
151 changes: 149 additions & 2 deletions providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
@@ -34,16 +34,19 @@
from urllib.parse import urlparse

import sqlparse
from deprecated import deprecated
from methodtools import lru_cache
from more_itertools import chunked
from sqlalchemy import create_engine
from sqlalchemy.engine import Inspector, make_url
from sqlalchemy.exc import ArgumentError, NoSuchModuleError
from typing_extensions import Literal

from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
AirflowOptionalProviderFeatureException,
AirflowProviderDeprecationWarning,
)
from airflow.hooks.base import BaseHook
from airflow.providers.common.sql.dialects.dialect import Dialect
@@ -52,6 +55,7 @@

if TYPE_CHECKING:
from pandas import DataFrame
from polars import DataFrame as PolarsDataFrame
from sqlalchemy.engine import URL

from airflow.models import Connection
@@ -375,6 +379,11 @@ def get_reserved_words(self, dialect_name: str) -> set[str]:
self.log.debug("reserved words for '%s': %s", dialect_name, result)
return result

@deprecated(
reason="Replaced by function `get_df`.",
category=AirflowProviderDeprecationWarning,
action="ignore",
)
def get_pandas_df(
self,
sql,
@@ -384,6 +393,57 @@ def get_pandas_df(
"""
Execute the sql and return a pandas dataframe.

:param sql: the sql statement to be executed (str) or a list of sql statements to execute
:param parameters: The parameters to render the SQL query with.
:param kwargs: (optional) passed into pandas.io.sql.read_sql method
"""
return self._get_pandas_df(sql, parameters, **kwargs)

@deprecated(
reason="Replaced by function `get_df_by_chunks`.",
category=AirflowProviderDeprecationWarning,
action="ignore",
)
def get_pandas_df_by_chunks(
self,
sql,
parameters: list | tuple | Mapping[str, Any] | None = None,
*,
chunksize: int,
**kwargs,
) -> Generator[DataFrame, None, None]:
return self._get_pandas_df_by_chunks(sql, parameters, chunksize=chunksize, **kwargs)

def get_df(
self,
sql,
parameters: list | tuple | Mapping[str, Any] | None = None,
*,
df_type: Literal["pandas", "polars"] = "pandas",
**kwargs,
) -> DataFrame | PolarsDataFrame:
"""
Execute the sql and return a dataframe.

:param sql: the sql statement to be executed (str) or a list of sql statements to execute
:param parameters: The parameters to render the SQL query with.
:param df_type: Type of dataframe to return, either "pandas" or "polars"
:param kwargs: (optional) passed into `pandas.io.sql.read_sql` or `polars.read_database` method
"""
if df_type == "pandas":
return self._get_pandas_df(sql, parameters, **kwargs)
if df_type == "polars":
return self._get_polars_df(sql, parameters, **kwargs)

def _get_pandas_df(
self,
sql,
parameters: list | tuple | Mapping[str, Any] | None = None,
**kwargs,
) -> DataFrame:
"""
Execute the sql and return a pandas dataframe.

:param sql: the sql statement to be executed (str) or a list of sql statements to execute
:param parameters: The parameters to render the SQL query with.
:param kwargs: (optional) passed into pandas.io.sql.read_sql method
@@ -399,7 +459,61 @@ def get_pandas_df(
with closing(self.get_conn()) as conn:
return psql.read_sql(sql, con=conn, params=parameters, **kwargs)

def get_pandas_df_by_chunks(
def _get_polars_df(
self,
sql,
parameters: list | tuple | Mapping[str, Any] | None = None,
**kwargs,
) -> PolarsDataFrame:
"""
Execute the sql and return a polars dataframe.

:param sql: the sql statement to be executed (str) or a list of sql statements to execute
:param parameters: The parameters to render the SQL query with.
:param kwargs: (optional) passed into polars.read_database method
"""
try:
import polars as pl
except ImportError:
raise AirflowOptionalProviderFeatureException(
"polars library not installed, run: pip install "
"'apache-airflow-providers-common-sql[polars]'."
)

with closing(self.get_conn()) as conn:
execute_options: dict[str, Any] | None = None
if parameters is not None:
if isinstance(parameters, Mapping):
execute_options = dict(parameters)
else:
execute_options = {}

return pl.read_database(sql, connection=conn, execute_options=execute_options, **kwargs)

def get_df_by_chunks(
self,
sql,
parameters: list | tuple | Mapping[str, Any] | None = None,
*,
chunksize: int,
df_type: Literal["pandas", "polars"] = "pandas",
**kwargs,
) -> Generator[DataFrame | PolarsDataFrame, None, None]:
"""
Execute the sql and return a generator.

:param sql: the sql statement to be executed (str) or a list of sql statements to execute
:param parameters: The parameters to render the SQL query with
:param chunksize: number of rows to include in each chunk
:param df_type: Type of dataframe to return, either "pandas" or "polars"
:param kwargs: (optional) passed into `pandas.io.sql.read_sql` or `polars.read_database` method
"""
if df_type == "pandas":
return self._get_pandas_df_by_chunks(sql, parameters, chunksize=chunksize, **kwargs)
if df_type == "polars":
return self._get_polars_df_by_chunks(sql, parameters, chunksize=chunksize, **kwargs)

def _get_pandas_df_by_chunks(
self,
sql,
parameters: list | tuple | Mapping[str, Any] | None = None,
@@ -422,10 +536,43 @@ def get_pandas_df_by_chunks(
"pandas library not installed, run: pip install "
"'apache-airflow-providers-common-sql[pandas]'."
)

with closing(self.get_conn()) as conn:
yield from psql.read_sql(sql, con=conn, params=parameters, chunksize=chunksize, **kwargs)

def _get_polars_df_by_chunks(
self,
sql,
parameters: list | tuple | Mapping[str, Any] | None = None,
*,
chunksize: int,
**kwargs,
) -> Generator[PolarsDataFrame, None, None]:
"""
Execute the sql and return a generator.

:param sql: the sql statement to be executed (str) or a list of sql statements to execute
:param parameters: The parameters to render the SQL query with.
:param chunksize: number of rows to include in each chunk
:param kwargs: (optional) passed into pandas.io.sql.read_sql method
"""
try:
import polars as pl
except ImportError:
raise AirflowOptionalProviderFeatureException(
"polars library not installed, run: pip install "
"'apache-airflow-providers-common-sql[polars]'."
)

with closing(self.get_conn()) as conn:
execute_options = None
if parameters is not None:
if isinstance(parameters, Mapping):
execute_options = dict(parameters)

yield from pl.read_database(
sql, connection=conn, execute_options=execute_options, batch_size=chunksize, **kwargs
)

def get_records(
self,
sql: str | list[str],