Skip to content

Commit

Permalink
MINOR - Better handling of Ingestion Pipeline Status (#14792)
Browse files Browse the repository at this point in the history
* MINOR - Better handling of Ingestion Pipeline Status

* format

* format
  • Loading branch information
pmbrull authored and harshach committed Jan 24, 2024
1 parent 993dc56 commit d8555fc
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 22 deletions.
2 changes: 1 addition & 1 deletion ingestion/src/metadata/workflow/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def raise_from_status_internal(self, raise_warnings=False):
and self.calculate_success() < SUCCESS_THRESHOLD_VALUE
):
raise WorkflowExecutionError(
f"{self.source.name} reported errors: {Summary.from_step(self.source)}"
f"{self.runner.name} reported errors: {Summary.from_step(self.runner)}"
)

if raise_warnings and self.runner.get_status().warnings:
Expand Down
49 changes: 28 additions & 21 deletions ingestion/src/metadata/workflow/workflow_status_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""
Add methods to the workflows for updating the IngestionPipeline status
"""
import traceback
import uuid
from datetime import datetime
from typing import Optional, Tuple
Expand All @@ -29,8 +30,10 @@
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.api.step import Step, Summary
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ometa_logger

logger = ometa_logger()

SUCCESS_THRESHOLD_VALUE = 90

Expand All @@ -49,8 +52,6 @@ class WorkflowStatusMixin:
_start_ts: int
ingestion_pipeline: Optional[IngestionPipeline]

# All workflows require a source as a first step
source: Source
# All workflows execute a series of steps, aside from the source
steps: Tuple[Step]

Expand Down Expand Up @@ -86,24 +87,30 @@ def set_ingestion_pipeline_status(
Method to set the pipeline status of current ingestion pipeline
"""

# if we don't have a related Ingestion Pipeline FQN, no status is set.
if self.config.ingestionPipelineFQN and self.ingestion_pipeline:
pipeline_status = self.metadata.get_pipeline_status(
self.config.ingestionPipelineFQN, self.run_id
)
if not pipeline_status:
# We need to create the status
pipeline_status = self._new_pipeline_status(state)
else:
# if workflow is ended then update the end date in status
pipeline_status.endDate = datetime.now().timestamp() * 1000
pipeline_status.pipelineState = state

pipeline_status.status = (
ingestion_status if ingestion_status else pipeline_status.status
)
self.metadata.create_or_update_pipeline_status(
self.config.ingestionPipelineFQN, pipeline_status
try:
# if we don't have a related Ingestion Pipeline FQN, no status is set.
if self.config.ingestionPipelineFQN and self.ingestion_pipeline:
pipeline_status = self.metadata.get_pipeline_status(
self.ingestion_pipeline.fullyQualifiedName.__root__, self.run_id
)
if not pipeline_status:
# We need to create the status
pipeline_status = self._new_pipeline_status(state)
else:
# if workflow is ended then update the end date in status
pipeline_status.endDate = datetime.now().timestamp() * 1000
pipeline_status.pipelineState = state

pipeline_status.status = (
ingestion_status if ingestion_status else pipeline_status.status
)
self.metadata.create_or_update_pipeline_status(
self.ingestion_pipeline.fullyQualifiedName.__root__, pipeline_status
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(
f"Unhandled error trying to update Ingestion Pipeline status [{err}]"
)

def raise_from_status(self, raise_warnings=False):
Expand Down

0 comments on commit d8555fc

Please sign in to comment.