
Commit

fix status aspects
hsheth2 committed Oct 30, 2023
1 parent f17aaf2 commit af11bcc
Showing 4 changed files with 42 additions and 28 deletions.
25 changes: 15 additions & 10 deletions metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
@@ -100,7 +100,9 @@ def generate_tags_aspect(self) -> Iterable[GlobalTagsClass]:
         )
         return [tags]

-    def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
+    def generate_mcp(
+        self, materialize_iolets: bool = True
+    ) -> Iterable[MetadataChangeProposalWrapper]:
         mcp = MetadataChangeProposalWrapper(
             entityUrn=str(self.urn),
             aspect=DataJobInfoClass(
@@ -113,7 +115,9 @@ def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
         )
         yield mcp

-        yield from self.generate_data_input_output_mcp()
+        yield from self.generate_data_input_output_mcp(
+            materialize_iolets=materialize_iolets
+        )

         for owner in self.generate_ownership_aspect():
             mcp = MetadataChangeProposalWrapper(
@@ -144,7 +148,9 @@ def emit(
         for mcp in self.generate_mcp():
             emitter.emit(mcp, callback)

-    def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
+    def generate_data_input_output_mcp(
+        self, materialize_iolets: bool
+    ) -> Iterable[MetadataChangeProposalWrapper]:
         mcp = MetadataChangeProposalWrapper(
             entityUrn=str(self.urn),
             aspect=DataJobInputOutputClass(
@@ -157,10 +163,9 @@ def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
         yield mcp

         # Force entity materialization
-        for iolet in self.inlets + self.outlets:
-            mcp = MetadataChangeProposalWrapper(
-                entityUrn=str(iolet),
-                aspect=StatusClass(removed=False),
-            )
-
-            yield mcp
+        if materialize_iolets:
+            for iolet in self.inlets + self.outlets:
+                yield MetadataChangeProposalWrapper(
+                    entityUrn=str(iolet),
+                    aspect=StatusClass(removed=False),
+                )
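For reference, a minimal sketch of what the new flag changes on the DataJob side; `job` and its iolets are illustrative placeholders that are not defined in this commit, and only the materialize_iolets parameter comes from the diff above.

# Sketch only: assumes `job` is an existing DataJob whose inlets/outlets
# already contain dataset URNs.
from datahub.metadata.schema_classes import DataJobInputOutputClass, StatusClass

with_status = list(job.generate_data_input_output_mcp(materialize_iolets=True))
without_status = list(job.generate_data_input_output_mcp(materialize_iolets=False))

# Both variants emit the DataJobInputOutput aspect for the job itself.
assert any(isinstance(m.aspect, DataJobInputOutputClass) for m in with_status)
assert any(isinstance(m.aspect, DataJobInputOutputClass) for m in without_status)

# Only the materializing variant also emits Status(removed=False) per iolet.
assert any(isinstance(m.aspect, StatusClass) for m in with_status)
assert not any(isinstance(m.aspect, StatusClass) for m in without_status)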
metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
@@ -220,12 +220,10 @@ def emit_process_end(
         self._emit_mcp(mcp, emitter, callback)

     def generate_mcp(
-        self, created_ts_millis: Optional[int] = None
+        self, created_ts_millis: Optional[int] = None, materialize_iolets: bool = True
     ) -> Iterable[MetadataChangeProposalWrapper]:
-        """
-        Generates mcps from the object
-        :rtype: Iterable[MetadataChangeProposalWrapper]
-        """
+        """Generates mcps from the object"""

         mcp = MetadataChangeProposalWrapper(
             entityUrn=str(self.urn),
             aspect=DataProcessInstanceProperties(
@@ -253,7 +251,7 @@
         )
         yield mcp

-        yield from self.generate_inlet_outlet_mcp()
+        yield from self.generate_inlet_outlet_mcp(materialize_iolets=materialize_iolets)

     @staticmethod
     def _emit_mcp(
@@ -329,7 +327,9 @@ def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance":
         dpi._template_object = dataflow
         return dpi

-    def generate_inlet_outlet_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
+    def generate_inlet_outlet_mcp(
+        self, materialize_iolets: bool
+    ) -> Iterable[MetadataChangeProposalWrapper]:
         if self.inlets:
             mcp = MetadataChangeProposalWrapper(
                 entityUrn=str(self.urn),
@@ -349,10 +349,9 @@ def generate_inlet_outlet_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
             yield mcp

         # Force entity materialization
-        for iolet in self.inlets + self.outlets:
-            mcp = MetadataChangeProposalWrapper(
-                entityUrn=str(iolet),
-                aspect=StatusClass(removed=False),
-            )
-
-            yield mcp
+        if materialize_iolets:
+            for iolet in self.inlets + self.outlets:
+                yield MetadataChangeProposalWrapper(
+                    entityUrn=str(iolet),
+                    aspect=StatusClass(removed=False),
+                )
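The same flag is threaded through DataProcessInstance.generate_mcp; a hedged sketch, where `dpi` and `start_time_ms` are assumed to exist and are not defined by this commit:

# Sketch only: `dpi` is an existing DataProcessInstance, start_time_ms a
# millisecond timestamp; only the materialize_iolets keyword is new here.
mcps = list(
    dpi.generate_mcp(created_ts_millis=start_time_ms, materialize_iolets=False)
)
# With materialize_iolets=False, the DataProcessInstanceProperties and
# inlet/outlet aspects are still emitted, but the per-iolet Status MCPs are skipped.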
4 changes: 3 additions & 1 deletion metadata-ingestion/src/datahub/emitter/mcp.py
@@ -240,7 +240,7 @@ def from_obj_require_wrapper(
         return mcp

     def as_workunit(
-        self, *, treat_errors_as_warnings: bool = False
+        self, *, treat_errors_as_warnings: bool = False, is_primary_source: bool = True
     ) -> "MetadataWorkUnit":
         from datahub.ingestion.api.workunit import MetadataWorkUnit

@@ -254,10 +254,12 @@
                 id=f"{self.entityUrn}-{self.aspectName}-{ts}",
                 mcp=self,
                 treat_errors_as_warnings=treat_errors_as_warnings,
+                is_primary_source=is_primary_source,
             )

         return MetadataWorkUnit(
             id=f"{self.entityUrn}-{self.aspectName}",
             mcp=self,
             treat_errors_as_warnings=treat_errors_as_warnings,
+            is_primary_source=is_primary_source,
         )
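A hedged sketch of the new as_workunit keyword; the dataset URN below is illustrative and not taken from this commit:

# Sketch only: marking the workunit as non-primary signals that this source is
# merely referencing the dataset, so stateful ingestion should not track it.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

status_mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:postgres,db.schema.table,PROD)",
    aspect=StatusClass(removed=False),
)
workunit = status_mcp.as_workunit(is_primary_source=False)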
metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
@@ -32,6 +32,7 @@
 from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionSourceBase,
 )
+from datahub.metadata.schema_classes import StatusClass
 from datahub.utilities.urns.data_flow_urn import DataFlowUrn
 from datahub.utilities.urns.dataset_urn import DatasetUrn

@@ -181,7 +182,9 @@ def _get_dpi_workunit(
             return []
         result = status_result_map[job.status]
         start_timestamp_millis = job.start_time * 1000
-        for mcp in dpi.generate_mcp(created_ts_millis=start_timestamp_millis):
+        for mcp in dpi.generate_mcp(
+            created_ts_millis=start_timestamp_millis, materialize_iolets=False
+        ):
             yield mcp.as_workunit()
         for mcp in dpi.start_event_mcp(start_timestamp_millis):
             yield mcp.as_workunit()
@@ -203,8 +206,13 @@ def _get_connector_workunit(

         # Map Fivetran's connector entity with Datahub's datajob entity
         datajob = self._generate_datajob_from_connector(connector)
-        for mcp in datajob.generate_mcp():
-            yield mcp.as_workunit()
+        for mcp in datajob.generate_mcp(materialize_iolets=True):
+            if mcp.entityType == "dataset" and isinstance(mcp.aspect, StatusClass):
+                # While we "materialize" the referenced datasets, we don't want them
+                # to be tracked by stateful ingestion.
+                yield mcp.as_workunit(is_primary_source=False)
+            else:
+                yield mcp.as_workunit()

         # Map Fivetran's job/sync history entity with Datahub's data process entity
         for job in connector.jobs: