Skip to content

Commit

Permalink
connectors-ci: implement per step time out (#28771)
Browse files Browse the repository at this point in the history
* connectors-ci: implement per step time out

* set default timeout to 5 hours

* DEMO - to revert

* Revert "DEMO - to revert"

This reverts commit 2f4fd39.
  • Loading branch information
alafanechere authored Jul 27, 2023
1 parent 9f6963c commit 9cee15b
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 9 deletions.
35 changes: 27 additions & 8 deletions airbyte-ci/connectors/pipelines/pipelines/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import anyio
import asyncer
from anyio import Path
from pipelines.actions import remote_storage
from pipelines.consts import GCS_PUBLIC_DOMAIN, LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH
from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result, slugify
from connector_ops.utils import console
from dagger import Container, DaggerError, QueryError
from jinja2 import Environment, PackageLoader, select_autoescape
from pipelines.actions import remote_storage
from pipelines.consts import GCS_PUBLIC_DOMAIN, LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH
from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result, slugify
from rich.console import Group
from rich.panel import Panel
from rich.style import Style
Expand Down Expand Up @@ -102,6 +102,9 @@ class Step(ABC):
title: ClassVar[str]
max_retries: ClassVar[int] = 0
should_log: ClassVar[bool] = True
# The max duration of a step run. If the step runs for longer than this duration, it is considered timed out.
# The default of 5 hours is arbitrary and can be changed if needed.
max_duration: ClassVar[timedelta] = timedelta(hours=5)

def __init__(self, context: PipelineContext) -> None: # noqa D107
self.context = context
Expand All @@ -125,18 +128,27 @@ def logger(self) -> logging.Logger:
disabled_logger.disabled = True
return disabled_logger

async def log_progress(self, completion_event) -> None:
async def log_progress(self, completion_event: anyio.Event) -> None:
    """Log the step progress about every 30 seconds until the step is done.

    The elapsed time since ``self.started_at`` is polled once per second. A
    message is emitted each time the elapsed time crosses the next 30-second
    boundary, so a tick is never skipped or repeated when the polling loop
    drifts (each iteration lasts slightly more than one second).

    Args:
        completion_event: Event polled on every iteration; the loop exits once it is set.
    """
    log_interval_seconds = 30
    next_log_at = log_interval_seconds
    while not completion_event.is_set():
        # NOTE(review): kept as utcnow() so the subtraction stays compatible with self.started_at,
        # which is presumably a naive UTC datetime - confirm before switching to an aware datetime.
        duration = datetime.utcnow() - self.started_at
        elapsed_seconds = duration.total_seconds()
        if elapsed_seconds >= next_log_at:
            self.logger.info(f"⏳ Still running {self.title}... (duration: {format_duration(duration)})")
            # Move to the first boundary strictly after the current elapsed time, so a late
            # start or a stalled event loop produces a single message instead of a burst.
            next_log_at = (elapsed_seconds // log_interval_seconds + 1) * log_interval_seconds
        await anyio.sleep(1)

async def run_with_completion(self, completion_event, *args, **kwargs) -> StepResult:
result = await self._run(*args, **kwargs)
completion_event.set()
return result
async def run_with_completion(self, completion_event: anyio.Event, *args, **kwargs) -> StepResult:
    """Run the step with a timeout and set the completion event when the step is done.

    The completion event is set on every exit path (success, timeout, unexpected
    exception or cancellation) so that anything waiting on it is always released.

    Args:
        completion_event: Event to set once the step run is over.
        *args: Positional arguments forwarded to ``self._run``.
        **kwargs: Keyword arguments forwarded to ``self._run``.

    Returns:
        The result returned by ``self._run``, or the timed out step result if the run
        lasted longer than ``self.max_duration``.
    """
    try:
        with anyio.fail_after(self.max_duration.total_seconds()):
            return await self._run(*args, **kwargs)
    except TimeoutError:
        # Push the retry counter past the maximum: a timed out step must not be retried.
        self.retry_count = self.max_retries + 1
        self.logger.error(f"🚨 {self.title} timed out after {self.max_duration}. No additional retry will happen.")
        return self._get_timed_out_step_result()
    finally:
        completion_event.set()

async def run(self, *args, **kwargs) -> StepResult:
"""Public method to run the step. It output a step result.
Expand Down Expand Up @@ -223,6 +235,13 @@ async def get_step_result(self, container: Container) -> StepResult:
output_artifact=container,
)

def _get_timed_out_step_result(self) -> StepResult:
    """Build the failed step result reported when the step exceeded its max duration."""
    timeout_message = f"Timed out after the max duration of {format_duration(self.max_duration)}. Please checkout the Dagger logs to see what happened."
    timed_out_result = StepResult(self, StepStatus.FAILURE, stdout=timeout_message)
    return timed_out_result


class PytestStep(Step, ABC):
"""An abstract class to run pytest tests and evaluate success or failure according to pytest logs."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

"""This module groups steps made to run tests for a specific Python connector given a test context."""

from datetime import timedelta
from typing import List

import asyncer
from dagger import Container
from pipelines.actions import environments, secrets
from pipelines.bases import Step, StepResult, StepStatus
from pipelines.builds import LOCAL_BUILD_PLATFORM
Expand All @@ -15,7 +17,6 @@
from pipelines.helpers.steps import run_steps
from pipelines.tests.common import AcceptanceTests, PytestStep
from pipelines.utils import export_container_to_tarball
from dagger import Container


class CodeFormatChecks(Step):
Expand Down Expand Up @@ -58,6 +59,7 @@ class ConnectorPackageInstall(Step):
"""A step to install the Python connector package in a container."""

title = "Connector package install"
max_duration = timedelta(minutes=10)
max_retries = 3

async def _run(self) -> StepResult:
Expand Down
24 changes: 24 additions & 0 deletions airbyte-ci/connectors/pipelines/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import dagger
import pytest
import requests


@pytest.fixture(scope="session")
def anyio_backend() -> str:
    """Return the name of the async backend for the test session (asyncio)."""
    backend_name = "asyncio"
    return backend_name


@pytest.fixture(scope="session")
async def dagger_client():
    """Yield a Dagger client from a connection that stays open for the whole test session."""
    connection = dagger.Connection()
    async with connection as session_client:
        yield session_client


@pytest.fixture(scope="session")
def oss_registry():
    """Download the OSS connector registry once per test session and return it as parsed JSON.

    Raises:
        requests.HTTPError: If the registry endpoint answers with an error status.
        requests.Timeout: If the server does not answer within the timeout.
    """
    registry_url = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
    # requests has no default timeout: without one, a stalled server would hang the test session forever.
    response = requests.get(registry_url, timeout=60)
    response.raise_for_status()
    return response.json()
41 changes: 41 additions & 0 deletions airbyte-ci/connectors/pipelines/tests/test_bases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from datetime import timedelta

import anyio
import pytest
from pipelines import bases

pytestmark = [
pytest.mark.anyio,
]


class TestStep:
    """Tests for the per-step timeout behavior of ``bases.Step``."""

    class DummyStep(bases.Step):
        """Step that sleeps for the requested duration and then reports success."""

        title = "Dummy step"
        max_retries = 3
        max_duration = timedelta(seconds=2)

        async def _run(self, run_duration: timedelta) -> bases.StepResult:
            seconds_to_sleep = run_duration.total_seconds()
            await anyio.sleep(seconds_to_sleep)
            return bases.StepResult(self, bases.StepStatus.SUCCESS)

    @pytest.fixture
    def test_context(self, mocker):
        """Provide a mocked pipeline context that has no secrets to mask."""
        mocked_context = mocker.Mock(secrets_to_mask=[])
        return mocked_context

    async def test_run_with_timeout(self, test_context):
        """A run shorter than max_duration succeeds; a longer one yields the timed out result without retries."""
        one_second = timedelta(seconds=1)
        step = self.DummyStep(test_context)

        # Run that finishes before the max duration is reached.
        in_time_result = await step.run(run_duration=step.max_duration - one_second)
        assert in_time_result.status == bases.StepStatus.SUCCESS
        assert step.retry_count == 0

        # Run that outlasts the max duration: it must match the canonical timed out result.
        late_result = await step.run(run_duration=step.max_duration + one_second)
        expected_result = step._get_timed_out_step_result()
        for attribute_name in ("status", "stdout", "stderr", "output_artifact"):
            assert getattr(late_result, attribute_name) == getattr(expected_result, attribute_name)
        assert step.retry_count == step.max_retries + 1

0 comments on commit 9cee15b

Please sign in to comment.