From a038a82cfda378e6a52a495fff89b23775a494be Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Sun, 19 May 2024 18:35:33 +0200 Subject: [PATCH] fix(ingest): use prev.ignored arg in DataProcessInstance The high-level method DataProcessInstance.emit_process_end() actually ignores start_timestamp_millis, which can be passed further to calculate durationMillis instead of using None. This change makes emitter use this argument and save durationMillis. --- .../datahub/api/entities/dataprocess/dataprocess_instance.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py index b9029eaa94b20..fa5b5bd6a50fd 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py +++ b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py @@ -176,6 +176,8 @@ def end_event_mcp( :param result: (InstanceRunResult) the result of the run :param result_type: (string) It identifies the system where the native result comes from like Airflow, Azkaban :param attempt: (int) the attempt number of this execution + :param start_timestamp_millis: (Optional[int]) the start time of the execution in milliseconds + """ mcp = MetadataChangeProposalWrapper( entityUrn=str(self.urn), @@ -215,12 +217,15 @@ def emit_process_end( :param result_type: (string) It identifies the system where the native result comes from like Airflow, Azkaban :param attempt: (int) the attempt number of this execution :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used + :param start_timestamp_millis: (Optional[int]) the start time of the execution in milliseconds + """ for mcp in self.end_event_mcp( end_timestamp_millis=end_timestamp_millis, result=result, result_type=result_type, attempt=attempt, + start_timestamp_millis=start_timestamp_millis, ): self._emit_mcp(mcp, emitter, callback)