Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions airflow/providers/common/sql/hooks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,26 @@ def get_pandas_df(self, sql, parameters: Iterable | Mapping[str, Any] | None = N
with closing(self.get_conn()) as conn:
return psql.read_sql(sql, con=conn, params=parameters, **kwargs)

def get_polars_df(self, sql, **kwargs):
"""
Executes the sql and returns a polars dataframe.

:param sql: the sql statement to be executed (str) or a list of
sql statements to execute
:param parameters: The parameters to render the SQL query with.
:param kwargs: (optional) passed into polars.read_database method
"""
try:
import polars as pl
except ImportError:
raise Exception(
"polars library not installed, run: pip install "
"'apache-airflow-providers-common-sql[polars]'."
)

with closing(self.get_conn()) as conn:
return pl.read_database(sql, connection=conn, **kwargs)
Comment on lines +221 to +239
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how for polaris, but for pandas the same method is broken. Let me explain:

  1. Pandas expect to get SQLAlchemy connection rather than DBAPI with only one exception for SQLite
  2. get_sqlalchemy_engine is also broken in some cases because get_url is broken, see discussion in dev list: https://lists.apache.org/thread/8rhmz3qh30hvkondct4sfmgk4vd07mn5

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is not a case for polaris I'm not familiar with this lib

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the issue it was suggested to have get_df (generic function) that accept as parameter if it should be pandas/polars - why did you choose not to have it eventually?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was suggest that we should not merge something in single implementation especially if no one know how pandas and polaris interface compatible. I just worried that if we merge it now, than it could turned into the something like BackfillJobRunner._backfill_job_runner.py which literally have 170 different statements.

And in additional pandas is broken for at least half implementations: #34679 (comment), so my vision that we should fix it first before we could create some generic method

Copy link
Author

@bfeif bfeif Sep 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Taragolis the polars doc on read_database() states the following:

This function supports a wide range of native database drivers (ranging from local databases such as SQLite to large cloud databases such as Snowflake), as well as generic libraries such as ADBC, SQLAlchemy and various flavours of ODBC.

... and the connection object eventually gets passed in the constructor to the polars ConnectionExecutor here. Does this answer your question?

Copy link
Author

@bfeif bfeif Sep 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the issue it was suggested to have get_df (generic function) that accept as parameter if it should be pandas/polars - why did you choose not to have it eventually?

@eladkal you are right (link to suggestion in issue), this was suggested.

I made this decision because the APIs of polars and pandas are quite syntactically different for the actual querying, so the get_df generic function would have just been a big switch statement, which seemed to me to add no value (this syntactic difference is larger on the BigQueryHook than on the DbApiHook, I do admit).

Furthermore, I'm not sure why we'd want to add get_df without also removing (or at least removing exposure of) the underlying methods get_polars_df and get_pandas_df; having get_df, get_polars_df, and get_pandas. Doing this removal, however, would be a breaking change for a lot of users' code.

It seemed to me like such a get_df redefinition may fall better under the scope of a refactoring ticket.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have to remove anything. We can deprecate and raise deprecation warning.
For such deprecation we probably will give very long time before actually removing it

Copy link
Author

@bfeif bfeif Oct 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It still seems to me to be over-engineering, at least for now @eladkal.


def get_pandas_df_by_chunks(
self, sql, parameters: Iterable | Mapping[str, Any] | None = None, *, chunksize: int | None, **kwargs
):
Expand Down
32 changes: 32 additions & 0 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@
QueryJob,
SchemaField,
UnknownJob,
QueryJobConfig,
)
from google.cloud.bigquery.dataset import AccessEntry, Dataset, DatasetListItem, DatasetReference
from google.cloud.bigquery.table import EncryptionConfiguration, Row, RowIterator, Table, TableReference
from google.cloud.exceptions import NotFound
from googleapiclient.discovery import Resource, build
from pandas_gbq import read_gbq
from pandas_gbq.gbq import GbqConnector # noqa
import polars as pl
from requests import Session
from sqlalchemy import create_engine

Expand Down Expand Up @@ -270,6 +272,36 @@ def get_pandas_df(

return read_gbq(sql, project_id=project_id, dialect=dialect, credentials=credentials, **kwargs)

def get_polars_df(
self,
sql: str,
parameters: Iterable | Mapping[str, Any] | None = None,
dialect: str | None = None,
**kwargs,
) -> pd.DataFrame:
"""Get a Polars DataFrame for the BigQuery results.

:param sql: The BigQuery SQL to execute.
:param parameters: The parameters to render the SQL query with (not
used, leave to override superclass method)
:param dialect: Dialect of BigQuery SQL – legacy SQL or standard SQL
defaults to use `self.use_legacy_sql` if not specified
:param kwargs: (optional) passed into polars.from_arrow method
"""
if dialect is None:
dialect = "legacy" if self.use_legacy_sql else "standard"

project_id = self.get_project_id()
client = self.get_client(project_id=project_id)

job_config = QueryJobConfig(dialect=dialect) # Specify the SQL dialect here
query_job = client.query(sql, job_config=job_config) # API request
rows = query_job.result() # Waits for query to finish

df = pl.from_arrow(rows.to_arrow(), **kwargs)

return df

@GoogleBaseHook.fallback_to_default_project_id
def table_exists(self, dataset_id: str, table_id: str, project_id: str) -> bool:
"""Check if a table exists in Google BigQuery.
Expand Down
5 changes: 5 additions & 0 deletions airflow/providers/google/common/hooks/base_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ def get_credentials(self) -> google.auth.credentials.Credentials:
credentials, _ = self.get_credentials_and_project_id()
return credentials

def get_project_id(self) -> str:
"""Returns the project_id str for Google API."""
_, project_id = self.get_credentials_and_project_id()
return project_id

def _get_access_token(self) -> str:
"""Returns a valid access token from Google API Credentials."""
credentials = self.get_credentials()
Expand Down
3 changes: 3 additions & 0 deletions airflow/providers/google/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,9 @@ additional-extras:
- name: amazon
dependencies:
- apache-airflow-providers-amazon>=2.6.0
- name: polars
dependencies:
- polars>=0.19.5

secrets-backends:
- airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
Expand Down