From d8555fc4c36784badc83251de146a905891684e2 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Tue, 23 Jan 2024 09:24:19 +0100 Subject: [PATCH] MINOR - Better handling of Ingestion Pipeline Status (#14792) * MINOR - Better handling of Ingestion Pipeline Status * format * format --- .../src/metadata/workflow/application.py | 2 +- .../workflow/workflow_status_mixin.py | 49 +++++++++++-------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/ingestion/src/metadata/workflow/application.py b/ingestion/src/metadata/workflow/application.py index 44aa6898181f..09ebd2461a5d 100644 --- a/ingestion/src/metadata/workflow/application.py +++ b/ingestion/src/metadata/workflow/application.py @@ -145,7 +145,7 @@ def raise_from_status_internal(self, raise_warnings=False): and self.calculate_success() < SUCCESS_THRESHOLD_VALUE ): raise WorkflowExecutionError( - f"{self.source.name} reported errors: {Summary.from_step(self.source)}" + f"{self.runner.name} reported errors: {Summary.from_step(self.runner)}" ) if raise_warnings and self.runner.get_status().warnings: diff --git a/ingestion/src/metadata/workflow/workflow_status_mixin.py b/ingestion/src/metadata/workflow/workflow_status_mixin.py index 1d0fa16a7911..c6d7e278e52d 100644 --- a/ingestion/src/metadata/workflow/workflow_status_mixin.py +++ b/ingestion/src/metadata/workflow/workflow_status_mixin.py @@ -11,6 +11,7 @@ """ Add methods to the workflows for updating the IngestionPipeline status """ +import traceback import uuid from datetime import datetime from typing import Optional, Tuple @@ -29,8 +30,10 @@ OpenMetadataWorkflowConfig, ) from metadata.ingestion.api.step import Step, Summary -from metadata.ingestion.api.steps import Source from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.logger import ometa_logger + +logger = ometa_logger() SUCCESS_THRESHOLD_VALUE = 90 @@ -49,8 +52,6 @@ class WorkflowStatusMixin: _start_ts: int ingestion_pipeline: Optional[IngestionPipeline] - # All workflows require a source as a first step - source: Source # All workflows execute a series of steps, aside from the source steps: Tuple[Step] @@ -86,24 +87,30 @@ def set_ingestion_pipeline_status( Method to set the pipeline status of current ingestion pipeline """ - # if we don't have a related Ingestion Pipeline FQN, no status is set. - if self.config.ingestionPipelineFQN and self.ingestion_pipeline: - pipeline_status = self.metadata.get_pipeline_status( - self.config.ingestionPipelineFQN, self.run_id - ) - if not pipeline_status: - # We need to crete the status - pipeline_status = self._new_pipeline_status(state) - else: - # if workflow is ended then update the end date in status - pipeline_status.endDate = datetime.now().timestamp() * 1000 - pipeline_status.pipelineState = state - - pipeline_status.status = ( - ingestion_status if ingestion_status else pipeline_status.status - ) - self.metadata.create_or_update_pipeline_status( - self.config.ingestionPipelineFQN, pipeline_status + try: + # if we don't have a related Ingestion Pipeline FQN, no status is set. + if self.config.ingestionPipelineFQN and self.ingestion_pipeline: + pipeline_status = self.metadata.get_pipeline_status( + self.ingestion_pipeline.fullyQualifiedName.__root__, self.run_id + ) + if not pipeline_status: + # We need to crete the status + pipeline_status = self._new_pipeline_status(state) + else: + # if workflow is ended then update the end date in status + pipeline_status.endDate = datetime.now().timestamp() * 1000 + pipeline_status.pipelineState = state + + pipeline_status.status = ( + ingestion_status if ingestion_status else pipeline_status.status + ) + self.metadata.create_or_update_pipeline_status( + self.ingestion_pipeline.fullyQualifiedName.__root__, pipeline_status + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error( + f"Unhandled error trying to update Ingestion Pipeline status [{err}]" ) def raise_from_status(self, raise_warnings=False):