Skip to content

Commit

Permalink
test cases for fivetran source added
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamjagtap639 committed Oct 10, 2023
1 parent 3f392f4 commit 0c295e2
Show file tree
Hide file tree
Showing 3 changed files with 997 additions and 1 deletion.
13 changes: 12 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ def auto_status_aspect(
For all entities that don't have a status aspect, add one with removed set to false.
"""

skip_entities: Set[str] = {"dataProcessInstance"}
all_urns: Set[str] = set()
status_urns: Set[str] = set()
skip_urns: Set[str] = set()
for wu in stream:
urn = wu.get_urn()
all_urns.add(urn)
Expand All @@ -90,9 +92,18 @@ def auto_status_aspect(
else:
raise ValueError(f"Unexpected type {type(wu.metadata)}")

if (
not isinstance(wu.metadata, MetadataChangeEventClass)
and wu.metadata.entityType in skip_entities
):
# If any entity does not support aspect 'status' then skip that entity from adding status aspect.
# Example like dataProcessInstance doesn't suppport status aspect.
# If not skipped gives error: java.lang.RuntimeException: Unknown aspect status for entity dataProcessInstance
skip_urns.add(urn)

yield wu

for urn in sorted(all_urns - status_urns):
for urn in sorted(all_urns - status_urns - skip_urns):
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=StatusClass(removed=False),
Expand Down
Loading

0 comments on commit 0c295e2

Please sign in to comment.