Skip to content

Commit

Permalink
add fivetran source test connection
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamjagtap639 committed Nov 21, 2023
1 parent a704290 commit f819e1b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from datahub.ingestion.api.source import CapabilityReport, TestConnectionReport
from datahub.ingestion.source.fivetran.config import FivetranSourceConfig
from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI


class FivetranConnectionTest:
def __init__(self, config_dict: dict):
self.config = FivetranSourceConfig.parse_obj_allow_extras(config_dict)

def get_connection_test(self) -> TestConnectionReport:
return TestConnectionReport(basic_connectivity=self.basic_connectivity())

def basic_connectivity(self) -> CapabilityReport:
try:
self.audit_log = FivetranLogAPI(self.config.fivetran_log_config)
return CapabilityReport(capable=True)
except Exception as e:
return CapabilityReport(capable=False, failure_reason=str(e))
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport
from datahub.ingestion.api.source import (
MetadataWorkUnitProcessor,
Source,
SourceReport,
TestableSource,
TestConnectionReport,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.fivetran.config import (
KNOWN_DATA_PLATFORM_MAPPING,
Expand All @@ -25,6 +31,7 @@
FivetranSourceReport,
PlatformDetail,
)
from datahub.ingestion.source.fivetran.connection_test import FivetranConnectionTest
from datahub.ingestion.source.fivetran.data_classes import Connector, Job
from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI
from datahub.ingestion.source.state.stale_entity_removal_handler import (
Expand Down Expand Up @@ -54,7 +61,7 @@
SourceCapability.LINEAGE_FINE,
"Enabled by default, can be disabled via configuration `include_column_lineage`",
)
class FivetranSource(StatefulIngestionSourceBase):
class FivetranSource(StatefulIngestionSourceBase, TestableSource):
"""
This plugin extracts fivetran users, connectors, destinations and sync history.
This plugin is in beta and has only been tested on Snowflake connector.
Expand Down Expand Up @@ -261,6 +268,10 @@ def _get_connector_workunits(
dpi = self._generate_dpi_from_job(job, datajob)
yield from self._get_dpi_workunits(job, dpi)

@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
return FivetranConnectionTest(config_dict).get_connection_test()

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
config = FivetranSourceConfig.parse_obj(config_dict)
Expand Down

0 comments on commit f819e1b

Please sign in to comment.