perf(ingestion/fivetran): Connector performance optimization (datahub…

shubhamjagtap639 authored Apr 30, 2024
1 parent 704ca65 commit 77045f9

Showing 3 changed files with 170 additions and 84 deletions.
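The commit removes an N+1 query pattern: instead of issuing separate table-lineage, column-lineage, and sync-log queries for every connector (and every table pair), the source now runs one bulk query per kind and groups the rows in memory. The first changed file is the Fivetran log API, the second is its SQL query builder, and the third is the integration test. A minimal sketch of the before/after pattern, with a hypothetical run_query standing in for engine.execute:

from collections import defaultdict
from typing import Any, Dict, List

def run_query(sql: str) -> List[Dict[str, Any]]:
    ...  # stand-in for engine.execute(); returns rows as dicts

def fetch_lineage_per_connector(connector_ids: List[str]) -> Dict[str, List[Dict[str, Any]]]:
    # Before: one round-trip per connector.
    return {
        cid: run_query(f"SELECT ... WHERE connector_id = '{cid}'")
        for cid in connector_ids
    }

def fetch_lineage_bulk() -> Dict[str, List[Dict[str, Any]]]:
    # After: a single round-trip, grouped by connector_id in Python.
    grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for row in run_query("SELECT ... FROM table_lineage"):
        grouped[row["connector_id"]].append(row)
    return dict(grouped)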
@@ -80,25 +80,67 @@ def _query(self, query: str) -> List[Dict]:
         resp = self.engine.execute(query)
         return [row for row in resp]

-    def _get_table_lineage(self, connector_id: str) -> List[TableLineage]:
+    def _get_column_lineage_metadata(self) -> Dict[str, List]:
+        """
+        Returns a dict of column lineage metadata, keyed as '<SOURCE_TABLE_ID>-<DESTINATION_TABLE_ID>'.
+        """
+        all_column_lineage: Dict[str, List] = {}
+        column_lineage_result = self._query(
+            self.fivetran_log_query.get_column_lineage_query()
+        )
+        for column_lineage in column_lineage_result:
+            key = f"{column_lineage[Constant.SOURCE_TABLE_ID]}-{column_lineage[Constant.DESTINATION_TABLE_ID]}"
+            if key not in all_column_lineage:
+                all_column_lineage[key] = [column_lineage]
+            else:
+                all_column_lineage[key].append(column_lineage)
+        return all_column_lineage
+
+    def _get_connectors_table_lineage_metadata(self) -> Dict[str, List]:
+        """
+        Returns a dict of table lineage metadata, keyed by 'CONNECTOR_ID'.
+        """
+        connectors_table_lineage_metadata: Dict[str, List] = {}
         table_lineage_result = self._query(
-            self.fivetran_log_query.get_table_lineage_query(connector_id=connector_id)
+            self.fivetran_log_query.get_table_lineage_query()
         )
+        for table_lineage in table_lineage_result:
+            if (
+                table_lineage[Constant.CONNECTOR_ID]
+                not in connectors_table_lineage_metadata
+            ):
+                connectors_table_lineage_metadata[
+                    table_lineage[Constant.CONNECTOR_ID]
+                ] = [table_lineage]
+            else:
+                connectors_table_lineage_metadata[
+                    table_lineage[Constant.CONNECTOR_ID]
+                ].append(table_lineage)
+        return connectors_table_lineage_metadata
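Both helpers above are the same group-rows-by-key pass; written with collections.defaultdict the idiom compresses to a few lines (group_rows and its key_fields argument are illustrative, not part of this commit):

from collections import defaultdict
from typing import Dict, List

def group_rows(rows: List[dict], key_fields: List[str]) -> Dict[str, List[dict]]:
    grouped: Dict[str, List[dict]] = defaultdict(list)
    for row in rows:
        # e.g. key_fields = ["source_table_id", "destination_table_id"] -> "10040-7779"
        grouped["-".join(str(row[f]) for f in key_fields)].append(row)
    return dict(grouped)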
+
+    def _get_table_lineage(
+        self,
+        column_lineage_metadata: Dict[str, List],
+        table_lineage_result: Optional[List],
+    ) -> List[TableLineage]:
         table_lineage_list: List[TableLineage] = []
+        if table_lineage_result is None:
+            return table_lineage_list
         for table_lineage in table_lineage_result:
-            column_lineage_result = self._query(
-                self.fivetran_log_query.get_column_lineage_query(
-                    source_table_id=table_lineage[Constant.SOURCE_TABLE_ID],
-                    destination_table_id=table_lineage[Constant.DESTINATION_TABLE_ID],
-                )
-            )
-            column_lineage_list: List[ColumnLineage] = [
-                ColumnLineage(
-                    source_column=column_lineage[Constant.SOURCE_COLUMN_NAME],
-                    destination_column=column_lineage[Constant.DESTINATION_COLUMN_NAME],
-                )
-                for column_lineage in column_lineage_result
-            ]
+            column_lineage_result = column_lineage_metadata.get(
+                f"{table_lineage[Constant.SOURCE_TABLE_ID]}-{table_lineage[Constant.DESTINATION_TABLE_ID]}"
+            )
+            column_lineage_list: List[ColumnLineage] = []
+            if column_lineage_result:
+                column_lineage_list = [
+                    ColumnLineage(
+                        source_column=column_lineage[Constant.SOURCE_COLUMN_NAME],
+                        destination_column=column_lineage[
+                            Constant.DESTINATION_COLUMN_NAME
+                        ],
+                    )
+                    for column_lineage in column_lineage_result
+                ]
             table_lineage_list.append(
                 TableLineage(
                     source_table=f"{table_lineage[Constant.SOURCE_SCHEMA_NAME]}.{table_lineage[Constant.SOURCE_TABLE_NAME]}",
@@ -109,30 +151,44 @@ def _get_table_lineage(self, connector_id: str) -> List[TableLineage]:

         return table_lineage_list
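With the bulk fetches in place, _get_table_lineage is a pure in-memory transform. A sketch of how the call sites later in this diff appear to wire the pieces together; `api` stands in for the log-API object and is a placeholder here:

from typing import Any, Dict, List

def build_lineage_per_connector(api: Any, connector_ids: List[str]) -> Dict[str, list]:
    table_groups = api._get_connectors_table_lineage_metadata()  # one query total
    column_groups = api._get_column_lineage_metadata()           # one query total
    return {
        cid: api._get_table_lineage(
            column_lineage_metadata=column_groups,
            table_lineage_result=table_groups.get(cid),
        )
        for cid in connector_ids
    }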

-    def _get_jobs_list(self, connector_id: str) -> List[Job]:
+    def _get_all_connector_sync_logs(self) -> Dict[str, Dict]:
+        sync_logs = {}
+        for row in self._query(self.fivetran_log_query.get_sync_logs_query()):
+            if row[Constant.CONNECTOR_ID] not in sync_logs:
+                sync_logs[row[Constant.CONNECTOR_ID]] = {
+                    row[Constant.SYNC_ID]: {
+                        row["message_event"]: (
+                            row[Constant.TIME_STAMP].timestamp(),
+                            row[Constant.MESSAGE_DATA],
+                        )
+                    }
+                }
+            elif row[Constant.SYNC_ID] not in sync_logs[row[Constant.CONNECTOR_ID]]:
+                sync_logs[row[Constant.CONNECTOR_ID]][row[Constant.SYNC_ID]] = {
+                    row["message_event"]: (
+                        row[Constant.TIME_STAMP].timestamp(),
+                        row[Constant.MESSAGE_DATA],
+                    )
+                }
+            else:
+                sync_logs[row[Constant.CONNECTOR_ID]][row[Constant.SYNC_ID]][
+                    row["message_event"]
+                ] = (row[Constant.TIME_STAMP].timestamp(), row[Constant.MESSAGE_DATA])
+
+        return sync_logs
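The structure built here is keyed three levels deep: connector_id, then sync_id, then message_event, mapping to a (timestamp, message_data) tuple. An illustration using one sync from the test fixtures below (epoch values assume UTC):

sync_logs = {
    "calendar_elected": {
        "4c9a03d6-eded-4422-a46a-163266e58243": {
            "sync_start": (1695191852.606, None),  # (time_stamp.timestamp(), message_data)
            "sync_end": (1695191885.056, '"{\\"status\\":\\"SUCCESSFUL\\"}"'),
        }
    }
}

The len(...) != 2 check in _get_jobs_list below then reads as: skip any sync that does not yet have both a sync_start and a sync_end event.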
+
+    def _get_jobs_list(
+        self, connector_sync_log: Optional[Dict[str, Dict]]
+    ) -> List[Job]:
         jobs: List[Job] = []
-        sync_start_logs = {
-            row[Constant.SYNC_ID]: row
-            for row in self._query(
-                self.fivetran_log_query.get_sync_start_logs_query(
-                    connector_id=connector_id
-                )
-            )
-        }
-        sync_end_logs = {
-            row[Constant.SYNC_ID]: row
-            for row in self._query(
-                self.fivetran_log_query.get_sync_end_logs_query(
-                    connector_id=connector_id
-                )
-            )
-        }
-        for sync_id in sync_start_logs.keys():
-            if sync_end_logs.get(sync_id) is None:
-                # If no sync-end event log for this sync id that means sync is still in progress
+        if connector_sync_log is None:
+            return jobs
+        for sync_id in connector_sync_log.keys():
+            if len(connector_sync_log[sync_id]) != 2:
+                # If either the sync-start or the sync-end event log is missing for this sync, the sync is still in progress
                 continue

-            message_data = sync_end_logs[sync_id][Constant.MESSAGE_DATA]
+            message_data = connector_sync_log[sync_id]["sync_end"][1]
             if message_data is None:
                 continue
             message_data = json.loads(message_data)
@@ -145,12 +201,8 @@ def _get_jobs_list(self, connector_id: str) -> List[Job]:
             jobs.append(
                 Job(
                     job_id=sync_id,
-                    start_time=round(
-                        sync_start_logs[sync_id][Constant.TIME_STAMP].timestamp()
-                    ),
-                    end_time=round(
-                        sync_end_logs[sync_id][Constant.TIME_STAMP].timestamp()
-                    ),
+                    start_time=round(connector_sync_log[sync_id]["sync_start"][0]),
+                    end_time=round(connector_sync_log[sync_id]["sync_end"][0]),
                     status=message_data[Constant.STATUS],
                 )
             )
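Note that the fixtures store message_data as JSON that is itself wrapped in a JSON string, so a single json.loads still yields a str; a second decode is implied before Constant.STATUS can be read (the folded lines between these hunks are not shown here):

import json

raw = '"{\\"status\\":\\"SUCCESSFUL\\"}"'  # as in the test fixture
decoded = json.loads(raw)                  # -> '{"status":"SUCCESSFUL"}', still a str
status = json.loads(decoded)["status"]     # -> 'SUCCESSFUL'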
@@ -172,6 +224,9 @@ def get_allowed_connectors_list(
         self, connector_patterns: AllowDenyPattern, report: FivetranSourceReport
     ) -> List[Connector]:
         connectors: List[Connector] = []
+        sync_logs = self._get_all_connector_sync_logs()
+        table_lineage_metadata = self._get_connectors_table_lineage_metadata()
+        column_lineage_metadata = self._get_column_lineage_metadata()
         connector_list = self._query(self.fivetran_log_query.get_connectors_query())
         for connector in connector_list:
             if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
@@ -189,9 +244,14 @@ def get_allowed_connectors_list(
                         connector[Constant.CONNECTING_USER_ID]
                     ),
                     table_lineage=self._get_table_lineage(
-                        connector[Constant.CONNECTOR_ID]
+                        column_lineage_metadata=column_lineage_metadata,
+                        table_lineage_result=table_lineage_metadata.get(
+                            connector[Constant.CONNECTOR_ID]
+                        ),
                     ),
-                    jobs=self._get_jobs_list(connector[Constant.CONNECTOR_ID]),
+                    jobs=self._get_jobs_list(
+                        sync_logs.get(connector[Constant.CONNECTOR_ID])
+                    ),
                 )
             )
         return connectors
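The upshot for get_allowed_connectors_list: the number of log-database round-trips no longer grows with lineage size. A back-of-the-envelope count implied by the diff (not a measured result), for N connectors with T table pairs each, noting that the per-connector user lookup is unchanged:

# Before: connectors query + per-connector (table lineage + T column lineages
#         + 2 sync-log queries + user lookup)
# After:  connectors query + 3 bulk queries + per-connector user lookup
n, t = 50, 20
queries_before = 1 + n * (1 + t + 2 + 1)  # = 1201
queries_after = 1 + 3 + n * 1             # = 54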
@@ -30,26 +30,20 @@ def get_user_query(self, user_id: str) -> str:
         FROM {self.db_clause}user
         WHERE id = '{user_id}'"""

-    def get_sync_start_logs_query(self, connector_id: str) -> str:
+    def get_sync_logs_query(self) -> str:
         return f"""
-        SELECT time_stamp,
-               sync_id
-        FROM {self.db_clause}log
-        WHERE message_event = 'sync_start'
-        and connector_id = '{connector_id}' order by time_stamp"""
-
-    def get_sync_end_logs_query(self, connector_id: str) -> str:
-        return f"""
-        SELECT time_stamp,
-               sync_id,
-               message_data
+        SELECT connector_id,
+               sync_id,
+               message_event,
+               message_data,
+               time_stamp
         FROM {self.db_clause}log
-        WHERE message_event = 'sync_end'
-        and connector_id = '{connector_id}' order by time_stamp"""
+        WHERE message_event in ('sync_start', 'sync_end')"""
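The two filtered, ordered per-connector queries collapse into one unordered scan; the ORDER BY is no longer needed because events are keyed by message_event per sync_id in Python. What the consolidated query renders to when db_clause is, say, "fivetran_log." (that value is an assumption; db_clause is configured outside this diff):

db_clause = "fivetran_log."
sync_logs_sql = f"""
SELECT connector_id,
       sync_id,
       message_event,
       message_data,
       time_stamp
FROM {db_clause}log
WHERE message_event in ('sync_start', 'sync_end')"""
print(sync_logs_sql)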

-    def get_table_lineage_query(self, connector_id: str) -> str:
+    def get_table_lineage_query(self) -> str:
         return f"""
-        SELECT stm.id as source_table_id,
+        SELECT stm.connector_id as connector_id,
+               stm.id as source_table_id,
                stm.name as source_table_name,
                ssm.name as source_schema_name,
                dtm.id as destination_table_id,
@@ -59,17 +53,16 @@ def get_table_lineage_query(self, connector_id: str) -> str:
         JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
         JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
         JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
-        JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
-        WHERE stm.connector_id = '{connector_id}'"""
+        JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id"""

-    def get_column_lineage_query(
-        self, source_table_id: str, destination_table_id: str
-    ) -> str:
+    def get_column_lineage_query(self) -> str:
         return f"""
-        SELECT scm.name as source_column_name,
+        SELECT scm.table_id as source_table_id,
+               dcm.table_id as destination_table_id,
+               scm.name as source_column_name,
                dcm.name as destination_column_name
         FROM {self.db_clause}column_lineage as cl
-        JOIN {self.db_clause}source_column_metadata as scm on
-            (cl.source_column_id = scm.id and scm.table_id = {source_table_id})
-        JOIN {self.db_clause}destination_column_metadata as dcm on
-            (cl.destination_column_id = dcm.id and dcm.table_id = {destination_table_id})"""
+        JOIN {self.db_clause}source_column_metadata as scm
+            on cl.source_column_id = scm.id
+        JOIN {self.db_clause}destination_column_metadata as dcm
+            on cl.destination_column_id = dcm.id"""
65 changes: 49 additions & 16 deletions metadata-ingestion/tests/integration/fivetran/test_fivetran.py

@@ -41,9 +41,10 @@ def default_query_results(
         return []
     elif query == fivetran_log_query.get_connectors_query():
         return connector_query_results
-    elif query == fivetran_log_query.get_table_lineage_query("calendar_elected"):
+    elif query == fivetran_log_query.get_table_lineage_query():
         return [
             {
+                "connector_id": "calendar_elected",
                 "source_table_id": "10040",
                 "source_table_name": "employee",
                 "source_schema_name": "public",
@@ -52,6 +53,7 @@
"destination_schema_name": "postgres_public",
},
{
"connector_id": "calendar_elected",
"source_table_id": "10041",
"source_table_name": "company",
"source_schema_name": "public",
@@ -60,15 +62,29 @@
"destination_schema_name": "postgres_public",
},
]
elif query == fivetran_log_query.get_column_lineage_query(
"10040", "7779"
) or query == fivetran_log_query.get_column_lineage_query("10041", "7780"):
elif query == fivetran_log_query.get_column_lineage_query():
return [
{
"source_table_id": "10040",
"destination_table_id": "7779",
"source_column_name": "id",
"destination_column_name": "id",
},
{
"source_table_id": "10040",
"destination_table_id": "7779",
"source_column_name": "name",
"destination_column_name": "name",
},
{
"source_table_id": "10041",
"destination_table_id": "7780",
"source_column_name": "id",
"destination_column_name": "id",
},
{
"source_table_id": "10041",
"destination_table_id": "7780",
"source_column_name": "name",
"destination_column_name": "name",
},
@@ -82,46 +98,63 @@
"email": "abc.xyz@email.com",
}
]
elif query == fivetran_log_query.get_sync_start_logs_query("calendar_elected"):
elif query == fivetran_log_query.get_sync_logs_query():
return [
{
"time_stamp": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000),
"connector_id": "calendar_elected",
"sync_id": "4c9a03d6-eded-4422-a46a-163266e58243",
"message_event": "sync_start",
"message_data": None,
"time_stamp": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000),
},
{
"time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 30, 345000),
"connector_id": "calendar_elected",
"sync_id": "f773d1e9-c791-48f4-894f-8cf9b3dfc834",
"message_event": "sync_start",
"message_data": None,
"time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 30, 345000),
},
{
"time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 55, 401000),
"connector_id": "calendar_elected",
"sync_id": "63c2fc85-600b-455f-9ba0-f576522465be",
"message_event": "sync_start",
"message_data": None,
"time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 55, 401000),
},
{
"time_stamp": datetime.datetime(2023, 10, 3, 14, 37, 5, 403000),
"connector_id": "calendar_elected",
"sync_id": "e773e1e9-c791-46f4-894f-8ch9b3dfc832",
"message_event": "sync_start",
"message_data": None,
"time_stamp": datetime.datetime(2023, 10, 3, 14, 37, 5, 403000),
},
]
elif query == fivetran_log_query.get_sync_end_logs_query("calendar_elected"):
return [
{
"time_stamp": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000),
"connector_id": "calendar_elected",
"sync_id": "4c9a03d6-eded-4422-a46a-163266e58243",
"message_event": "sync_end",
"message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"',
"time_stamp": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000),
},
{
"time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 31, 512000),
"connector_id": "calendar_elected",
"sync_id": "f773d1e9-c791-48f4-894f-8cf9b3dfc834",
"message_event": "sync_end",
"message_data": '"{\\"reason\\":\\"Sync has been cancelled because of a user action in the dashboard.Standard Config updated.\\",\\"status\\":\\"CANCELED\\"}"',
"time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 31, 512000),
},
{
"time_stamp": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000),
"connector_id": "calendar_elected",
"sync_id": "63c2fc85-600b-455f-9ba0-f576522465be",
"message_event": "sync_end",
"message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"',
"time_stamp": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000),
},
{
"time_stamp": datetime.datetime(2023, 10, 3, 14, 37, 35, 478000),
"connector_id": "calendar_elected",
"sync_id": "e773e1e9-c791-46f4-894f-8ch9b3dfc832",
"message_event": "sync_end",
"message_data": None,
"time_stamp": datetime.datetime(2023, 10, 3, 14, 37, 35, 478000),
},
]
# Unreachable code
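The fixture above acts as a query dispatcher: the production _query method is stubbed with a function that string-matches each generated SQL statement and returns canned rows, which is why every query-builder change in this commit needs a matching fixture change. A condensed sketch of that pattern; FivetranLogAPI and the patch target are placeholders, since the real module path is not shown in this diff:

from unittest import mock

def run_with_stubbed_queries(api_cls, source_runner):
    # Every SQL call made through api_cls._query is served from the fixtures.
    with mock.patch.object(api_cls, "_query", side_effect=default_query_results):
        return source_runner()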
