Generalize caching of connection in DbApiHook to improve performance #40751

Merged: 91 commits, Sep 4, 2024
Changes from 81 commits
Commits
1d00dc4
refactor: Moved connection_extra_lower from OdbcHook and JdbcHook to …
dabla Jul 12, 2024
25a122a
refactor: Fixed static checks
davidblain-infrabel Jul 12, 2024
076dac5
refactor: Removed connection_extra_lower from MsSqlHook
davidblain-infrabel Jul 12, 2024
5748c70
refactor: Added cached connection property in DbApiHook
davidblain-infrabel Jul 12, 2024
8a7d752
refactor: Fixed odbc_connection_string in OdbcHook
davidblain-infrabel Jul 12, 2024
93ef22b
refactor: Fixed static check
davidblain-infrabel Jul 12, 2024
e0085df
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 12, 2024
f5bab94
refactor: Fixed import of Connection
davidblain-infrabel Jul 12, 2024
8b1b07f
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 12, 2024
d99353f
refactor: Refactored connection on PostgresHook
davidblain-infrabel Jul 13, 2024
dd6c74f
refactor: Refactored Connection property so logic is generic for Post…
davidblain-infrabel Jul 13, 2024
568220e
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 13, 2024
3de038c
refactor: Removed assignment of connection in MySqlHook
davidblain-infrabel Jul 13, 2024
3a69726
refactor: Fixed import elasticsearch
davidblain-infrabel Jul 13, 2024
9700377
refactor: Removed _placeholder and _connection from sql.pyi
davidblain-infrabel Jul 13, 2024
3e255e2
refactor: Reorganized imports in sql.pyi
davidblain-infrabel Jul 13, 2024
392d66d
refactor: Re-added import of MySQLdbConnection in MySQLHook
davidblain-infrabel Jul 15, 2024
8c0793d
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 15, 2024
8572c73
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 15, 2024
4cdcac5
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 15, 2024
c44e860
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 15, 2024
01c5710
refactor: Try to fix AttributeError: type object 'SkipDBTestsSession'…
davidblain-infrabel Jul 15, 2024
4e499f4
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 16, 2024
739c173
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 16, 2024
b3dcb46
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 17, 2024
a85392e
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 17, 2024
792aa48
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 17, 2024
5257ab9
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 18, 2024
d84a0e2
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 18, 2024
6dbdc5b
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 18, 2024
910398a
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 18, 2024
0a66005
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 19, 2024
7617284
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 19, 2024
5c8720b
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 19, 2024
2a0d222
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 20, 2024
63a8e75
refactor: Updated dependency to common sql 1.14.2
davidblain-infrabel Jul 30, 2024
945ec47
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 30, 2024
944b1ad
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 30, 2024
10246c8
refactor: Updated provider dependencies
dabla Jul 30, 2024
a5431d7
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 30, 2024
3cf6017
refactor: Fixed assertion of common sql package version testing
davidblain-infrabel Jul 31, 2024
2a19dd9
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 31, 2024
16f60bb
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 31, 2024
843c339
Merge branch 'main' into fix/cache-connection-extra
dabla Jul 31, 2024
f70fa49
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 1, 2024
03bfe96
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 1, 2024
032c8ee
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 1, 2024
97543d5
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 1, 2024
6bb4d87
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 1, 2024
c1d811f
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 2, 2024
614e557
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 2, 2024
97255a5
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 4, 2024
0ae6540
refactor: Updated sql common version in providers
davidblain-infrabel Aug 4, 2024
20bf513
refactor: updated provider dependencies
dabla Aug 4, 2024
9fc51e4
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 4, 2024
b1b16da
refactor: Use cached connection property in OracleHook
davidblain-infrabel Aug 4, 2024
0ce1fec
refactor: Removed deepcopy from connection
davidblain-infrabel Aug 4, 2024
28e0774
refactor: Updated docstring connection_extra_lower
davidblain-infrabel Aug 4, 2024
91ead4a
refactor: Reformatted OracleHook
davidblain-infrabel Aug 5, 2024
0a17b4a
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 5, 2024
e5a9b76
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 7, 2024
88f287e
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 7, 2024
82646ea
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 7, 2024
343d42b
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 8, 2024
6582334
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 8, 2024
bb23093
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 8, 2024
2e9c882
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 9, 2024
55935d4
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 13, 2024
7fc39c1
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 13, 2024
67e9b74
refactor: Connection in PostgresHook must be deep copied otherwise te…
davidblain-infrabel Aug 13, 2024
7f09732
refactor: Refactored OracleHook as before
davidblain-infrabel Aug 13, 2024
a7350ec
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 13, 2024
d6ba693
refactor: updated provider dependencies
dabla Aug 14, 2024
2d4cdf1
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 14, 2024
6fdefa5
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 14, 2024
2e8d1ce
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 14, 2024
ebeb439
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 14, 2024
025483d
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 14, 2024
9bcf105
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 20, 2024
ac01418
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 22, 2024
e810c71
Merge branch 'main' into fix/cache-connection-extra
dabla Aug 23, 2024
b5bc0ce
refactor: Updated sql common provider version
Sep 2, 2024
2672b24
Merge branch 'main' into fix/cache-connection-extra
dabla Sep 2, 2024
c9413f3
refactor: updated provider dependencies json file
dabla Sep 2, 2024
868cf4e
refactor: Updated common sql provider version to be tested in breeze
davidblain-infrabel Sep 2, 2024
22c1b79
Merge branch 'main' into fix/cache-connection-extra
dabla Sep 2, 2024
7b2b4f2
Merge branch 'main' into fix/cache-connection-extra
dabla Sep 3, 2024
adaca5e
refactor: Bumped version to 1.17.0 of common sql provider
davidblain-infrabel Sep 3, 2024
2c2ed77
refactor: Added explanation what has changed in common sql provider f…
davidblain-infrabel Sep 3, 2024
81d554e
Merge branch 'main' into fix/cache-connection-extra
dabla Sep 3, 2024
b69074a
Merge branch 'main' into fix/cache-connection-extra
dabla Sep 3, 2024
26 changes: 23 additions & 3 deletions airflow/providers/common/sql/hooks/sql.py
@@ -53,6 +53,7 @@
from pandas import DataFrame
from sqlalchemy.engine import URL

from airflow.models import Connection
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.sqlparser import DatabaseInfo

@@ -183,14 +184,14 @@ def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwa
self._replace_statement_format: str = kwargs.get(
"replace_statement_format", "REPLACE INTO {} {} VALUES ({})"
)
self._connection: Connection | None = kwargs.pop("connection", None)

def get_conn_id(self) -> str:
return getattr(self, self.conn_name_attr)

@cached_property
def placeholder(self):
conn = self.get_connection(self.get_conn_id())
placeholder = conn.extra_dejson.get("placeholder")
placeholder = self.connection_extra.get("placeholder")
if placeholder:
if placeholder in SQL_PLACEHOLDERS:
return placeholder
@@ -203,9 +204,28 @@ def placeholder(self):
)
return self._placeholder

@property
def connection(self) -> Connection:
if self._connection is None:
self._connection = self.get_connection(self.get_conn_id())
return self._connection

@property
def connection_extra(self) -> dict:
return self.connection.extra_dejson

@cached_property
def connection_extra_lower(self) -> dict:
"""
``connection.extra_dejson`` but where keys are converted to lower case.

This is used internally for case-insensitive access of extra params.
"""
return {k.lower(): v for k, v in self.connection_extra.items()}

def get_conn(self):
"""Return a connection object."""
db = self.get_connection(self.get_conn_id())
db = self.connection
return self.connector.connect(host=db.host, port=db.port, username=db.login, schema=db.schema)

def get_uri(self) -> str:
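
Reviewer note: the caching introduced in this hunk can be sketched in isolation as below. This is a minimal illustration, not the actual `DbApiHook`; `FakeConnection`, `SketchHook`, and the `lookups` counter are hypothetical names used only to show that the connection is resolved once and then reused by `connection_extra` and `connection_extra_lower`.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from functools import cached_property


@dataclass
class FakeConnection:
    """Hypothetical stand-in for airflow.models.Connection."""

    host: str = "localhost"
    extra_dejson: dict = field(default_factory=dict)


class SketchHook:
    """Illustrative only: mirrors the caching shape of the diff, not DbApiHook itself."""

    def __init__(self, connection: FakeConnection | None = None) -> None:
        # A connection may be injected up front, as with the ``connection`` kwarg in the diff.
        self._connection = connection
        self.lookups = 0  # counts simulated backend lookups

    def get_connection(self) -> FakeConnection:
        # Stand-in for the potentially expensive connection lookup.
        self.lookups += 1
        return FakeConnection(extra_dejson={"Placeholder": "?", "Driver": "x"})

    @property
    def connection(self) -> FakeConnection:
        # Resolve lazily on first access, then reuse the stored object.
        if self._connection is None:
            self._connection = self.get_connection()
        return self._connection

    @property
    def connection_extra(self) -> dict:
        return self.connection.extra_dejson

    @cached_property
    def connection_extra_lower(self) -> dict:
        # Lower-cased keys allow case-insensitive access to extras.
        return {k.lower(): v for k, v in self.connection_extra.items()}


hook = SketchHook()
hook.connection
hook.connection_extra
hook.connection_extra_lower
print(hook.lookups)
```

In this sketch, repeated property access triggers a single lookup, and a hook constructed with an explicit `connection` never calls `get_connection` at all.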
7 changes: 7 additions & 0 deletions airflow/providers/common/sql/hooks/sql.pyi
@@ -38,6 +38,7 @@ from airflow.exceptions import (
AirflowProviderDeprecationWarning as AirflowProviderDeprecationWarning,
)
from airflow.hooks.base import BaseHook as BaseHook
from airflow.models import Connection as Connection
from airflow.providers.openlineage.extractors import OperatorLineage as OperatorLineage
from airflow.providers.openlineage.sqlparser import DatabaseInfo as DatabaseInfo
from functools import cached_property as cached_property
@@ -67,6 +68,12 @@ class DbApiHook(BaseHook):
def get_conn_id(self) -> str: ...
@cached_property
def placeholder(self): ...
@property
def connection(self) -> Connection: ...
@property
def connection_extra(self) -> dict: ...
@cached_property
def connection_extra_lower(self) -> dict: ...
def get_conn(self): ...
def get_uri(self) -> str: ...
@property
8 changes: 3 additions & 5 deletions airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -95,12 +95,11 @@ class ElasticsearchSQLHook(DbApiHook):
def __init__(self, schema: str = "http", connection: AirflowConnection | None = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.schema = schema
self.connection = connection
self._connection = connection

def get_conn(self) -> ESConnection:
"""Return an elasticsearch connection object."""
conn_id = self.get_conn_id()
conn = self.connection or self.get_connection(conn_id)
conn = self.connection

conn_args = {
"host": conn.host,
@@ -119,8 +118,7 @@ def get_conn(self) -> ESConnection:
return connect(**conn_args)

def get_uri(self) -> str:
conn_id = self.get_conn_id()
conn = self.connection or self.get_connection(conn_id)
conn = self.connection

login = ""
if conn.login:
2 changes: 1 addition & 1 deletion airflow/providers/elasticsearch/provider.yaml
@@ -68,7 +68,7 @@ versions:

dependencies:
- apache-airflow>=2.8.0
- apache-airflow-providers-common-sql>=1.14.1
- apache-airflow-providers-common-sql>=1.15.0
- elasticsearch>=8.10,<9

integrations:
10 changes: 0 additions & 10 deletions airflow/providers/jdbc/hooks/jdbc.py
@@ -106,16 +106,6 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
"relabeling": {"host": "Connection URL"},
}

@property
def connection_extra_lower(self) -> dict:
"""
``connection.extra_dejson`` but where keys are converted to lower case.

This is used internally for case-insensitive access of jdbc params.
"""
conn = self.get_connection(self.get_conn_id())
return {k.lower(): v for k, v in conn.extra_dejson.items()}

@property
def driver_path(self) -> str | None:
from airflow.configuration import conf
2 changes: 1 addition & 1 deletion airflow/providers/jdbc/provider.yaml
@@ -53,7 +53,7 @@ versions:

dependencies:
- apache-airflow>=2.8.0
- apache-airflow-providers-common-sql>=1.14.1
- apache-airflow-providers-common-sql>=1.15.0
- jaydebeapi>=1.1.1

integrations:
24 changes: 1 addition & 23 deletions airflow/providers/microsoft/mssql/hooks/mssql.py
@@ -19,18 +19,14 @@

from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Any
from typing import Any

import pymssql
from methodtools import lru_cache
from pymssql import Connection as PymssqlConnection

from airflow.providers.common.sql.hooks.sql import DbApiHook, fetch_all_handler

if TYPE_CHECKING:
from airflow.models import Connection


class MsSqlHook(DbApiHook):
"""
@@ -59,24 +55,6 @@ def __init__(
self.schema = kwargs.pop("schema", None)
self._sqlalchemy_scheme = sqlalchemy_scheme

@cached_property
def connection(self) -> Connection:
"""
Get the airflow connection object.

:return: The connection object.
"""
return self.get_connection(self.get_conn_id())

@property
def connection_extra_lower(self) -> dict:
"""
``connection.extra_dejson`` but where keys are converted to lower case.

This is used internally for case-insensitive access of mssql params.
"""
return {k.lower(): v for k, v in self.connection.extra_dejson.items()}

@property
def sqlalchemy_scheme(self) -> str:
"""Sqlalchemy scheme either from constructor, connection extras or default."""
2 changes: 1 addition & 1 deletion airflow/providers/microsoft/mssql/provider.yaml
@@ -55,7 +55,7 @@ versions:

dependencies:
- apache-airflow>=2.8.0
- apache-airflow-providers-common-sql>=1.14.1
- apache-airflow-providers-common-sql>=1.15.0
- pymssql>=2.3.0
# The methodtools dependency can be removed with min airflow version >=2.9.1
# as it was added in https://github.com/apache/airflow/pull/37757
1 change: 0 additions & 1 deletion airflow/providers/mysql/hooks/mysql.py
@@ -72,7 +72,6 @@ class MySqlHook(DbApiHook):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.schema = kwargs.pop("schema", None)
self.connection = kwargs.pop("connection", None)
self.local_infile = kwargs.pop("local_infile", False)
self.init_command = kwargs.pop("init_command", None)

2 changes: 1 addition & 1 deletion airflow/providers/mysql/provider.yaml
@@ -66,7 +66,7 @@ versions:

dependencies:
- apache-airflow>=2.8.0
- apache-airflow-providers-common-sql>=1.14.1
- apache-airflow-providers-common-sql>=1.15.0
- mysqlclient>=1.4.0
- mysql-connector-python>=8.0.29

20 changes: 1 addition & 19 deletions airflow/providers/odbc/hooks/odbc.py
@@ -79,13 +79,6 @@ def __init__(
self._connection = None
self._connect_kwargs = connect_kwargs

@property
def connection(self):
"""The Connection object with ID ``odbc_conn_id``."""
if not self._connection:
self._connection = self.get_connection(self.get_conn_id())
return self._connection

@property
def database(self) -> str | None:
"""Database provided in init if exists; otherwise, ``schema`` from ``Connection`` object."""
@@ -99,15 +92,6 @@ def sqlalchemy_scheme(self) -> str:
raise RuntimeError("sqlalchemy_scheme in connection extra should not contain : or / characters")
return self._sqlalchemy_scheme or extra_scheme or self.DEFAULT_SQLALCHEMY_SCHEME

@property
def connection_extra_lower(self) -> dict:
"""
``connection.extra_dejson`` but where keys are converted to lower case.

This is used internally for case-insensitive access of odbc params.
"""
return {k.lower(): v for k, v in self.connection.extra_dejson.items()}

@property
def driver(self) -> str | None:
"""Driver from init param if given; else try to find one in connection extra."""
@@ -166,9 +150,7 @@ def odbc_connection_string(self):
conn_str += f"PORT={self.connection.port};"

extra_exclude = {"driver", "dsn", "connect_kwargs", "sqlalchemy_scheme", "placeholder"}
extra_params = {
k: v for k, v in self.connection.extra_dejson.items() if k.lower() not in extra_exclude
}
extra_params = {k: v for k, v in self.connection_extra.items() if k.lower() not in extra_exclude}
for k, v in extra_params.items():
conn_str += f"{k}={v};"
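
Reviewer note: the extras handling in this hunk can be illustrated standalone. The sketch below assumes a plain dict of extras; `build_extra_params` is a hypothetical helper name and the sample extras are made up. Only the exclusion set and the `key=value;` formatting are taken from the diff.

```python
from __future__ import annotations


def build_extra_params(extra: dict, exclude: set[str]) -> str:
    """Render extras as ``key=value;`` pairs, skipping excluded keys case-insensitively."""
    kept = {k: v for k, v in extra.items() if k.lower() not in exclude}
    return "".join(f"{k}={v};" for k, v in kept.items())


# Exclusion set copied from the hunk above.
EXTRA_EXCLUDE = {"driver", "dsn", "connect_kwargs", "sqlalchemy_scheme", "placeholder"}

# Hypothetical extras; the key casing is deliberately mixed.
extra = {"Driver": "ODBC Driver 18", "ApplicationIntent": "ReadOnly", "placeholder": "?"}

result = build_extra_params(extra, EXTRA_EXCLUDE)
print(result)
```

The comparison lower-cases each key, so `Driver` is dropped even though the exclusion set holds `driver`, while the surviving keys keep their original casing in the output.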

2 changes: 1 addition & 1 deletion airflow/providers/odbc/provider.yaml
@@ -54,7 +54,7 @@ versions:

dependencies:
- apache-airflow>=2.8.0
- apache-airflow-providers-common-sql>=1.14.1
- apache-airflow-providers-common-sql>=1.15.0
- pyodbc>=5.0.0

integrations:
4 changes: 1 addition & 3 deletions airflow/providers/postgres/hooks/postgres.py
@@ -88,7 +88,6 @@ def __init__(self, *args, options: str | None = None, **kwargs) -> None:
)
kwargs["database"] = kwargs["schema"]
super().__init__(*args, **kwargs)
self.connection: Connection | None = kwargs.pop("connection", None)
self.conn: connection = None
self.database: str | None = kwargs.pop("database", None)
self.options = options
@@ -142,8 +141,7 @@ def _get_cursor(self, raw_cursor: str) -> CursorType:

def get_conn(self) -> connection:
"""Establish a connection to a postgres database."""
conn_id = self.get_conn_id()
conn = deepcopy(self.connection or self.get_connection(conn_id))
conn = deepcopy(self.connection)

# check for authentication via AWS IAM
if conn.extra_dejson.get("iam", False):
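
Reviewer note: the diff keeps `deepcopy` around the now-cached connection but does not show what follows the IAM check. Assuming the later code overwrites fields on `conn` (an assumption, not confirmed by this hunk), the sketch below shows why copying first would matter once the object is shared. `FakeConnection`, `get_conn_args`, and the token value are hypothetical.

```python
from copy import deepcopy
from dataclasses import dataclass


@dataclass
class FakeConnection:
    """Hypothetical stand-in for airflow.models.Connection."""

    login: str
    password: str


# Plays the role of the object held by the hook's cached ``connection`` property.
cached = FakeConnection(login="user", password="static")


def get_conn_args(conn: FakeConnection, use_token: bool) -> dict:
    # Work on a private copy so the cached object is never mutated.
    working = deepcopy(conn)
    if use_token:
        # Assumed mutation, e.g. swapping in a short-lived credential.
        working.password = "short-lived-token"
    return {"user": working.login, "password": working.password}


args = get_conn_args(cached, use_token=True)
print(args["password"], cached.password)
```

Without the copy, the overwritten password would persist on the cached object and leak into every later call that reads it.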
2 changes: 1 addition & 1 deletion airflow/providers/postgres/provider.yaml
@@ -65,7 +65,7 @@ versions:

dependencies:
- apache-airflow>=2.8.0
- apache-airflow-providers-common-sql>=1.14.1
- apache-airflow-providers-common-sql>=1.15.0
- psycopg2-binary>=2.9.4

additional-extras:
4 changes: 2 additions & 2 deletions dev/breeze/tests/test_packages.py
@@ -204,7 +204,7 @@ def test_get_documentation_package_path():
"postgres",
"beta0",
"""
"apache-airflow-providers-common-sql>=1.14.1b0",
"apache-airflow-providers-common-sql>=1.15.0b0",
"apache-airflow>=2.8.0b0",
"psycopg2-binary>=2.9.4",
""",
@@ -214,7 +214,7 @@ def test_get_documentation_package_path():
"postgres",
"",
"""
"apache-airflow-providers-common-sql>=1.14.1",
"apache-airflow-providers-common-sql>=1.15.0",
"apache-airflow>=2.8.0",
"psycopg2-binary>=2.9.4",
""",
12 changes: 6 additions & 6 deletions generated/provider_dependencies.json
@@ -521,7 +521,7 @@
},
"elasticsearch": {
"deps": [
"apache-airflow-providers-common-sql>=1.14.1",
"apache-airflow-providers-common-sql>=1.15.0",
"apache-airflow>=2.8.0",
"elasticsearch>=8.10,<9"
],
@@ -758,7 +758,7 @@
},
"jdbc": {
"deps": [
"apache-airflow-providers-common-sql>=1.14.1",
"apache-airflow-providers-common-sql>=1.15.0",
"apache-airflow>=2.8.0",
"jaydebeapi>=1.1.1"
],
@@ -822,7 +822,7 @@
},
"microsoft.mssql": {
"deps": [
"apache-airflow-providers-common-sql>=1.14.1",
"apache-airflow-providers-common-sql>=1.15.0",
"apache-airflow>=2.8.0",
"methodtools>=0.4.7",
"pymssql>=2.3.0"
@@ -873,7 +873,7 @@
},
"mysql": {
"deps": [
"apache-airflow-providers-common-sql>=1.14.1",
"apache-airflow-providers-common-sql>=1.15.0",
"apache-airflow>=2.8.0",
"mysql-connector-python>=8.0.29",
"mysqlclient>=1.4.0"
@@ -904,7 +904,7 @@
},
"odbc": {
"deps": [
"apache-airflow-providers-common-sql>=1.14.1",
"apache-airflow-providers-common-sql>=1.15.0",
"apache-airflow>=2.8.0",
"pyodbc>=5.0.0"
],
@@ -1048,7 +1048,7 @@
},
"postgres": {
"deps": [
"apache-airflow-providers-common-sql>=1.14.1",
"apache-airflow-providers-common-sql>=1.15.0",
"apache-airflow>=2.8.0",
"psycopg2-binary>=2.9.4"
],