Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build(ingest): upgrade to sqlalchemy 1.4, drop 1.3 support #8810

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
## Next

### Breaking Changes
- #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now.

### Potential Downtime

Expand Down
3 changes: 0 additions & 3 deletions metadata-ingestion/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ task installDev(type: Exec, dependsOn: [install]) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"${venv_name}/bin/pip install -e .[dev] ${extra_pip_requirements} && " +
"./scripts/install-sqlalchemy-stubs.sh && " +
"touch ${sentinel_file}"
}

Expand All @@ -82,7 +81,6 @@ task installAll(type: Exec, dependsOn: [install]) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"${venv_name}/bin/pip install -e .[all] ${extra_pip_requirements} && " +
"./scripts/install-sqlalchemy-stubs.sh && " +
"touch ${sentinel_file}"
}

Expand Down Expand Up @@ -119,7 +117,6 @@ task lint(type: Exec, dependsOn: installDev) {
task lintFix(type: Exec, dependsOn: installDev) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"./scripts/install-sqlalchemy-stubs.sh && " +
"black src/ tests/ examples/ && " +
"isort src/ tests/ examples/ && " +
"flake8 src/ tests/ examples/ && " +
Expand Down
28 changes: 0 additions & 28 deletions metadata-ingestion/scripts/install-sqlalchemy-stubs.sh

This file was deleted.

25 changes: 10 additions & 15 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ def get_long_description():

sql_common = {
# Required for all SQL sources.
"sqlalchemy>=1.3.24, <2",
# This is a temporary lower bound that we're open to loosening/tightening as requirements show up.
"sqlalchemy>=1.4.39, <2",
# Required for SQL profiling.
"great-expectations>=0.15.12, <=0.15.50",
# scipy version restricted to reduce backtracking, used by great-expectations,
Expand Down Expand Up @@ -172,13 +173,13 @@ def get_long_description():
}

clickhouse_common = {
# Clickhouse 0.1.8 requires SQLAlchemy 1.3.x, while the newer versions
# allow SQLAlchemy 1.4.x.
"clickhouse-sqlalchemy>=0.1.8",
# clickhouse-sqlalchemy 0.2.0 adds support for SQLAlchemy 1.4.x
"clickhouse-sqlalchemy>=0.2.0",
}

redshift_common = {
"sqlalchemy-redshift",
# sqlalchemy-redshift 0.8.3 adds support for SQLAlchemy 1.4.x
"sqlalchemy-redshift>=0.8.3",
"psycopg2-binary",
"GeoAlchemy2",
*sqllineage_lib,
Expand All @@ -188,13 +189,8 @@ def get_long_description():
snowflake_common = {
# Snowflake plugin utilizes sql common
*sql_common,
# Required for all Snowflake sources.
# See https://github.com/snowflakedb/snowflake-sqlalchemy/issues/234 for why 1.2.5 is blocked.
"snowflake-sqlalchemy>=1.2.4, !=1.2.5",
# Because of https://github.com/snowflakedb/snowflake-sqlalchemy/issues/350 we need to restrict SQLAlchemy's max version.
# Eventually we should just require snowflake-sqlalchemy>=1.4.3, but I won't do that immediately
# because it may break Airflow users that need SQLAlchemy 1.3.x.
"SQLAlchemy<1.4.42",
# https://github.com/snowflakedb/snowflake-sqlalchemy/issues/350
"snowflake-sqlalchemy>=1.4.3",
# See https://github.com/snowflakedb/snowflake-connector-python/pull/1348 for why 2.8.2 is blocked
"snowflake-connector-python!=2.8.2",
"pandas",
Expand All @@ -206,9 +202,7 @@ def get_long_description():
}

trino = {
# Trino 0.317 broke compatibility with SQLAlchemy 1.3.24.
# See https://github.com/trinodb/trino-python-client/issues/250.
"trino[sqlalchemy]>=0.308, !=0.317",
"trino[sqlalchemy]>=0.308",
}

pyhive_common = {
Expand Down Expand Up @@ -430,6 +424,7 @@ def get_long_description():
"types-Deprecated",
"types-protobuf>=4.21.0.1",
"types-tzlocal",
"sqlalchemy2-stubs",
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,7 @@ def get_aspects(
return

for i, row in enumerate(rows):
# TODO: Replace with namedtuple usage once we drop sqlalchemy 1.3
if hasattr(row, "_asdict"):
row_dict = row._asdict()
else:
row_dict = dict(row)
row_dict = row._asdict()
mcp = self._parse_row(row_dict)
if mcp:
yield mcp, row_dict["createdon"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,17 +451,10 @@ def _get_operation_aspect_work_unit(
yield wu

def _process_snowflake_history_row(
self, row: Any
self, event_dict: dict
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use DictCursor in snowflake, so this is always a dictionary.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

) -> Iterable[SnowflakeJoinedAccessEvent]:
try: # big hammer try block to ensure we don't fail on parsing events
self.report.rows_processed += 1
# Make some minor type conversions.
if hasattr(row, "_asdict"):
# Compat with SQLAlchemy 1.3 and 1.4
# See https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#rowproxy-is-no-longer-a-proxy-is-now-called-row-and-behaves-like-an-enhanced-named-tuple.
event_dict = row._asdict()
else:
event_dict = dict(row)

# no use processing events that don't have a query text
if not event_dict["QUERY_TEXT"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
logger,
register_custom_type,
)
from datahub.ingestion.source.sql.sql_config import make_sqlalchemy_uri
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
Expand Down Expand Up @@ -147,7 +146,6 @@ class ClickHouseConfig(
include_materialized_views: Optional[bool] = Field(default=True, description="")

def get_sql_alchemy_url(self, current_db=None):

url = make_url(
super().get_sql_alchemy_url(uri_opts=self.uri_opts, current_db=current_db)
)
Expand All @@ -158,42 +156,11 @@ def get_sql_alchemy_url(self, current_db=None):
)

# We can setup clickhouse ingestion in sqlalchemy_uri form and config form.

# If we use the sqlalchemy_uri form, then super().get_sql_alchemy_url doesn't
# update current_db, because it returns self.sqlalchemy_uri without any update.
# The code below is needed to rewrite sqlalchemy_uri and replace the database with current_db.
# In the future, without python3.7 and sqlalchemy 1.3 support, we can use the code
# url=url.set(database=current_db), but not now.

# Why we need to update database in uri at all?
# Because we get database from sqlalchemy inspector and inspector we form from url inherited from
# TwoTierSQLAlchemySource and SQLAlchemySource

if self.sqlalchemy_uri and current_db:
self.scheme = url.drivername
self.username = url.username
self.password = (
pydantic.SecretStr(str(url.password))
if url.password
else pydantic.SecretStr("")
)
if url.host and url.port:
self.host_port = url.host + ":" + str(url.port)
elif url.host:
self.host_port = url.host
# until https://github.com/python/mypy/pull/15174 is released
self.uri_opts = {str(k): str(v) for (k, v) in url.query.items()}

url = make_url(
make_sqlalchemy_uri(
self.scheme,
self.username,
self.password.get_secret_value() if self.password else None,
self.host_port,
current_db if current_db else self.database,
uri_opts=self.uri_opts,
)
)
url = url.set(database=current_db)

return str(url)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,7 @@ def _get_clickhouse_history(self):
results = engine.execute(query)
events = []
for row in results:
# minor type conversion
if hasattr(row, "_asdict"):
event_dict = row._asdict()
else:
event_dict = dict(row)
event_dict = row._asdict()

# stripping extra spaces caused by above _asdict() conversion
for k, v in event_dict.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,7 @@ def _gen_access_events_from_history_query(
for row in results:
if not self._should_process_row(row):
continue
if hasattr(row, "_asdict"):
# Compatibility with sqlalchemy 1.4.x.
row = row._asdict()
row = row._asdict()
access_event = RedshiftAccessEvent(**dict(row.items()))
# Replace database name with the alias name if one is provided in the config.
if self.config.database_alias:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,7 @@ def _get_trino_history(self):
results = engine.execute(query)
events = []
for row in results:
# minor type conversion
if hasattr(row, "_asdict"):
event_dict = row._asdict()
else:
event_dict = dict(row)
event_dict = row._asdict()

# stripping extra spaces caused by above _asdict() conversion
for k, v in event_dict.items():
Expand Down
Loading