diff --git a/airbyte-ci/connectors/pipelines/pipelines/bases.py b/airbyte-ci/connectors/pipelines/pipelines/bases.py
index 31a5ed8be91f..c69e6176f4a8 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/bases.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/bases.py
@@ -18,12 +18,12 @@
 import anyio
 import asyncer
 from anyio import Path
-from pipelines.actions import remote_storage
-from pipelines.consts import GCS_PUBLIC_DOMAIN, LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH
-from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result, slugify
 from connector_ops.utils import console
 from dagger import Container, DaggerError, QueryError
 from jinja2 import Environment, PackageLoader, select_autoescape
+from pipelines.actions import remote_storage
+from pipelines.consts import GCS_PUBLIC_DOMAIN, LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH
+from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result, slugify
 from rich.console import Group
 from rich.panel import Panel
 from rich.style import Style
@@ -102,6 +102,9 @@ class Step(ABC):
     title: ClassVar[str]
     max_retries: ClassVar[int] = 0
     should_log: ClassVar[bool] = True
+    # The max duration of a step run. If the step runs for more than this duration, it is considered timed out.
+    # The default of 5 hours is arbitrary and can be changed if needed.
+    max_duration: ClassVar[timedelta] = timedelta(hours=5)
 
     def __init__(self, context: PipelineContext) -> None:  # noqa D107
         self.context = context
@@ -125,7 +128,8 @@ def logger(self) -> logging.Logger:
             disabled_logger.disabled = True
             return disabled_logger
 
-    async def log_progress(self, completion_event) -> None:
+    async def log_progress(self, completion_event: anyio.Event) -> None:
+        """Log the step progress every 30 seconds until the step is done."""
         while not completion_event.is_set():
             duration = datetime.utcnow() - self.started_at
             elapsed_seconds = duration.total_seconds()
@@ -133,10 +137,18 @@ async def log_progress(self, completion_event) -> None:
                 self.logger.info(f"⏳ Still running {self.title}... (duration: {format_duration(duration)})")
             await anyio.sleep(1)
 
