feat(integration/fivetran): Fivetran source ingestion integration #14

Closed
wants to merge 17 commits
3 changes: 3 additions & 0 deletions metadata-ingestion/setup.py
@@ -387,6 +387,7 @@ def get_long_description():
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8"},
"unity-catalog": databricks | sqllineage_lib,
"fivetran": {"requests"},
}

# This is mainly used to exclude plugins from the Docker image.
@@ -505,6 +506,7 @@ def get_long_description():
"nifi",
"vertica",
"mode",
"fivetran",

why is this needed?

Owner Author:
We have now added a unit test case for the fivetran source.
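(For illustration only: a minimal sketch of a registration-style check, assuming DataHub's source_registry API; the actual unit test added in this PR may look different.)

from datahub.ingestion.source.source_registry import source_registry

def test_fivetran_source_registered() -> None:
    # Resolves the "fivetran" entry point declared in setup.py above.
    source_cls = source_registry.get("fivetran")
    assert source_cls.__name__ == "FivetranSource"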

]
if plugin
for dependency in plugins[plugin]
@@ -605,6 +607,7 @@ def get_long_description():
"unity-catalog = datahub.ingestion.source.unity.source:UnityCatalogSource",
"gcs = datahub.ingestion.source.gcs.gcs_source:GCSSource",
"sql-queries = datahub.ingestion.source.sql_queries:SqlQueriesSource",
"fivetran = datahub.ingestion.source.fivetran:FivetranSource",
],
"datahub.ingestion.transformer.plugins": [
"simple_remove_dataset_ownership = datahub.ingestion.transformer.remove_dataset_ownership:SimpleRemoveDatasetOwnership",
13 changes: 12 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -65,8 +65,10 @@ def auto_status_aspect(
For all entities that don't have a status aspect, add one with removed set to false.
"""

skip_entities: Set[str] = {"dataProcessInstance"}
all_urns: Set[str] = set()
status_urns: Set[str] = set()
skip_urns: Set[str] = set()
for wu in stream:
urn = wu.get_urn()
all_urns.add(urn)
@@ -90,9 +92,18 @@
else:
raise ValueError(f"Unexpected type {type(wu.metadata)}")

if (
not isinstance(wu.metadata, MetadataChangeEventClass)
and wu.metadata.entityType in skip_entities
):
# If an entity type does not support the 'status' aspect, skip adding that aspect for its urns.
# For example, dataProcessInstance does not support the status aspect.
# Emitting it anyway fails with: java.lang.RuntimeException: Unknown aspect status for entity dataProcessInstance
skip_urns.add(urn)

yield wu

for urn in sorted(all_urns - status_urns):
for urn in sorted(all_urns - status_urns - skip_urns):
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=StatusClass(removed=False),
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/__init__.py
@@ -0,0 +1 @@
from datahub.ingestion.source.fivetran.fivetran import FivetranSource
135 changes: 135 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
@@ -0,0 +1,135 @@
import logging
from dataclasses import dataclass, field as dataclass_field
from typing import Dict, List, Optional

import pydantic
from pydantic import Field
from pydantic.class_validators import root_validator

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig

logger = logging.getLogger(__name__)


class Constant:
"""
Keys used in the fivetran plugin.
"""

ORCHESTRATOR = "fivetran"
# table column name
SOURCE_SCHEMA_NAME = "source_schema_name"
SOURCE_TABLE_NAME = "source_table_name"
DESTINATION_SCHEMA_NAME = "destination_schema_name"
DESTINATION_TABLE_NAME = "destination_table_name"
SYNC_ID = "sync_id"
MESSAGE_DATA = "message_data"
TIME_STAMP = "time_stamp"
STATUS = "status"
USER_ID = "user_id"
GIVEN_NAME = "given_name"
FAMILY_NAME = "family_name"
CONNECTOR_ID = "connector_id"
CONNECTOR_NAME = "connector_name"
CONNECTOR_TYPE_ID = "connector_type_id"
PAUSED = "paused"
SYNC_FREQUENCY = "sync_frequency"
DESTINATION_ID = "destination_id"
CONNECTING_USER_ID = "connecting_user_id"
# Job status constants
SUCCESSFUL = "SUCCESSFUL"
FAILURE_WITH_TASK = "FAILURE_WITH_TASK"
CANCELED = "CANCELED"


SUPPORTED_DATA_PLATFORM_MAPPING = {
"postgres": "postgres",
"snowflake": "snowflake",
"mysql": "mysql",
}


class SnowflakeDestinationConfig(BaseSnowflakeConfig):
database: str = Field(description="The fivetran connector log database.")
log_schema: str = Field(description="The fivetran connector log schema.")


class FivetranLogConfig(ConfigModel):
destination_platform: str = pydantic.Field(
default="snowflake",
description="The destination platform where fivetran connector log tables are dumped.",
)
snowflake_destination_config: Optional[SnowflakeDestinationConfig] = pydantic.Field(

let's rename it to destination_config

Owner Author:
But this config is specific to snowflake destination only.

The type is sufficient to convey that it's a SnowflakeDestinationConfig, and later we can change it to Union[SnowflakeDestinationConfig, BigQueryDestinationConfig], so let's rename it to destination_config.
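(A rough sketch of the suggested shape, for illustration only; BigQueryDestinationConfig does not exist in this PR and is purely a placeholder, and SnowflakeDestinationConfig refers to the class defined earlier in this file.)

from typing import Optional, Union

import pydantic

from datahub.configuration.common import ConfigModel

class BigQueryDestinationConfig(ConfigModel):
    # Hypothetical placeholder, only to illustrate the Union shape.
    project_id: str
    log_dataset: str

class FivetranLogConfig(ConfigModel):
    destination_platform: str = pydantic.Field(default="snowflake")
    destination_config: Optional[
        Union[SnowflakeDestinationConfig, BigQueryDestinationConfig]
    ] = pydantic.Field(
        default=None,
        description="Destination-specific configuration for the fivetran log tables.",
    )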

default=None,
description="If destination platform is 'snowflake', provide snowflake configuration.",
)

@root_validator(pre=True)
def validate_destination_platform_and_config(cls, values: Dict) -> Dict:
destination_platform = values.get("destination_platform", "snowflake")
if destination_platform == "snowflake":
if "snowflake_destination_config" not in values:
raise ValueError(
"If destination platform is 'snowflake', user must provide snowflake destination configuration in the recipe."
)
else:
raise ValueError(
f"Destination platform '{destination_platform}' is not yet supported."
)
return values


@dataclass
class FivetranSourceReport(StaleEntityRemovalSourceReport):
connectors_scanned: int = 0
filtered_connectors: List[str] = dataclass_field(default_factory=list)

def report_connectors_scanned(self, count: int = 1) -> None:
self.connectors_scanned += count

def report_connectors_dropped(self, model: str) -> None:
self.filtered_connectors.append(model)


class PlatformDetail(ConfigModel):
platform_instance: Optional[str] = pydantic.Field(
default=None,
description="The instance of the platform that all assets produced by this recipe belong to",
)
env: str = pydantic.Field(
default=DEFAULT_ENV,
description="The environment that all assets produced by DataHub platform ingestion source belong to",
)


class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
fivetran_log_config: FivetranLogConfig = pydantic.Field(
description="Fivetran log connector destination server configurations.",
)
connector_patterns: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for connectors to filter in ingestion.",
)
# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="Fivetran Stateful Ingestion Config."
)
# Mapping of each Fivetran connector's source datasets to a platform instance
sources_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
default={},
description="A mapping of each connector's source datasets to a platform instance. Use the connector id as the key.",
)
# Fivetran destination to platform instance mapping
destination_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
default={},
description="A mapping of destination dataset to platform instance. Use destination id as key.",
)
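(To make the recipe shape concrete: a hedged sketch, not taken from this PR, of parsing FivetranSourceConfig from a recipe-style dict. The Snowflake credential keys are assumed to follow BaseSnowflakeConfig, and all values are placeholders.)

from datahub.ingestion.source.fivetran.config import (
    FivetranLogConfig,
    FivetranSourceConfig,
)

# Placeholder values only; a real recipe needs working Snowflake credentials.
config = FivetranSourceConfig.parse_obj(
    {
        "fivetran_log_config": {
            "destination_platform": "snowflake",
            "snowflake_destination_config": {
                "account_id": "my_snowflake_account",
                "username": "datahub_user",
                "password": "secret",
                "role": "datahub_role",
                "warehouse": "COMPUTE_WH",
                "database": "FIVETRAN_LOG_DATABASE",
                "log_schema": "FIVETRAN_LOG",
            },
        },
        "connector_patterns": {"allow": ["postgres.*"]},
    }
)
assert config.fivetran_log_config.destination_platform == "snowflake"

# The pre-validator above rejects unsupported destinations at parse time.
try:
    FivetranLogConfig.parse_obj({"destination_platform": "bigquery"})
except Exception as exc:
    print(f"Rejected as expected: {exc}")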
@@ -0,0 +1,165 @@
import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple

from sqlalchemy import create_engine

from datahub.ingestion.source.fivetran.config import (
Constant,
FivetranSourceConfig,
FivetranSourceReport,
)
from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery

logger: logging.Logger = logging.getLogger(__name__)


@dataclass
class Connector:
connector_id: str
connector_name: str
connector_type: str
paused: bool
sync_frequency: int
destination_id: str
user_name: str
source_tables: List[str]
destination_tables: List[str]
jobs: List["Job"]


@dataclass
class Job:
job_id: str
start_time: int
end_time: int
status: str


class FivetranLogDataDictionary:
def __init__(
self, config: FivetranSourceConfig, report: FivetranSourceReport
) -> None:
self.logger = logger
self.config = config
self.report = report
self.engine = self._get_log_destination_engine()

def _get_log_destination_engine(self) -> Any:
destination_platform = self.config.fivetran_log_config.destination_platform
engine = None
if destination_platform == "snowflake":
snowflake_destination_config = (
self.config.fivetran_log_config.snowflake_destination_config
)
if snowflake_destination_config is not None:
engine = create_engine(
snowflake_destination_config.get_sql_alchemy_url(),
**snowflake_destination_config.get_options(),
)
engine.execute(
FivetranLogQuery.use_schema(
snowflake_destination_config.database,
snowflake_destination_config.log_schema,
)
)
return engine

def _query(self, query: str) -> List[Dict]:
logger.debug("Query : {}".format(query))
resp = self.engine.execute(query)
return [row for row in resp]

def _get_table_lineage(self, connector_id: str) -> Tuple[List[str], List[str]]:
table_lineage = self._query(
FivetranLogQuery.get_table_lineage_query(connector_id=connector_id)
)
source_tables: List[str] = []
destination_tables: List[str] = []
for each in table_lineage:
source_tables.append(
f"{each[Constant.SOURCE_SCHEMA_NAME]}.{each[Constant.SOURCE_TABLE_NAME]}"
)
destination_tables.append(
f"{each[Constant.DESTINATION_SCHEMA_NAME]}.{each[Constant.DESTINATION_TABLE_NAME]}"
)
return source_tables, destination_tables

def _get_jobs_list(self, connector_id: str) -> List[Job]:
jobs: List[Job] = []
sync_start_logs = {
row[Constant.SYNC_ID]: row
for row in self._query(
FivetranLogQuery.get_sync_start_logs_query(connector_id=connector_id)
)
}
sync_end_logs = {
row[Constant.SYNC_ID]: row
for row in self._query(
FivetranLogQuery.get_sync_end_logs_query(connector_id=connector_id)
)
}
for sync_id in sync_start_logs.keys():
if sync_end_logs.get(sync_id) is None:
# If there is no sync-end event log for this sync id, the sync is still in progress
continue

message_data = json.loads(sync_end_logs[sync_id][Constant.MESSAGE_DATA])
if type(message_data) is str:
# Sometimes message_data contains a JSON string nested inside another string,
# e.g. '"{\"status\":\"SUCCESSFUL\"}"', so json.loads must be applied twice.
message_data = json.loads(message_data)

jobs.append(
Job(
job_id=sync_id,
start_time=round(
sync_start_logs[sync_id][Constant.TIME_STAMP].timestamp()
),
end_time=round(
sync_end_logs[sync_id][Constant.TIME_STAMP].timestamp()
),
status=message_data[Constant.STATUS],
)
)
return jobs

def _get_user_name(self, user_id: str) -> str:
user_details = self._query(FivetranLogQuery.get_user_query(user_id=user_id))[0]
return (
f"{user_details[Constant.GIVEN_NAME]} {user_details[Constant.FAMILY_NAME]}"
)

def get_connectors_list(self) -> List[Connector]:
connectors: List[Connector] = []
connector_list = self._query(FivetranLogQuery.get_connectors_query())
for connector in connector_list:
if not self.config.connector_patterns.allowed(
connector[Constant.CONNECTOR_NAME]
):
self.report.report_connectors_dropped(connector[Constant.CONNECTOR_ID])
continue

source_tables, destination_tables = self._get_table_lineage(
connector[Constant.CONNECTOR_ID]
)

connectors.append(
Connector(
connector_id=connector[Constant.CONNECTOR_ID],
connector_name=connector[Constant.CONNECTOR_NAME],
connector_type=connector[Constant.CONNECTOR_TYPE_ID],
paused=connector[Constant.PAUSED],
sync_frequency=connector[Constant.SYNC_FREQUENCY],
destination_id=connector[Constant.DESTINATION_ID],
user_name=self._get_user_name(
connector[Constant.CONNECTING_USER_ID]
),
source_tables=source_tables,
destination_tables=destination_tables,
jobs=self._get_jobs_list(connector[Constant.CONNECTOR_ID]),
)
)
return connectors
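(A hedged usage sketch, not code from this PR: roughly how the FivetranSource in fivetran.py, which is not shown in this excerpt, might drive this class once a config and report exist.)

from typing import List

from datahub.ingestion.source.fivetran.config import (
    FivetranSourceConfig,
    FivetranSourceReport,
)

def summarize_connectors(
    config: FivetranSourceConfig, report: FivetranSourceReport
) -> List[str]:
    # FivetranLogDataDictionary is the class defined above; it connects to the
    # configured log destination and returns one Connector per allowed connector.
    log_data = FivetranLogDataDictionary(config, report)
    return [
        f"{c.connector_name} ({c.connector_type}): {len(c.jobs)} recent syncs"
        for c in log_data.get_connectors_list()
    ]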