diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py index 891b64066721b..fc7619f34de21 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py @@ -2,7 +2,7 @@ import pymysql # noqa: F401 from pydantic.fields import Field -from sqlalchemy import util +from sqlalchemy import create_engine, util from sqlalchemy.dialects.mysql import base from sqlalchemy.dialects.mysql.enumerated import SET from sqlalchemy.engine.reflection import Inspector @@ -15,6 +15,11 @@ platform_name, support_status, ) +from datahub.ingestion.api.source import ( + CapabilityReport, + TestableSource, + TestConnectionReport, +) from datahub.ingestion.source.sql.sql_common import ( make_sqlalchemy_type, register_custom_type, @@ -68,7 +73,7 @@ def get_identifier(self, *, schema: str, table: str) -> str: @capability(SourceCapability.DOMAINS, "Supported via the `domain` config field") @capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") -class MySQLSource(TwoTierSQLAlchemySource): +class MySQLSource(TwoTierSQLAlchemySource, TestableSource): """ This plugin extracts the following: @@ -88,6 +93,21 @@ def create(cls, config_dict, ctx): config = MySQLConfig.parse_obj(config_dict) return cls(config, ctx) + @staticmethod + def test_connection(config_dict: dict) -> TestConnectionReport: + test_report = TestConnectionReport() + try: + source_config = MySQLConfig.parse_obj_allow_extras(config_dict) + url = source_config.get_sql_alchemy_url() + engine = create_engine(url, **source_config.options) + with engine.connect(): + test_report.basic_connectivity = CapabilityReport(capable=True) + except Exception as e: + test_report.basic_connectivity = CapabilityReport( + capable=False, failure_reason=str(e) + ) + return test_report + def add_profile_metadata(self, inspector: Inspector) -> None: if not self.config.is_profiling_enabled(): return diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py b/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py index c8418075928ef..a6f933cf3f513 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py @@ -29,6 +29,11 @@ platform_name, support_status, ) +from datahub.ingestion.api.source import ( + CapabilityReport, + TestableSource, + TestConnectionReport, +) from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.sql.sql_common import ( SQLAlchemySource, @@ -132,7 +137,7 @@ class PostgresConfig(BasePostgresConfig): @capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") @capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") @capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration") -class PostgresSource(SQLAlchemySource): +class PostgresSource(SQLAlchemySource, TestableSource): """ This plugin extracts the following: @@ -153,6 +158,23 @@ def create(cls, config_dict, ctx): config = PostgresConfig.parse_obj(config_dict) return cls(config, ctx) + @staticmethod + def test_connection(config_dict: dict) -> TestConnectionReport: + test_report = TestConnectionReport() + try: + source_config = PostgresConfig.parse_obj_allow_extras(config_dict) + url = source_config.get_sql_alchemy_url( + database=source_config.database or source_config.initial_database + ) + engine = create_engine(url, **source_config.options) + with engine.connect(): + test_report.basic_connectivity = CapabilityReport(capable=True) + except Exception as e: + test_report.basic_connectivity = CapabilityReport( + capable=False, failure_reason=str(e) + ) + return test_report + def get_inspectors(self) -> Iterable[Inspector]: # Note: get_sql_alchemy_url will choose `sqlalchemy_uri` over the passed in database url = self.config.get_sql_alchemy_url(