Skip to content

Commit

Permalink
retry DaggerError 3 times if error
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere committed Aug 4, 2023
1 parent 4b4de02 commit b57eb84
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 20 deletions.
55 changes: 35 additions & 20 deletions airbyte-ci/connectors/pipelines/pipelines/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import asyncer
from anyio import Path
from connector_ops.utils import Connector, console
from dagger import Container, DaggerError, QueryError
from dagger import Container, DaggerError
from jinja2 import Environment, PackageLoader, select_autoescape
from pipelines import sentry_utils
from pipelines.actions import remote_storage
Expand Down Expand Up @@ -91,13 +91,16 @@ class Step(ABC):

title: ClassVar[str]
max_retries: ClassVar[int] = 0
max_dagger_error_retries: ClassVar[int] = 3
should_log: ClassVar[bool] = True
success_exit_code: ClassVar[int] = 0
skipped_exit_code: ClassVar[int] = None
# The max duration of a step run. If the step run for more than this duration it will be considered as timed out.
# The default of 5 hours is arbitrary and can be changed if needed.
max_duration: ClassVar[timedelta] = timedelta(hours=5)

retry_delay = timedelta(seconds=10)

def __init__(self, context: PipelineContext) -> None: # noqa D107
self.context = context
self.retry_count = 0
Expand Down Expand Up @@ -155,28 +158,39 @@ async def run(self, *args, **kwargs) -> StepResult:
Returns:
StepResult: The step result following the step run.
"""
self.logger.info(f"🚀 Start {self.title}")
self.started_at = datetime.utcnow()
completion_event = anyio.Event()
try:
self.started_at = datetime.utcnow()
self.logger.info(f"🚀 Start {self.title}")
completion_event = anyio.Event()
async with asyncer.create_task_group() as task_group:
soon_result = task_group.soonify(self.run_with_completion)(completion_event, *args, **kwargs)
task_group.soonify(self.log_progress)(completion_event)

result = soon_result.value

if result.status is StepStatus.FAILURE and self.retry_count <= self.max_retries and self.max_retries > 0:
self.retry_count += 1
await anyio.sleep(10)
self.logger.warn(f"Retry #{self.retry_count}.")
return await self.run(*args, **kwargs)
self.stopped_at = datetime.utcnow()
self.log_step_result(result)
return result
except (DaggerError, QueryError) as e:
self.stopped_at = datetime.utcnow()
self.logger.error(f"Dagger error on step {self.title}: {e}")
return StepResult(self, StepStatus.FAILURE, stderr=str(e))
step_result = soon_result.value
except DaggerError as e:
self.logger.error("Step failed with an unexpected dagger error", exc_info=e)
step_result = StepResult(self, StepStatus.FAILURE, stderr=str(e), exc_info=e)

self.stopped_at = datetime.utcnow()
self.log_step_result(step_result)

lets_retry = self.should_retry(step_result)
step_result = await self.retry(step_result, *args, **kwargs) if lets_retry else step_result
return step_result

def should_retry(self, step_result: StepResult) -> bool:
"""Return True if the step should be retried."""
if step_result.status is not StepStatus.FAILURE:
return False
max_retries = self.max_dagger_error_retries if step_result.exc_info else self.max_retries
return self.retry_count < max_retries and max_retries > 0

async def retry(self, step_result, *args, **kwargs) -> StepResult:
self.retry_count += 1
self.logger.warn(
f"Failed with error: {step_result.stderr}. Retry #{self.retry_count} in {self.retry_delay.total_seconds()} seconds..."
)
await anyio.sleep(self.retry_delay.total_seconds())
return await self.run(*args, **kwargs)

def log_step_result(self, result: StepResult) -> None:
"""Log the step result.
Expand All @@ -186,7 +200,7 @@ def log_step_result(self, result: StepResult) -> None:
"""
duration = format_duration(self.run_duration)
if result.status is StepStatus.FAILURE:
self.logger.error(f"{result.status.get_emoji()} failed (duration: {duration})")
self.logger.info(f"{result.status.get_emoji()} failed (duration: {duration})")
if result.status is StepStatus.SKIPPED:
self.logger.info(f"{result.status.get_emoji()} was skipped (duration: {duration})")
if result.status is StepStatus.SUCCESS:
Expand Down Expand Up @@ -322,6 +336,7 @@ class StepResult:
stderr: Optional[str] = None
stdout: Optional[str] = None
output_artifact: Any = None
exc_info: Optional[Exception] = None

def __repr__(self) -> str: # noqa D105
return f"{self.step.title}: {self.status.value}"
Expand Down
37 changes: 37 additions & 0 deletions airbyte-ci/connectors/pipelines/tests/test_bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import anyio
import pytest
from dagger import DaggerError
from pipelines import bases

pytestmark = [
Expand Down Expand Up @@ -40,6 +41,42 @@ async def test_run_with_timeout(self, test_context):
assert step_result.output_artifact == timed_out_step_result.output_artifact
assert step.retry_count == step.max_retries + 1

@pytest.mark.parametrize(
"step_status, exc_info, max_retries, max_dagger_error_retries, expect_retry",
[
(bases.StepStatus.SUCCESS, None, 0, 0, False),
(bases.StepStatus.SUCCESS, None, 3, 0, False),
(bases.StepStatus.SUCCESS, None, 0, 3, False),
(bases.StepStatus.SUCCESS, None, 3, 3, False),
(bases.StepStatus.SKIPPED, None, 0, 0, False),
(bases.StepStatus.SKIPPED, None, 3, 0, False),
(bases.StepStatus.SKIPPED, None, 0, 3, False),
(bases.StepStatus.SKIPPED, None, 3, 3, False),
(bases.StepStatus.FAILURE, DaggerError(), 0, 0, False),
(bases.StepStatus.FAILURE, DaggerError(), 0, 3, True),
(bases.StepStatus.FAILURE, None, 0, 0, False),
(bases.StepStatus.FAILURE, None, 0, 3, False),
(bases.StepStatus.FAILURE, None, 3, 0, True),
],
)
async def test_run_with_retries(self, mocker, test_context, step_status, exc_info, max_retries, max_dagger_error_retries, expect_retry):
step = self.DummyStep(test_context)
step.max_dagger_error_retries = max_dagger_error_retries
step.max_retries = max_retries
step.max_duration = timedelta(seconds=60)
step.retry_delay = timedelta(seconds=0)
step._run = mocker.AsyncMock(
side_effect=[bases.StepResult(step, step_status, exc_info=exc_info)] * (max(max_dagger_error_retries, max_retries) + 1)
)

step_result = await step.run()

if expect_retry:
assert step.retry_count > 0
else:
assert step.retry_count == 0
assert step_result.status == step_status


class TestReport:
@pytest.fixture
Expand Down

0 comments on commit b57eb84

Please sign in to comment.