-    async def run_with_completion(self, completion_event, *args, **kwargs) -> StepResult:
-        result = await self._run(*args, **kwargs)
-        completion_event.set()
-        return result
+    async def run_with_completion(self, completion_event: anyio.Event, *args, **kwargs) -> StepResult:
+        """Run the step with a timeout and set the completion event when the step is done."""
+        try:
+            with anyio.fail_after(self.max_duration.total_seconds()):
+                return await self._run(*args, **kwargs)
+        except TimeoutError:
+            self.retry_count = self.max_retries + 1  # Exhaust the retry budget so the timed out step is not retried.
+            self.logger.error(f"🚨 {self.title} timed out after {self.max_duration}. No additional retry will happen.")
+            return self._get_timed_out_step_result()
+        finally:
+            # Always signal completion, even if _run raises, so log_progress stops looping.
+            completion_event.set()
 
     async def run(self, *args, **kwargs) -> StepResult:
         """Public method to run the step. It output a step result.
@@ -223,6 +235,14 @@ async def get_step_result(self, container: Container) -> StepResult:
             output_artifact=container,
         )
 
+    def _get_timed_out_step_result(self) -> StepResult:
+        """Build the failed step result returned when the step runs longer than max_duration."""
+        return StepResult(
+            self,
+            StepStatus.FAILURE,
+            stdout=f"Timed out after the max duration of {format_duration(self.max_duration)}. Please checkout the Dagger logs to see what happened.",
+        )
+
 
 class PytestStep(Step, ABC):
     """An abstract class to run pytest tests and evaluate success or failure according to pytest logs."""
diff --git a/airbyte-ci/connectors/pipelines/pipelines/tests/python_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/tests/python_connectors.py
index 8baf751d4b6e..ccbd6d865fc0 100644
--- a/airbyte-ci/connectors/pipelines/pipelines/tests/python_connectors.py
+++ b/airbyte-ci/connectors/pipelines/pipelines/tests/python_connectors.py
@@ -4,9 +4,11 @@
 
 """This module groups steps made to run tests for a specific Python connector given a test context."""
 
+from datetime import timedelta
 from typing import List
 
 import asyncer
+from dagger import Container
 from pipelines.actions import environments, secrets
 from pipelines.bases import Step, StepResult, StepStatus
 from pipelines.builds import LOCAL_BUILD_PLATFORM
@@ -15,7 +17,6 @@
 from pipelines.helpers.steps import run_steps
 from pipelines.tests.common import AcceptanceTests, PytestStep
 from pipelines.utils import export_container_to_tarball
-from dagger import Container
 
 
 class CodeFormatChecks(Step):
@@ -58,6 +59,7 @@ class ConnectorPackageInstall(Step):
     """A step to install the Python connector package in a container."""
 
     title = "Connector package install"
+    max_duration = timedelta(minutes=10)
     max_retries = 3
 
     async def _run(self) -> StepResult:
diff --git a/airbyte-ci/connectors/pipelines/tests/conftest.py b/airbyte-ci/connectors/pipelines/tests/conftest.py
new file mode 100644
index 000000000000..192c181501f6
--- /dev/null
+++ b/airbyte-ci/connectors/pipelines/tests/conftest.py
@@ -0,0 +1,27 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+import dagger
+import pytest
+import requests
+
+
+@pytest.fixture(scope="session")
+def anyio_backend():
+    """Select asyncio as the backend used by the anyio-marked tests."""
+    return "asyncio"
+
+
+@pytest.fixture(scope="session")
+async def dagger_client():
+    """Yield a Dagger client whose connection is shared by the whole test session."""
+    async with dagger.Connection() as client:
+        yield client
+
+
+@pytest.fixture(scope="session")
+def oss_registry():
+    """Download the OSS connector registry and return its parsed JSON content."""
+    response = requests.get("https://connectors.airbyte.com/files/registries/v0/oss_registry.json", timeout=30)
+    response.raise_for_status()
+    return response.json()
diff --git a/airbyte-ci/connectors/pipelines/tests/test_bases.py b/airbyte-ci/connectors/pipelines/tests/test_bases.py
new file mode 100644
index 000000000000..07daec00e57a
--- /dev/null
+++ b/airbyte-ci/connectors/pipelines/tests/test_bases.py
@@ -0,0 +1,47 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+from datetime import timedelta
+
+import anyio
+import pytest
+from pipelines import bases
+
+pytestmark = [
+    pytest.mark.anyio,
+]
+
+
+class TestStep:
+    class DummyStep(bases.Step):
+        """A step that sleeps for the requested duration, then reports success."""
+
+        title = "Dummy step"
+        max_retries = 3
+        max_duration = timedelta(seconds=2)
+
+        async def _run(self, run_duration: timedelta) -> bases.StepResult:
+            await anyio.sleep(run_duration.total_seconds())
+            return bases.StepResult(self, bases.StepStatus.SUCCESS)
+
+    @pytest.fixture
+    def test_context(self, mocker):
+        """Return a mocked pipeline context with no secrets to mask."""
+        return mocker.Mock(secrets_to_mask=[])
+
+    async def test_run_with_timeout(self, test_context):
+        """Check that a step succeeds below max_duration and fails without retry above it."""
+        step = self.DummyStep(test_context)
+        # A run shorter than max_duration must succeed on the first attempt.
+        step_result = await step.run(run_duration=step.max_duration - timedelta(seconds=1))
+        assert step_result.status == bases.StepStatus.SUCCESS
+        assert step.retry_count == 0
+
+        # A run longer than max_duration must yield the timed out result and exhaust the retries.
+        step_result = await step.run(run_duration=step.max_duration + timedelta(seconds=1))
+        timed_out_step_result = step._get_timed_out_step_result()
+        assert step_result.status == timed_out_step_result.status
+        assert step_result.stdout == timed_out_step_result.stdout
+        assert step_result.stderr == timed_out_step_result.stderr
+        assert step_result.output_artifact == timed_out_step_result.output_artifact
+        assert step.retry_count == step.max_retries + 1