Code to execute queries through the SQLAlchemy engine added
shubhamjagtap639 committed Oct 5, 2023
1 parent b04ff8c commit 3f392f4
Showing 6 changed files with 80 additions and 145 deletions.
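At a glance, the commit replaces the Fivetran source's dedicated log-destination classes with a single SQLAlchemy engine: queries against the Fivetran connector log now run through engine.execute(), the result-row constants in config.py move to lowercase keys, destination validation moves into a pydantic root validator on FivetranLogConfig, the User dataclass collapses into a user_name string on Connector, and the "fivetran" entry is dropped from a plugin list in setup.py.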
1 change: 0 additions & 1 deletion metadata-ingestion/setup.py
@@ -506,7 +506,6 @@ def get_long_description():
"nifi",
"vertica",
"mode",
"fivetran",
]
if plugin
for dependency in plugins[plugin]
68 changes: 37 additions & 31 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
@@ -4,6 +4,7 @@

import pydantic
from pydantic import Field
+from pydantic.class_validators import root_validator

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin
@@ -26,29 +27,24 @@ class Constant:

    ORCHESTRATOR = "fivetran"
    # table column name
-    SOURCE_SCHEMA_NAME = "SOURCE_SCHEMA_NAME"
-    SOURCE_TABLE_NAME = "SOURCE_TABLE_NAME"
-    DESTINATION_SCHEMA_NAME = "DESTINATION_SCHEMA_NAME"
-    DESTINATION_TABLE_NAME = "DESTINATION_TABLE_NAME"
-    SYNC_ID = "SYNC_ID"
-    MESSAGE_DATA = "MESSAGE_DATA"
-    TIME_STAMP = "TIME_STAMP"
+    SOURCE_SCHEMA_NAME = "source_schema_name"
+    SOURCE_TABLE_NAME = "source_table_name"
+    DESTINATION_SCHEMA_NAME = "destination_schema_name"
+    DESTINATION_TABLE_NAME = "destination_table_name"
+    SYNC_ID = "sync_id"
+    MESSAGE_DATA = "message_data"
+    TIME_STAMP = "time_stamp"
    STATUS = "status"
-    USER_ID = "USER_ID"
-    GIVEN_NAME = "GIVEN_NAME"
-    FAMILY_NAME = "FAMILY_NAME"
-    EMAIL = "EMAIL"
-    EMAIL_DISABLED = "EMAIL_DISABLED"
-    VERIFIED = "VERIFIED"
-    CREATED_AT = "CREATED_AT"
-    CONNECTOR_ID = "CONNECTOR_ID"
-    CONNECTOR_NAME = "CONNECTOR_NAME"
-    CONNECTOR_TYPE_ID = "CONNECTOR_TYPE_ID"
-    PAUSED = "PAUSED"
-    SYNC_FREQUENCY = "SYNC_FREQUENCY"
-    DESTINATION_ID = "DESTINATION_ID"
-    CONNECTING_USER_ID = "CONNECTING_USER_ID"
+    USER_ID = "user_id"
+    GIVEN_NAME = "given_name"
+    FAMILY_NAME = "family_name"
+    CONNECTOR_ID = "connector_id"
+    CONNECTOR_NAME = "connector_name"
+    CONNECTOR_TYPE_ID = "connector_type_id"
+    PAUSED = "paused"
+    SYNC_FREQUENCY = "sync_frequency"
+    DESTINATION_ID = "destination_id"
+    CONNECTING_USER_ID = "connecting_user_id"
    # Job status constants
    SUCCESSFUL = "SUCCESSFUL"
    FAILURE_WITH_TASK = "FAILURE_WITH_TASK"
@@ -63,24 +59,34 @@ class Constant:


class SnowflakeDestinationConfig(BaseSnowflakeConfig):
-    database: str = Field(
-        default=None, description="The fivetran connector log database."
-    )
-    log_schema: Optional[str] = Field(
-        default="FIVETRAN_LOG", description="The fivetran connector log schema."
-    )
+    database: str = Field(description="The fivetran connector log database.")
+    log_schema: str = Field(description="The fivetran connector log schema.")


class FivetranLogConfig(ConfigModel):
    destination_platform: str = pydantic.Field(
-        default=None,
+        default="snowflake",
        description="The destination platform where fivetran connector log tables are dumped.",
    )
    snowflake_destination_config: Optional[SnowflakeDestinationConfig] = pydantic.Field(
        default=None,
        description="If destination platform is 'snowflake', provide snowflake configuration.",
    )

+    @root_validator(pre=True)
+    def validate_destination_platform_and_config(cls, values: Dict) -> Dict:
+        destination_platform = values.get("destination_platform", "snowflake")
+        if destination_platform == "snowflake":
+            if "snowflake_destination_config" not in values:
+                raise ValueError(
+                    "If destination platform is 'snowflake', user must provide snowflake destination configuration in the recipe."
+                )
+        else:
+            raise ValueError(
+                f"Destination platform '{destination_platform}' is not yet supported."
+            )
+        return values


@dataclass
class FivetranSourceReport(StaleEntityRemovalSourceReport):
@@ -120,10 +126,10 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
    # Fivetran connector all sources to platform instance mapping
    sources_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
        default={},
-        description="A mapping of connector all sources to Data platform instance. Provide connector id as key.",
+        description="A mapping of the connector's sources to platform instances. Use the connector id as the key.",
    )
    # Fivetran destination to platform instance mapping
    destination_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
        default={},
-        description="A mapping of fivetran destination to Data platform instance. Provide destination id as key.",
+        description="A mapping of the Fivetran destination to a platform instance. Use the destination id as the key.",
    )
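For context, a minimal runnable sketch of how the new root validator behaves, assuming pydantic v1 semantics (a plain BaseModel stands in for DataHub's ConfigModel and BaseSnowflakeConfig, so only the database and log_schema fields are modeled here):

from typing import Dict, Optional

from pydantic import BaseModel, Field
from pydantic.class_validators import root_validator


class SnowflakeDestinationConfig(BaseModel):
    # Simplified stand-in; the real class extends BaseSnowflakeConfig.
    database: str = Field(description="The fivetran connector log database.")
    log_schema: str = Field(description="The fivetran connector log schema.")


class FivetranLogConfig(BaseModel):
    destination_platform: str = "snowflake"
    snowflake_destination_config: Optional[SnowflakeDestinationConfig] = None

    @root_validator(pre=True)
    def validate_destination_platform_and_config(cls, values: Dict) -> Dict:
        platform = values.get("destination_platform", "snowflake")
        if platform == "snowflake":
            if "snowflake_destination_config" not in values:
                raise ValueError(
                    "If destination platform is 'snowflake', user must provide snowflake destination configuration in the recipe."
                )
        else:
            raise ValueError(f"Destination platform '{platform}' is not yet supported.")
        return values


# Accepted: snowflake platform with a destination config supplied.
ok = FivetranLogConfig(
    destination_platform="snowflake",
    snowflake_destination_config={"database": "MY_DB", "log_schema": "FIVETRAN_LOG"},
)

# Rejected: any other platform raises a validation error.
try:
    FivetranLogConfig(destination_platform="bigquery")
except ValueError as err:
    print(err)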
metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py
@@ -1,18 +1,16 @@
import json
import logging
from dataclasses import dataclass
-from typing import List, Tuple
+from typing import Any, Dict, List, Tuple

+from sqlalchemy import create_engine

from datahub.ingestion.source.fivetran.config import (
    Constant,
    FivetranSourceConfig,
    FivetranSourceReport,
)
from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery
-from datahub.ingestion.source.fivetran.log_destination import (
-    LogDestination,
-    SnowflakeDestination,
-)

logger: logging.Logger = logging.getLogger(__name__)
@@ -25,10 +23,10 @@ class Connector:
    paused: bool
    sync_frequency: int
    destination_id: str
+    user_name: str
    source_tables: List[str]
    destination_tables: List[str]
    jobs: List["Job"]
-    user: "User"


@dataclass
@@ -39,39 +37,42 @@ class Job:
    status: str


-@dataclass
-class User:
-    user_id: str
-    given_name: str
-    family_name: str
-    email: str
-    email_disabled: bool
-    verified: bool
-    created_at: int


class FivetranLogDataDictionary:
    def __init__(
        self, config: FivetranSourceConfig, report: FivetranSourceReport
    ) -> None:
        self.logger = logger
        self.config = config
        self.report = report
-        self.log_destination: LogDestination = self._get_fivetran_log_destination()
+        self.engine = self._get_log_destination_engine()

-    def _get_fivetran_log_destination(self) -> LogDestination:
+    def _get_log_destination_engine(self) -> Any:
        destination_platform = self.config.fivetran_log_config.destination_platform
+        engine = None
        if destination_platform == "snowflake":
-            return SnowflakeDestination(
+            snowflake_destination_config = (
                self.config.fivetran_log_config.snowflake_destination_config
            )
-        else:
-            raise ValueError(
-                f"Destination platform '{destination_platform}' is not yet supported."
-            )
+            if snowflake_destination_config is not None:
+                engine = create_engine(
+                    snowflake_destination_config.get_sql_alchemy_url(),
+                    **snowflake_destination_config.get_options(),
+                )
+                engine.execute(
+                    FivetranLogQuery.use_schema(
+                        snowflake_destination_config.database,
+                        snowflake_destination_config.log_schema,
+                    )
+                )
+        return engine

+    def _query(self, query: str) -> List[Dict]:
+        logger.debug("Query : {}".format(query))
+        resp = self.engine.execute(query)
+        return [row for row in resp]

    def _get_table_lineage(self, connector_id: str) -> Tuple[List[str], List[str]]:
-        table_lineage = self.log_destination.query(
+        table_lineage = self._query(
            FivetranLogQuery.get_table_lineage_query(connector_id=connector_id)
        )
        source_tables: List[str] = []
@@ -89,13 +90,13 @@ def _get_jobs_list(self, connector_id: str) -> List[Job]:
        jobs: List[Job] = []
        sync_start_logs = {
            row[Constant.SYNC_ID]: row
-            for row in self.log_destination.query(
+            for row in self._query(
                FivetranLogQuery.get_sync_start_logs_query(connector_id=connector_id)
            )
        }
        sync_end_logs = {
            row[Constant.SYNC_ID]: row
-            for row in self.log_destination.query(
+            for row in self._query(
                FivetranLogQuery.get_sync_end_logs_query(connector_id=connector_id)
            )
        }
@@ -125,31 +126,15 @@ def _get_jobs_list(self, connector_id: str) -> List[Job]:
            )
        return jobs

-    def _get_user_obj(self, user_id: str) -> User:
-        user_details = self.log_destination.query(
-            FivetranLogQuery.get_user_query(user_id=user_id)
-        )[0]
-        return User(
-            user_id=user_details[Constant.USER_ID],
-            given_name=user_details[Constant.GIVEN_NAME],
-            family_name=user_details[Constant.FAMILY_NAME],
-            email=user_details[Constant.EMAIL],
-            email_disabled=user_details[Constant.EMAIL_DISABLED],
-            verified=user_details[Constant.VERIFIED],
-            created_at=round(user_details[Constant.CREATED_AT].timestamp()),
+    def _get_user_name(self, user_id: str) -> str:
+        user_details = self._query(FivetranLogQuery.get_user_query(user_id=user_id))[0]
+        return (
+            f"{user_details[Constant.GIVEN_NAME]} {user_details[Constant.FAMILY_NAME]}"
        )

    def get_connectors_list(self) -> List[Connector]:
-        self.log_destination.query(
-            FivetranLogQuery.use_schema(
-                self.log_destination.get_database(), self.log_destination.get_schema()
-            )
-        )
-
        connectors: List[Connector] = []
-        connector_list = self.log_destination.query(
-            FivetranLogQuery.get_connectors_query()
-        )
+        connector_list = self._query(FivetranLogQuery.get_connectors_query())
        for connector in connector_list:
            if not self.config.connector_patterns.allowed(
                connector[Constant.CONNECTOR_NAME]
@@ -169,10 +154,12 @@ def get_connectors_list(self) -> List[Connector]:
                    paused=connector[Constant.PAUSED],
                    sync_frequency=connector[Constant.SYNC_FREQUENCY],
                    destination_id=connector[Constant.DESTINATION_ID],
+                    user_name=self._get_user_name(
+                        connector[Constant.CONNECTING_USER_ID]
+                    ),
                    source_tables=source_tables,
                    destination_tables=destination_tables,
                    jobs=self._get_jobs_list(connector[Constant.CONNECTOR_ID]),
-                    user=self._get_user_obj(connector[Constant.CONNECTING_USER_ID]),
                )
            )
        return connectors
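The shape of the new query path, as a standalone sketch: one SQLAlchemy engine is created up front from the Snowflake destination settings, pointed at the Fivetran log database and schema, and every later lookup goes through a single raw-SQL helper. The connection URL, the USE SCHEMA statement, and the sample query below are illustrative stand-ins, not the exact strings produced by get_sql_alchemy_url() or FivetranLogQuery; the engine.execute() style matches SQLAlchemy 1.x, which is what the code above uses.

from typing import Any, Dict, List

from sqlalchemy import create_engine


def build_log_engine(sql_alchemy_url: str, database: str, log_schema: str) -> Any:
    # Mirrors _get_log_destination_engine: create the engine once, then scope
    # the session to the Fivetran log schema.
    engine = create_engine(sql_alchemy_url)
    engine.execute(f'use schema "{database}"."{log_schema}"')  # assumed USE statement
    return engine


def run_query(engine: Any, sql: str) -> List[Dict]:
    # Mirrors _query: execute raw SQL and materialize every result row.
    resp = engine.execute(sql)
    return [row for row in resp]


# Hypothetical usage (placeholder URL and query, not real credentials or table names):
# engine = build_log_engine("snowflake://USER:PASSWORD@ACCOUNT", "MY_DB", "FIVETRAN_LOG")
# rows = run_query(engine, "SELECT connector_id, connector_name FROM CONNECTOR")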
metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
@@ -24,11 +24,10 @@
    FivetranSourceReport,
    PlatformDetail,
)
-from datahub.ingestion.source.fivetran.fivetran_schema import (
+from datahub.ingestion.source.fivetran.data_classes import (
    Connector,
    FivetranLogDataDictionary,
    Job,
-    User,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalHandler,
@@ -141,13 +140,14 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob:
            id=connector.connector_id,
            flow_urn=dataflow_urn,
            name=connector.connector_name,
+            owners={connector.user_name},
        )

        job_property_bag: Dict[str, str] = {}
        allowed_connection_keys = [
-            Constant.PAUSED.lower(),
-            Constant.SYNC_FREQUENCY.lower(),
-            Constant.DESTINATION_ID.lower(),
+            Constant.PAUSED,
+            Constant.SYNC_FREQUENCY,
+            Constant.DESTINATION_ID,
        ]
        for key in allowed_connection_keys:
            if hasattr(connector, key) and getattr(connector, key) is not None:
@@ -203,17 +203,14 @@ def _get_connector_workunit(
        self, connector: Connector
    ) -> Iterable[MetadataWorkUnit]:
        self.report.report_connectors_scanned()
-        # Create dataflow entity with same name as connector name,
-        # Later soft delete this entity during ingestion
+        # Create dataflow entity with same name as connector name
        dataflow = self._generate_dataflow_from_connector(connector)
        for mcp in dataflow.generate_mcp():
-            # return workunit to Datahub Ingestion framework
            yield mcp.as_workunit()

        # Map Fivetran's connector entity with Datahub's datajob entity
        datajob = self._generate_datajob_from_connector(connector)
        for mcp in datajob.generate_mcp():
-            # return workunit to Datahub Ingestion framework
            yield mcp.as_workunit()

        # Map Fivetran's job/sync history entity with Datahub's data process entity
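A side effect of the lowercase Constant values: they now double as Connector attribute names, which is why the .lower() calls above disappear. A small self-contained illustration of that property-bag loop (the loop body and the trimmed Connector fields are assumptions for the sketch, not the exact source):

from dataclasses import dataclass
from typing import Dict

PAUSED = "paused"
SYNC_FREQUENCY = "sync_frequency"
DESTINATION_ID = "destination_id"


@dataclass
class Connector:
    # Trimmed to the fields used below; the real dataclass has more.
    paused: bool
    sync_frequency: int
    destination_id: str


connector = Connector(paused=False, sync_frequency=360, destination_id="dest_abc")

job_property_bag: Dict[str, str] = {}
for key in [PAUSED, SYNC_FREQUENCY, DESTINATION_ID]:
    # The lowercase constants are valid attribute names, so getattr works directly.
    if hasattr(connector, key) and getattr(connector, key) is not None:
        job_property_bag[key] = str(getattr(connector, key))  # assumed stringification

print(job_property_bag)  # {'paused': 'False', 'sync_frequency': '360', 'destination_id': 'dest_abc'}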
metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
@@ -21,11 +21,7 @@ def get_user_query(user_id: str) -> str:
return f"""
SELECT id as "USER_ID",
given_name as "GIVEN_NAME",
family_name as "FAMILY_NAME",
email as "EMAIL",
email_disabled as "EMAIL_DISABLED",
verified as "VERIFIED",
created_at as "CREATED_AT"
family_name as "FAMILY_NAME"
FROM USER
WHERE id = '{user_id}'"""


metadata-ingestion/src/datahub/ingestion/source/fivetran/log_destination.py was deleted.
