Skip to content

Commit

Permalink
fix(ingest): drop deprecated database_alias from sql sources (datahub-project#9299)
Browse files Browse the repository at this point in the history

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
  • Loading branch information
mayurinehate and hsheth2 authored Nov 28, 2023
1 parent ff9876f commit 08fb730
Show file tree
Hide file tree
Showing 31 changed files with 1,052 additions and 909 deletions.
2 changes: 1 addition & 1 deletion docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- Updating MySQL version for quickstarts to 8.2, may cause quickstart issues for existing instances.
- #9244: The `redshift-legacy` and `redshift-legacy-usage` sources, which have been deprecated for >6 months, have been removed. The new `redshift` source is a superset of the functionality provided by those legacy sources.

- `database_alias` config is no longer supported in SQL sources, namely Redshift, MySQL, Oracle, Postgres, Trino, and Presto-on-Hive. The config will automatically be ignored if it's present in your recipe. It has been deprecated since v0.9.6.
### Potential Downtime

### Deprecations
Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class MetabaseConfig(DatasetLineageProviderConfigBase):
password: Optional[pydantic.SecretStr] = Field(
default=None, description="Metabase password."
)
# TODO: Check and remove this if no longer needed.
# Config database_alias is removed from sql sources.
database_alias_map: Optional[dict] = Field(
default=None,
description="Database name map to use when constructing dataset URN.",
Expand Down
12 changes: 0 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/common.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import DatasetLineageProviderConfigBase
from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
from datahub.ingestion.source.sql.postgres import BasePostgresConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
Expand Down Expand Up @@ -87,10 +87,7 @@ class RedshiftConfig(
hidden_from_schema=True,
)

_database_alias_deprecation = pydantic_field_deprecated(
"database_alias",
message="database_alias is deprecated. Use platform_instance instead.",
)
_database_alias_removed = pydantic_removed_field("database_alias")

default_schema: str = Field(
default="public",
Expand Down Expand Up @@ -151,10 +148,8 @@ def check_email_is_set_on_usage(cls, values):
return values

@root_validator(skip_on_failure=True)
def check_database_or_database_alias_set(cls, values):
assert values.get("database") or values.get(
"database_alias"
), "either database or database_alias must be set"
def check_database_is_set(cls, values):
assert values.get("database"), "database must be set"
return values

@root_validator(skip_on_failure=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.aws.s3_util import strip_s3_prefix
from datahub.ingestion.source.redshift.common import get_db_name
from datahub.ingestion.source.redshift.config import LineageMode, RedshiftConfig
from datahub.ingestion.source.redshift.query import RedshiftQuery
from datahub.ingestion.source.redshift.redshift_schema import (
Expand Down Expand Up @@ -266,7 +265,7 @@ def _populate_lineage_map(
try:
cll: Optional[List[sqlglot_l.ColumnLineageInfo]] = None
raw_db_name = database
alias_db_name = get_db_name(self.config)
alias_db_name = self.config.database

for lineage_row in RedshiftDataDictionary.get_lineage_rows(
conn=connection, query=query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
DatasetContainerSubTypes,
DatasetSubTypes,
)
from datahub.ingestion.source.redshift.common import get_db_name
from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.lineage import RedshiftLineageExtractor
from datahub.ingestion.source.redshift.profile import RedshiftProfiler
Expand Down Expand Up @@ -393,8 +392,8 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:

def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
connection = RedshiftSource.get_redshift_connection(self.config)
database = get_db_name(self.config)
logger.info(f"Processing db {self.config.database} with name {database}")
database = self.config.database
logger.info(f"Processing db {database}")
self.report.report_ingestion_stage_start(METADATA_EXTRACTION)
self.db_tables[database] = defaultdict()
self.db_views[database] = defaultdict()
Expand Down Expand Up @@ -628,7 +627,7 @@ def gen_view_dataset_workunits(
) -> Iterable[MetadataWorkUnit]:
yield from self.gen_dataset_workunits(
table=view,
database=get_db_name(self.config),
database=self.config.database,
schema=schema,
sub_type=DatasetSubTypes.VIEW,
custom_properties={},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,6 @@ def _gen_access_events_from_history_query(
self.report.num_usage_stat_skipped += 1
continue

# Replace database name with the alias name if one is provided in the config.
if self.config.database_alias:
access_event.database = self.config.database_alias

if not self._should_process_event(access_event, all_tables=all_tables):
self.report.num_usage_stat_skipped += 1
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def host(self):

@property
def db(self):
return self.database_alias or self.database
return self.database


@platform_name("Microsoft SQL Server", id="mssql")
Expand Down Expand Up @@ -660,10 +660,7 @@ def get_identifier(
regular = f"{schema}.{entity}"
qualified_table_name = regular
if self.config.database:
if self.config.database_alias:
qualified_table_name = f"{self.config.database_alias}.{regular}"
else:
qualified_table_name = f"{self.config.database}.{regular}"
qualified_table_name = f"{self.config.database}.{regular}"
if self.current_database:
qualified_table_name = f"{self.current_database}.{regular}"
return (
Expand Down
6 changes: 1 addition & 5 deletions metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ class MySQLConnectionConfig(SQLAlchemyConnectionConfig):

class MySQLConfig(MySQLConnectionConfig, TwoTierSQLAlchemyConfig):
def get_identifier(self, *, schema: str, table: str) -> str:
regular = f"{schema}.{table}"
if self.database_alias:
return f"{self.database_alias}.{table}"
else:
return regular
return f"{schema}.{table}"


@platform_name("MySQL")
Expand Down
2 changes: 0 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ def get_sql_alchemy_url(self):
def get_identifier(self, schema: str, table: str) -> str:
regular = f"{schema}.{table}"
if self.add_database_name_to_urn:
if self.database_alias:
return f"{self.database_alias}.{regular}"
if self.database:
return f"{self.database}.{regular}"
return regular
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ class PostgresSource(SQLAlchemySource):
- Metadata for databases, schemas, views, and tables
- Column types associated with each table
- Also supports PostGIS extensions
- database_alias (optional) can be used to change the name of database to be ingested
- Table, row, and column statistics via optional SQL profiling
"""

Expand Down Expand Up @@ -271,8 +270,6 @@ def get_identifier(
) -> str:
regular = f"{schema}.{entity}"
if self.config.database:
if self.config.database_alias:
return f"{self.config.database_alias}.{regular}"
return f"{self.config.database}.{regular}"
current_database = self.get_db_name(inspector)
return f"{current_database}.{regular}"
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,6 @@ def __init__(self, config: PrestoOnHiveConfig, ctx: PipelineContext) -> None:
)

def get_db_name(self, inspector: Inspector) -> str:
if self.config.database_alias:
return f"{self.config.database_alias}"
if self.config.database:
return f"{self.config.database}"
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
Expand Down Expand Up @@ -129,10 +129,6 @@ class SQLAlchemyConnectionConfig(ConfigModel):
host_port: str = Field(description="host URL")
database: Optional[str] = Field(default=None, description="database (catalog)")

database_alias: Optional[str] = Field(
default=None,
description="[Deprecated] Alias to apply to database when ingesting.",
)
scheme: str = Field(description="scheme")
sqlalchemy_uri: Optional[str] = Field(
default=None,
Expand All @@ -149,10 +145,7 @@ class SQLAlchemyConnectionConfig(ConfigModel):
),
)

_database_alias_deprecation = pydantic_field_deprecated(
"database_alias",
message="database_alias is deprecated. Use platform_instance instead.",
)
_database_alias_removed = pydantic_removed_field("database_alias")

def get_sql_alchemy_url(
self, uri_opts: Optional[Dict[str, Any]] = None, database: Optional[str] = None
Expand Down
11 changes: 3 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/sql/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,9 @@ class TrinoConfig(BasicSQLAlchemyConfig):
scheme: str = Field(default="trino", description="", hidden_from_docs=True)

def get_identifier(self: BasicSQLAlchemyConfig, schema: str, table: str) -> str:
regular = f"{schema}.{table}"
identifier = regular
if self.database_alias:
identifier = f"{self.database_alias}.{regular}"
elif self.database:
identifier = f"{self.database}.{regular}"
identifier = f"{schema}.{table}"
if self.database: # TODO: this should be required field
identifier = f"{self.database}.{identifier}"
return (
f"{self.platform_instance}.{identifier}"
if self.platform_instance
Expand Down Expand Up @@ -173,8 +170,6 @@ def __init__(
super().__init__(config, ctx, platform)

def get_db_name(self, inspector: Inspector) -> str:
if self.config.database_alias:
return f"{self.config.database_alias}"
if self.config.database:
return f"{self.config.database}"
else:
Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class SupersetConfig(StatefulIngestionConfigBase, ConfigModel):
default=DEFAULT_ENV,
description="Environment to use in namespace when constructing URNs",
)
# TODO: Check and remove this if no longer needed.
# Config database_alias is removed from sql sources.
database_alias: Dict[str, str] = Field(
default={},
description="Can be used to change mapping for database names in superset to what you have in datahub",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ source:
username: root
password: example
database: metagalaxy
database_alias: foogalaxy
host_port: localhost:53307
schema_pattern:
allow:
Expand Down
24 changes: 0 additions & 24 deletions metadata-ingestion/tests/integration/mysql/test_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,27 +75,3 @@ def test_mysql_ingest_no_db(
output_path=tmp_path / "mysql_mces.json",
golden_path=test_resources_dir / golden_file,
)


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_mysql_ingest_with_db_alias(
mysql_runner, pytestconfig, test_resources_dir, tmp_path, mock_time
):
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "mysql_to_file_dbalias.yml").resolve()
run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)

# Verify the output.
# Assert that all events generated have instance specific urns
import re

urn_pattern = "^" + re.escape(
"urn:li:dataset:(urn:li:dataPlatform:mysql,foogalaxy."
)
mce_helpers.assert_mcp_entity_urn(
filter="ALL",
entity_type="dataset",
regex_pattern=urn_pattern,
file=tmp_path / "mysql_mces_dbalias.json",
)
Loading

0 comments on commit 08fb730

Please sign in to comment.