Skip to content

Commit

Permalink
fix(ingest): use prev.ignored arg in DataProcessInstance
Browse files Browse the repository at this point in the history
The high-level method DataProcessInstance.emit_process_end() actually ignores  start_timestamp_millis, which can be passed further to calculate durationMillis instead of using None.
This change makes emitter use this argument and save durationMillis.
  • Loading branch information
obaltian committed May 19, 2024
1 parent a44a549 commit a038a82
Showing 1 changed file with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ def end_event_mcp(
:param result: (InstanceRunResult) the result of the run
:param result_type: (string) It identifies the system where the native result comes from like Airflow, Azkaban
:param attempt: (int) the attempt number of this execution
:param start_timestamp_millis: (Optional[int]) the start time of the execution in milliseconds
"""
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
Expand Down Expand Up @@ -215,12 +217,15 @@ def emit_process_end(
:param result_type: (string) It identifies the system where the native result comes from like Airflow, Azkaban
:param attempt: (int) the attempt number of this execution
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
:param start_timestamp_millis: (Optional[int]) the start time of the execution in milliseconds
"""
for mcp in self.end_event_mcp(
end_timestamp_millis=end_timestamp_millis,
result=result,
result_type=result_type,
attempt=attempt,
start_timestamp_millis=start_timestamp_millis,
):
self._emit_mcp(mcp, emitter, callback)

Expand Down

0 comments on commit a038a82

Please sign in to comment.