Skip to content

Commit

Permalink
add mysql and postgres source test_connection
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamjagtap639 committed Nov 24, 2023
1 parent e62127d commit 057617a
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 3 deletions.
24 changes: 22 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pymysql # noqa: F401
from pydantic.fields import Field
from sqlalchemy import util
from sqlalchemy import create_engine, util
from sqlalchemy.dialects.mysql import base
from sqlalchemy.dialects.mysql.enumerated import SET
from sqlalchemy.engine.reflection import Inspector
Expand All @@ -15,6 +15,11 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import (
CapabilityReport,
TestableSource,
TestConnectionReport,
)
from datahub.ingestion.source.sql.sql_common import (
make_sqlalchemy_type,
register_custom_type,
Expand Down Expand Up @@ -68,7 +73,7 @@ def get_identifier(self, *, schema: str, table: str) -> str:
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
class MySQLSource(TwoTierSQLAlchemySource):
class MySQLSource(TwoTierSQLAlchemySource, TestableSource):
"""
This plugin extracts the following:
Expand All @@ -88,6 +93,21 @@ def create(cls, config_dict, ctx):
config = MySQLConfig.parse_obj(config_dict)
return cls(config, ctx)

@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
test_report = TestConnectionReport()
try:
source_config = MySQLConfig.parse_obj_allow_extras(config_dict)
url = source_config.get_sql_alchemy_url()
engine = create_engine(url, **source_config.options)
with engine.connect():
test_report.basic_connectivity = CapabilityReport(capable=True)
except Exception as e:
test_report.basic_connectivity = CapabilityReport(
capable=False, failure_reason=str(e)
)
return test_report

def add_profile_metadata(self, inspector: Inspector) -> None:
if not self.config.is_profiling_enabled():
return
Expand Down
24 changes: 23 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import (
CapabilityReport,
TestableSource,
TestConnectionReport,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemySource,
Expand Down Expand Up @@ -132,7 +137,7 @@ class PostgresConfig(BasePostgresConfig):
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration")
class PostgresSource(SQLAlchemySource):
class PostgresSource(SQLAlchemySource, TestableSource):
"""
This plugin extracts the following:
Expand All @@ -153,6 +158,23 @@ def create(cls, config_dict, ctx):
config = PostgresConfig.parse_obj(config_dict)
return cls(config, ctx)

@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
test_report = TestConnectionReport()
try:
source_config = PostgresConfig.parse_obj_allow_extras(config_dict)
url = source_config.get_sql_alchemy_url(
database=source_config.database or source_config.initial_database
)
engine = create_engine(url, **source_config.options)
with engine.connect():
test_report.basic_connectivity = CapabilityReport(capable=True)
except Exception as e:
test_report.basic_connectivity = CapabilityReport(
capable=False, failure_reason=str(e)
)
return test_report

def get_inspectors(self) -> Iterable[Inspector]:
# Note: get_sql_alchemy_url will choose `sqlalchemy_uri` over the passed in database
url = self.config.get_sql_alchemy_url(
Expand Down

0 comments on commit 057617a

Please sign in to comment.