fix(ingest/pipeline): catch pipeline exceptions #10753
@@ -494,6 +492,10 @@ def run(self) -> None:
        self.final_status = "cancelled"
        logger.error("Caught error", exc_info=e)
        raise
    except Exception as exc:
        self.final_status = "pipeline_failure"
        logger.error("pipline run error: ", exc_info=exc)
I would've thought this log line would be redundant, since we log for any exception as part of entrypoints.py
Can you provide more details about this part of the description: "with cut off log in report"?
Ingestion logs are put into the report by the self._notify_reporters_on_ingestion_completion() method in the finally clause.
So if we don't log a pipeline exception before the finally block runs, the exception that caused the pipeline failure is not present in the report.
This can be tested by mangling host_port in any recipe and checking the report on the ingestion page.
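The ordering issue described above can be illustrated with a minimal sketch. It assumes an in-memory log handler standing in for the report's log capture; `ListHandler`, `run_pipeline`, and `notify_reporters` are hypothetical names for illustration, not DataHub APIs.

```python
import logging
from typing import List


class ListHandler(logging.Handler):
    """Hypothetical stand-in for the log capture that feeds the report."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        # format() appends the traceback text when exc_info is set.
        self.lines.append(self.format(record))


def run_pipeline(log_in_except: bool) -> List[str]:
    logger = logging.getLogger(f"sketch.pipeline.{log_in_except}")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = ListHandler()
    logger.addHandler(handler)
    report: List[str] = []

    def notify_reporters() -> None:
        # Snapshot whatever has been logged so far into the report.
        report.extend(handler.lines)

    try:
        try:
            logger.info("starting ingestion")
            raise ConnectionError("Connection timeout")
        except Exception as exc:
            if log_in_except:
                logger.error("pipeline run error", exc_info=exc)
            raise
        finally:
            # Runs before the exception reaches the outer handler.
            notify_reporters()
    except ConnectionError:
        # An entrypoint-level log happens only after the snapshot was taken.
        logger.error("Command failed")
    finally:
        logger.removeHandler(handler)
    return report
```

Calling `run_pipeline(False)` yields a report that contains only the startup line, while `run_pipeline(True)` also captures the exception text, because the `except` block logs before the `finally` block takes its snapshot.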
What do you think about handling the redundancy?
Should I wrap this exception in a PipelineRunError and handle it in entrypoints.py without calling logger.exception(f"Command failed: {exc}")?
I don't like importing from pipeline into entrypoints, because there is an intermediary module, ingest_cli.
Another option is to cut the tracebacks:
except Exception as exc:
self.final_status = "pipeline_failure"
logger.error("pipline run error: ", exc_info=exc.with_traceback(None))
raise exc from None
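As a side note on the traceback-cutting option, here is a small sketch of what `with_traceback(None)` does in plain Python. The `connect`, `count_frames`, and `demo` helpers are hypothetical and exist only for this illustration. The call mutates the exception in place and returns the same object, so the re-raised exception also loses its original frames.

```python
import traceback


def connect() -> None:
    raise ConnectionError("Connection timeout")


def count_frames(exc: BaseException) -> int:
    # Number of stack frames recorded on the exception's traceback.
    return len(traceback.extract_tb(exc.__traceback__))


def demo() -> tuple:
    try:
        connect()
    except ConnectionError as exc:
        frames_before = count_frames(exc)
        # with_traceback() sets __traceback__ and returns the same exception.
        stripped = exc.with_traceback(None)
        return frames_before, stripped is exc, exc.__traceback__
```

Because the original object is modified, a later `raise exc from None` would carry only the frames added from the re-raise point onward, which may or may not be the desired trade-off.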
Sequence Diagram(s)
sequenceDiagram
participant User
participant Pipeline
participant Logger
User->>Pipeline: Start pipeline
Pipeline->>Pipeline: Set status to PipelineStatus.UNKNOWN
Pipeline->>+Logger: Log initial status
Pipeline->>Pipeline: Perform tasks
alt Successful completion
Pipeline->>Pipeline: Set status to PipelineStatus.COMPLETED
else Exception raised
Pipeline->>Pipeline: Set status to PipelineStatus.PIPELINE_ERROR
end
Pipeline->>Pipeline: Handle specific exceptions (set status to CANCELLED)
Pipeline->>-Logger: Log final status
Logger->>User: Provide status update
@pie1nthesky made some tweaks to this, hopefully it does what you're looking for
Actionable comments posted: 2
Outside diff range and nitpick comments (1)
metadata-ingestion/src/datahub/ingestion/run/pipeline.py (1)
Line range hint 541-543: Refactor: Simplify Boolean Expression
Instead of using a ternary operation for a boolean result, use the bool function directly for clarity.
- has_errors: bool = (
-     True
-     if self.source.get_report().failures or self.sink.get_report().failures
-     else False
- )
+ has_errors: bool = bool(self.source.get_report().failures or self.sink.get_report().failures)
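The equivalence behind this suggestion can be checked with a short sketch. The function names and the list-typed `failures` arguments are assumptions for illustration; the real report objects may use a different container type.

```python
from typing import List


def has_errors_ternary(source_failures: List[str], sink_failures: List[str]) -> bool:
    # Original style: explicit True/False around a truthiness test.
    return True if source_failures or sink_failures else False


def has_errors_bool(source_failures: List[str], sink_failures: List[str]) -> bool:
    # Suggested style: bool() performs the same truthiness conversion.
    return bool(source_failures or sink_failures)
```

Both forms return the same value for every combination of empty and non-empty inputs; the `bool(...)` form only removes the redundant branches.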
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- metadata-ingestion/src/datahub/ingestion/run/pipeline.py (7 hunks)
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/run/pipeline.py
98-99: Use a single if statement instead of nested if statements (SIM102)
541-543: Use bool(...) instead of True if ... else False. Replace with `bool(...)` (SIM210)
class PipelineStatus(enum.Enum):
    UNKNOWN = enum.auto()
    COMPLETED = enum.auto()
    PIPELINE_ERROR = enum.auto()
    CANCELLED = enum.auto()
Refactor: Simplify Enum Declaration
The use of enum.auto() is great for avoiding manual assignment of values, but it results in the enum values being integers, which might not be the expected behavior since the original status values were strings. Consider explicitly setting the enum values to their string counterparts.
class PipelineStatus(enum.Enum):
- UNKNOWN = enum.auto()
- COMPLETED = enum.auto()
- PIPELINE_ERROR = enum.auto()
- CANCELLED = enum.auto()
+ UNKNOWN = "unknown"
+ COMPLETED = "completed"
+ PIPELINE_ERROR = "pipeline_error"
+ CANCELLED = "cancelled"
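To make the difference concrete, the sketch below compares the two declaration styles using standard-library `enum` behavior. `AutoStatus` and `StringStatus` are hypothetical names used only here, not classes from the PR.

```python
import enum


class AutoStatus(enum.Enum):
    # enum.auto() assigns increasing integers starting at 1.
    UNKNOWN = enum.auto()
    COMPLETED = enum.auto()


class StringStatus(enum.Enum):
    # Explicit string values keep the original status strings available.
    UNKNOWN = "unknown"
    COMPLETED = "completed"


# Lookup by value works with the original string.
restored = StringStatus("completed")

# A plain Enum member never compares equal to its raw value.
equal_to_raw_string = StringStatus.COMPLETED == "completed"
```

Note that with either style, comparisons such as `self.final_status == "completed"` stop matching once the attribute holds an enum member, so call sites need to compare against the member itself or against `.value`.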
if self.final_status == PipelineStatus.CANCELLED
else "FAILURE"
if self.has_failures()
else "SUCCESS"
if self.final_status == "completed"
if self.final_status == PipelineStatus.COMPLETED
Improve Readability: Simplify Conditional Logic
The nested ternary operators make this code hard to read. Consider refactoring this to use a more straightforward conditional structure.
- if self.final_status == PipelineStatus.CANCELLED
- else "FAILURE"
- if self.has_failures()
- else "SUCCESS"
- if self.final_status == PipelineStatus.COMPLETED
- else "UNKNOWN",
+ status = "UNKNOWN"
+ if self.final_status == PipelineStatus.CANCELLED:
+ status = "CANCELLED"
+ elif self.has_failures():
+ status = "FAILURE"
+ elif self.final_status == PipelineStatus.COMPLETED:
+ status = "SUCCESS"
+ report=self._get_structured_report(),
+ ctx=self.ctx,
Committable suggestion was skipped due to low confidence.
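One way to read the suggested refactor is as a small standalone function. This is a sketch under assumptions: `resolve_status` is a hypothetical helper, and `has_failures` is passed in as a plain boolean rather than computed from the source and sink reports.

```python
import enum


class PipelineStatus(enum.Enum):
    UNKNOWN = enum.auto()
    COMPLETED = enum.auto()
    PIPELINE_ERROR = enum.auto()
    CANCELLED = enum.auto()


def resolve_status(final_status: PipelineStatus, has_failures: bool) -> str:
    # Same precedence as the nested ternary: cancellation wins,
    # then reported failures, then successful completion.
    if final_status == PipelineStatus.CANCELLED:
        return "CANCELLED"
    if has_failures:
        return "FAILURE"
    if final_status == PipelineStatus.COMPLETED:
        return "SUCCESS"
    return "UNKNOWN"
```

With this ordering, a run that ends in `PIPELINE_ERROR` without any recorded failures still resolves to "UNKNOWN", exactly as the nested ternary does, so the refactor is purely about readability.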
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- .github/workflows/docker-unified.yml (2 hunks)
Additional comments not posted (2)
.github/workflows/docker-unified.yml (2)
799-799: Approval for Timeout Addition
Adding a 15-minute timeout to the job step is a good practice. It helps in managing resources efficiently and prevents potential issues with jobs that may hang or stall.
1026-1026: Approval for Docker Container Renaming
Renaming the Docker container to datahub-datahub-upgrade-1 enhances clarity in identifying related logs and reports, which is beneficial for troubleshooting and monitoring.
This reverts commit 008e2ec.
@hsheth2 I would go with a split traceback like:
No redundancy, no hacks, but the status and logs are available in the report.
@pie1nthesky it's a bit more tricky than that. I want the logs in the UI to closely match whatever is printed to the CLI. The reporting code you linked to only works for CLI ingestion, but UI-driven ingestion is based only on the stdout/stderr logs. Additionally, I wanted to change it so that pipeline.run() never throws. Finally, I want the error to show up somewhere in the structured report (to help with #10790).
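The three goals in this comment (logs on stderr, a `run()` that never throws, and the error in the structured report) could be sketched roughly as follows. This is not the code that was merged; `SketchPipeline`, `SketchReport`, and the status strings are hypothetical stand-ins.

```python
import logging
from dataclasses import dataclass, field
from typing import Callable, List

logger = logging.getLogger(__name__)


@dataclass
class SketchReport:
    """Hypothetical structured report holding failure messages."""

    failures: List[str] = field(default_factory=list)


@dataclass
class SketchPipeline:
    work: Callable[[], None]
    report: SketchReport = field(default_factory=SketchReport)
    final_status: str = "unknown"

    def run(self) -> None:
        try:
            self.work()
            self.final_status = "completed"
        except Exception as exc:
            self.final_status = "pipeline_error"
            # Logged through the normal logging path, so CLI and UI runs
            # would see the same text.
            logger.exception("Ingestion pipeline threw an uncaught exception")
            # Also recorded in the structured report instead of re-raising.
            self.report.failures.append(f"{type(exc).__name__}: {exc}")
```

In a design like this, callers inspect `final_status` and the report after `run()` returns, rather than relying on an exception propagating out of it.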
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
Currently, unhandled exceptions are not reported properly.
When a pipeline fails with, e.g., a 'Connection timeout' exception during source processing,
the pipeline exits with final_status = 'unknown' and with a cut-off log in the report.
That makes it impossible to troubleshoot ingestion issues from the ingestion report page.