Skip to content

Commit

Permalink
Add on_failure to handle cleanup during pipeline failure
Browse files Browse the repository at this point in the history
- Resolves #1639

Signed-off-by: Keshav Priyadarshi <git@keshav.space>
  • Loading branch information
keshav-space committed Nov 14, 2024
1 parent 297ab51 commit a566e87
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 0 deletions.
53 changes: 53 additions & 0 deletions vulnerabilities/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
import logging
from datetime import datetime
from datetime import timezone
from timeit import default_timer as timer
from traceback import format_exc as traceback_format_exc
from typing import Iterable

from aboutcode.pipeline import BasePipeline
from aboutcode.pipeline import LoopProgress
from aboutcode.pipeline import humanize_time

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.improver import MAX_CONFIDENCE
Expand All @@ -29,6 +31,57 @@
class VulnerableCodePipeline(BasePipeline):
pipeline_id = None # Unique Pipeline ID

def on_failure(self):
"""
Tasks to run in the event that pipeline execution fails.
Implement cleanup or other tasks that need to be performed
on pipeline failure, such as:
- Removing cloned repositories.
- Deleting downloaded archives.
"""
pass

def execute(self):
"""Execute each steps in the order defined on this pipeline class."""
self.log(f"Pipeline [{self.pipeline_name}] starting")

steps = self.pipeline_class.get_steps(groups=self.selected_groups)
steps_count = len(steps)
pipeline_start_time = timer()

for current_index, step in enumerate(steps, start=1):
step_name = step.__name__

if self.selected_steps and step_name not in self.selected_steps:
self.log(f"Step [{step_name}] skipped")
continue

self.set_current_step(f"{current_index}/{steps_count} {step_name}")
self.log(f"Step [{step_name}] starting")
step_start_time = timer()

try:
step(self)
except Exception as exception:
self.log("Pipeline failed")
on_failure_start_time = timer()
self.log(f"Running [on_failure] tasks")
self.on_failure()
on_failure_run_time = timer() - on_failure_start_time
self.log(f"Completed [on_failure] tasks in {humanize_time(on_failure_run_time)}")

return 1, self.output_from_exception(exception)

step_run_time = timer() - step_start_time
self.log(f"Step [{step_name}] completed in {humanize_time(step_run_time)}")

self.set_current_step("") # Reset the `current_step` field on completion
pipeline_run_time = timer() - pipeline_start_time
self.log(f"Pipeline completed in {humanize_time(pipeline_run_time)}")

return 0, ""

def log(self, message, level=logging.INFO):
"""Log the given `message` to the current module logger and execution_log."""
now_local = datetime.now(timezone.utc).astimezone()
Expand Down
3 changes: 3 additions & 0 deletions vulnerabilities/pipelines/gitlab_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ def clean_downloads(self):
self.log(f"Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()


def parse_advisory_path(base_path: Path, file_path: Path) -> Tuple[str, str, str]:
"""
Expand Down
3 changes: 3 additions & 0 deletions vulnerabilities/pipelines/npm_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,6 @@ def clean_downloads(self):
if self.vcs_response:
self.log(f"Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()
3 changes: 3 additions & 0 deletions vulnerabilities/pipelines/pypa_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,6 @@ def clean_downloads(self):
if self.vcs_response:
self.log(f"Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()

0 comments on commit a566e87

Please sign in to comment.