Skip to content

Commit

Permalink
build(ingest): upgrade to sqlalchemy 1.4, drop 1.3 support (#8810)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
  • Loading branch information
mayurinehate and hsheth2 authored Sep 12, 2023
1 parent a021053 commit 303a2d0
Show file tree
Hide file tree
Showing 10 changed files with 17 additions and 106 deletions.
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
## Next

### Breaking Changes
- #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now.

### Potential Downtime

Expand Down
3 changes: 0 additions & 3 deletions metadata-ingestion/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ task installDev(type: Exec, dependsOn: [install]) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"${venv_name}/bin/pip install -e .[dev] ${extra_pip_requirements} && " +
"./scripts/install-sqlalchemy-stubs.sh && " +
"touch ${sentinel_file}"
}

Expand All @@ -82,7 +81,6 @@ task installAll(type: Exec, dependsOn: [install]) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"${venv_name}/bin/pip install -e .[all] ${extra_pip_requirements} && " +
"./scripts/install-sqlalchemy-stubs.sh && " +
"touch ${sentinel_file}"
}

Expand Down Expand Up @@ -119,7 +117,6 @@ task lint(type: Exec, dependsOn: installDev) {
task lintFix(type: Exec, dependsOn: installDev) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"./scripts/install-sqlalchemy-stubs.sh && " +
"black src/ tests/ examples/ && " +
"isort src/ tests/ examples/ && " +
"flake8 src/ tests/ examples/ && " +
Expand Down
28 changes: 0 additions & 28 deletions metadata-ingestion/scripts/install-sqlalchemy-stubs.sh

This file was deleted.

25 changes: 10 additions & 15 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ def get_long_description():

sql_common = {
# Required for all SQL sources.
"sqlalchemy>=1.3.24, <2",
# This is temporary lower bound that we're open to loosening/tightening as requirements show up
"sqlalchemy>=1.4.39, <2",
# Required for SQL profiling.
"great-expectations>=0.15.12, <=0.15.50",
# scipy version restricted to reduce backtracking, used by great-expectations,
Expand Down Expand Up @@ -172,13 +173,13 @@ def get_long_description():
}

clickhouse_common = {
# Clickhouse 0.1.8 requires SQLAlchemy 1.3.x, while the newer versions
# allow SQLAlchemy 1.4.x.
"clickhouse-sqlalchemy>=0.1.8",
# Clickhouse 0.2.0 adds support for SQLAlchemy 1.4.x
"clickhouse-sqlalchemy>=0.2.0",
}

redshift_common = {
"sqlalchemy-redshift",
# Clickhouse 0.8.3 adds support for SQLAlchemy 1.4.x
"sqlalchemy-redshift>=0.8.3",
"psycopg2-binary",
"GeoAlchemy2",
*sqllineage_lib,
Expand All @@ -188,13 +189,8 @@ def get_long_description():
snowflake_common = {
# Snowflake plugin utilizes sql common
*sql_common,
# Required for all Snowflake sources.
# See https://github.com/snowflakedb/snowflake-sqlalchemy/issues/234 for why 1.2.5 is blocked.
"snowflake-sqlalchemy>=1.2.4, !=1.2.5",
# Because of https://github.com/snowflakedb/snowflake-sqlalchemy/issues/350 we need to restrict SQLAlchemy's max version.
# Eventually we should just require snowflake-sqlalchemy>=1.4.3, but I won't do that immediately
# because it may break Airflow users that need SQLAlchemy 1.3.x.
"SQLAlchemy<1.4.42",
# https://github.com/snowflakedb/snowflake-sqlalchemy/issues/350
"snowflake-sqlalchemy>=1.4.3",
# See https://github.com/snowflakedb/snowflake-connector-python/pull/1348 for why 2.8.2 is blocked
"snowflake-connector-python!=2.8.2",
"pandas",
Expand All @@ -206,9 +202,7 @@ def get_long_description():
}

trino = {
# Trino 0.317 broke compatibility with SQLAlchemy 1.3.24.
# See https://github.com/trinodb/trino-python-client/issues/250.
"trino[sqlalchemy]>=0.308, !=0.317",
"trino[sqlalchemy]>=0.308",
}

pyhive_common = {
Expand Down Expand Up @@ -430,6 +424,7 @@ def get_long_description():
"types-Deprecated",
"types-protobuf>=4.21.0.1",
"types-tzlocal",
"sqlalchemy2-stubs",
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,7 @@ def get_aspects(
return

for i, row in enumerate(rows):
# TODO: Replace with namedtuple usage once we drop sqlalchemy 1.3
if hasattr(row, "_asdict"):
row_dict = row._asdict()
else:
row_dict = dict(row)
row_dict = row._asdict()
mcp = self._parse_row(row_dict)
if mcp:
yield mcp, row_dict["createdon"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,17 +451,10 @@ def _get_operation_aspect_work_unit(
yield wu

def _process_snowflake_history_row(
self, row: Any
self, event_dict: dict
) -> Iterable[SnowflakeJoinedAccessEvent]:
try: # big hammer try block to ensure we don't fail on parsing events
self.report.rows_processed += 1
# Make some minor type conversions.
if hasattr(row, "_asdict"):
# Compat with SQLAlchemy 1.3 and 1.4
# See https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#rowproxy-is-no-longer-a-proxy-is-now-called-row-and-behaves-like-an-enhanced-named-tuple.
event_dict = row._asdict()
else:
event_dict = dict(row)

# no use processing events that don't have a query text
if not event_dict["QUERY_TEXT"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
logger,
register_custom_type,
)
from datahub.ingestion.source.sql.sql_config import make_sqlalchemy_uri
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
Expand Down Expand Up @@ -147,7 +146,6 @@ class ClickHouseConfig(
include_materialized_views: Optional[bool] = Field(default=True, description="")

def get_sql_alchemy_url(self, current_db=None):

url = make_url(
super().get_sql_alchemy_url(uri_opts=self.uri_opts, current_db=current_db)
)
Expand All @@ -158,42 +156,11 @@ def get_sql_alchemy_url(self, current_db=None):
)

# We can setup clickhouse ingestion in sqlalchemy_uri form and config form.

# If we use sqlalchemu_uri form then super().get_sql_alchemy_url doesn't
# update current_db because it return self.sqlalchemy_uri without any update.
# This code bellow needed for rewriting sqlalchemi_uri and replace database with current_db.from
# For the future without python3.7 and sqlalchemy 1.3 support we can use code
# url=url.set(db=current_db), but not now.

# Why we need to update database in uri at all?
# Because we get database from sqlalchemy inspector and inspector we form from url inherited from
# TwoTierSQLAlchemySource and SQLAlchemySource

if self.sqlalchemy_uri and current_db:
self.scheme = url.drivername
self.username = url.username
self.password = (
pydantic.SecretStr(str(url.password))
if url.password
else pydantic.SecretStr("")
)
if url.host and url.port:
self.host_port = url.host + ":" + str(url.port)
elif url.host:
self.host_port = url.host
# untill released https://github.com/python/mypy/pull/15174
self.uri_opts = {str(k): str(v) for (k, v) in url.query.items()}

url = make_url(
make_sqlalchemy_uri(
self.scheme,
self.username,
self.password.get_secret_value() if self.password else None,
self.host_port,
current_db if current_db else self.database,
uri_opts=self.uri_opts,
)
)
url = url.set(database=current_db)

return str(url)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,7 @@ def _get_clickhouse_history(self):
results = engine.execute(query)
events = []
for row in results:
# minor type conversion
if hasattr(row, "_asdict"):
event_dict = row._asdict()
else:
event_dict = dict(row)
event_dict = row._asdict()

# stripping extra spaces caused by above _asdict() conversion
for k, v in event_dict.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,7 @@ def _gen_access_events_from_history_query(
for row in results:
if not self._should_process_row(row):
continue
if hasattr(row, "_asdict"):
# Compatibility with sqlalchemy 1.4.x.
row = row._asdict()
row = row._asdict()
access_event = RedshiftAccessEvent(**dict(row.items()))
# Replace database name with the alias name if one is provided in the config.
if self.config.database_alias:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,7 @@ def _get_trino_history(self):
results = engine.execute(query)
events = []
for row in results:
# minor type conversion
if hasattr(row, "_asdict"):
event_dict = row._asdict()
else:
event_dict = dict(row)
event_dict = row._asdict()

# stripping extra spaces caused by above _asdict() conversion
for k, v in event_dict.items():
Expand Down

0 comments on commit 303a2d0

Please sign in to comment.