Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): fix bug in auto_status_aspect #6705

Merged
merged 2 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions metadata-ingestion/src/datahub/utilities/source_helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, Iterable, Optional, Set
from typing import Callable, Iterable, Optional, Set, Union

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
Expand All @@ -9,6 +9,16 @@
from datahub.utilities.urns.urn import guess_entity_type


def auto_workunit(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use this anywhere other than the test?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not right now, but i'm planning on starting to use it elsewhere so I figured we should add it

stream: Iterable[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]]
) -> Iterable[MetadataWorkUnit]:
for item in stream:
if isinstance(item, MetadataChangeEventClass):
yield MetadataWorkUnit(id=f"{item.proposedSnapshot.urn}/mce", mce=item)
else:
yield item.as_workunit()


def auto_status_aspect(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
Expand All @@ -20,6 +30,7 @@ def auto_status_aspect(
status_urns: Set[str] = set()
for wu in stream:
urn = wu.get_urn()
all_urns.add(urn)
if isinstance(wu.metadata, MetadataChangeEventClass):
if any(
isinstance(aspect, StatusClass)
Expand All @@ -34,7 +45,7 @@ def auto_status_aspect(

yield wu

for urn in all_urns - status_urns:
for urn in sorted(all_urns - status_urns):
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=StatusClass(removed=False),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,5 +397,75 @@
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(looker,dashboard_elements.10)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(looker,dashboard_elements.2)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Dimension",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Measure",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Temporal",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -273,5 +273,61 @@
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(looker,dashboard_elements.2)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Dimension",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Measure",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Temporal",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test"
}
}
]
Loading