diff --git a/airbyte-ci/connectors/pipelines/README.md b/airbyte-ci/connectors/pipelines/README.md index 20e7bdf6ad3e..cb91ce4b03d0 100644 --- a/airbyte-ci/connectors/pipelines/README.md +++ b/airbyte-ci/connectors/pipelines/README.md @@ -223,6 +223,9 @@ Test certified connectors: Test connectors changed on the current branch: `airbyte-ci connectors --modified test` +Run only the acceptance tests on the modified connectors, restricted to the full refresh tests: +`airbyte-ci connectors --modified test --only-step="acceptance" --acceptance.-k=test_full_refresh` + #### What it runs ```mermaid @@ -261,11 +264,12 @@ flowchart TD | Option | Multiple | Default value | Description | | ------------------------------------------------------- | -------- | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | `--skip-step/-x` | True | | Skip steps by id e.g. `-x unit -x acceptance` | +| `--only-step/-k` | True | | Only run specific steps by id e.g. `-k unit -k acceptance` | | `--fail-fast` | False | False | Abort after any tests fail, rather than continuing to run additional tests. Use this setting to confirm a known bug is fixed (or not), or when you only require a pass/fail result. | | `--code-tests-only` | True | False | Skip any tests not directly related to code updates. For instance, metadata checks, version bump checks, changelog verification, etc. Use this setting to help focus on code quality during development. | | `--concurrent-cat` | False | False | Make CAT tests run concurrently using pytest-xdist. Be careful about source or destination API rate limits. | | `--<step-id>.<extra-parameter>=<extra-parameter-value>` | True | | You can pass extra parameters for specific test steps. More details in the extra parameters section below | -| `--ci-requirements` | False | | | Output the CI requirements as a JSON payload. It is used to determine the CI runner to use. 
+| `--ci-requirements` | False | | | Output the CI requirements as a JSON payload. It is used to determine the CI runner to use. Note: @@ -539,6 +543,7 @@ E.G.: running `pytest` on a specific test folder: | Version | PR | Description | | ------- | ---------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------- | +| 3.4.0 | [#34276](https://github.com/airbytehq/airbyte/pull/34276) | Introduce `--only-step` option for connector tests. | | 3.3.0 | [#34218](https://github.com/airbytehq/airbyte/pull/34218) | Introduce `--ci-requirements` option for client defined CI runners. | | 3.2.0 | [#34050](https://github.com/airbytehq/airbyte/pull/34050) | Connector test steps can take extra parameters | | 3.1.3 | [#34136](https://github.com/airbytehq/airbyte/pull/34136) | Fix issue where dagger excludes were not being properly applied | diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py index 9d97f0c90a7b..245f6156228b 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py @@ -53,10 +53,19 @@ @click.option( "--skip-step", "-x", + "skip_steps", multiple=True, type=click.Choice([step_id.value for step_id in CONNECTOR_TEST_STEP_ID]), help="Skip a step by name. Can be used multiple times to skip multiple steps.", ) +@click.option( + "--only-step", + "-k", + "only_steps", + multiple=True, + type=click.Choice([step_id.value for step_id in CONNECTOR_TEST_STEP_ID]), + help="Only run specific step by name. 
Can be used multiple times to keep multiple steps.", +) @click.argument( "extra_params", nargs=-1, type=click.UNPROCESSED, callback=argument_parsing.build_extra_params_mapping(CONNECTOR_TEST_STEP_ID) ) @@ -66,7 +75,8 @@ async def test( code_tests_only: bool, fail_fast: bool, concurrent_cat: bool, - skip_step: List[str], + skip_steps: List[str], + only_steps: List[str], extra_params: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS], ) -> bool: """Runs a test pipeline for the selected connectors. @@ -74,6 +84,8 @@ async def test( Args: ctx (click.Context): The click context. """ + if only_steps and skip_steps: + raise click.UsageError("Cannot use both --only-step and --skip-step at the same time.") if ctx.obj["is_ci"]: fail_if_missing_docker_hub_creds(ctx) if ctx.obj["is_ci"] and ctx.obj["pull_request"] and ctx.obj["pull_request"].draft: @@ -89,7 +101,8 @@ async def test( run_step_options = RunStepOptions( fail_fast=fail_fast, - skip_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in skip_step], + skip_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in skip_steps], + keep_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in only_steps], step_params=extra_params, ) connectors_tests_contexts = [ diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/run_steps.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/run_steps.py index 8fb320d9fd5e..5db187af307c 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/run_steps.py +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/run_steps.py @@ -8,7 +8,7 @@ import inspect from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple, Union +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple, Union import anyio import asyncer @@ -27,16 +27,78 @@ class InvalidStepConfiguration(Exception): pass +def _get_dependency_graph(steps: STEP_TREE) -> Dict[str, List[str]]: + 
""" + Get the dependency graph of a step tree. + """ + dependency_graph: Dict[str, List[str]] = {} + for step in steps: + if isinstance(step, StepToRun): + dependency_graph[step.id] = step.depends_on + elif isinstance(step, list): + nested_dependency_graph = _get_dependency_graph(list(step)) + dependency_graph = {**dependency_graph, **nested_dependency_graph} + else: + raise Exception(f"Unexpected step type: {type(step)}") + + return dependency_graph + + +def _get_transitive_dependencies_for_step_id( + dependency_graph: Dict[str, List[str]], step_id: str, visited: Optional[Set[str]] = None +) -> List[str]: + """Get the transitive dependencies for a step id. + + Args: + dependency_graph (Dict[str, str]): The dependency graph to use. + step_id (str): The step id to get the transitive dependencies for. + visited (Optional[Set[str]], optional): The set of visited step ids. Defaults to None. + + Returns: + List[str]: List of transitive dependencies as step ids. + """ + if visited is None: + visited = set() + + if step_id not in visited: + visited.add(step_id) + + dependencies: List[str] = dependency_graph.get(step_id, []) + for dependency in dependencies: + dependencies.extend(_get_transitive_dependencies_for_step_id(dependency_graph, dependency, visited)) + + return dependencies + else: + return [] + + @dataclass class RunStepOptions: """Options for the run_step function.""" fail_fast: bool = True skip_steps: List[str] = field(default_factory=list) + keep_steps: List[str] = field(default_factory=list) log_step_tree: bool = True concurrency: int = 10 step_params: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS] = field(default_factory=dict) + def __post_init__(self) -> None: + if self.skip_steps and self.keep_steps: + raise ValueError("Cannot use both skip_steps and keep_steps at the same time") + + def get_step_ids_to_skip(self, runnables: STEP_TREE) -> List[str]: + if self.skip_steps: + return self.skip_steps + if self.keep_steps: + step_ids_to_keep = set(self.keep_steps) + 
dependency_graph = _get_dependency_graph(runnables) + all_step_ids = set(dependency_graph.keys()) + for step_id in self.keep_steps: + step_ids_to_keep.update(_get_transitive_dependencies_for_step_id(dependency_graph, step_id)) + return list(all_step_ids - step_ids_to_keep) + return [] + @dataclass(frozen=True) class StepToRun: @@ -217,6 +279,7 @@ async def run_steps( if not runnables: return results + step_ids_to_skip = options.get_step_ids_to_skip(runnables) # Log the step tree if options.log_step_tree: main_logger.info(f"STEP TREE: {runnables}") @@ -232,7 +295,7 @@ async def run_steps( steps_to_evaluate, remaining_steps = _get_next_step_group(runnables) # Remove any skipped steps - steps_to_run, results = _filter_skipped_steps(steps_to_evaluate, options.skip_steps, results) + steps_to_run, results = _filter_skipped_steps(steps_to_evaluate, step_ids_to_skip, results) # Run all steps in list concurrently semaphore = anyio.Semaphore(options.concurrency) diff --git a/airbyte-ci/connectors/pipelines/pyproject.toml b/airbyte-ci/connectors/pipelines/pyproject.toml index 4c7be71da341..3315d923151a 100644 --- a/airbyte-ci/connectors/pipelines/pyproject.toml +++ b/airbyte-ci/connectors/pipelines/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pipelines" -version = "3.3.0" +version = "3.4.0" description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines" authors = ["Airbyte "] diff --git a/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_run_steps.py b/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_run_steps.py index cc2f2e4cb7c6..b1975799ae35 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_run_steps.py +++ b/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_run_steps.py @@ -359,3 +359,66 @@ async def test_run_steps_with_params(): TestStep.accept_extra_params = True await 
class TestRunStepOptions:
    """Tests for RunStepOptions construction and skip-list computation."""

    def test_init(self):
        """Defaults, explicit values and the skip/keep mutual exclusion are honoured."""
        options = RunStepOptions()
        assert options.fail_fast is True
        assert options.concurrency == 10
        assert options.skip_steps == []
        assert options.keep_steps == []
        assert options.step_params == {}

        options = RunStepOptions(fail_fast=False, concurrency=1, skip_steps=["step1"], step_params={"step1": {"--param1": ["value1"]}})
        assert options.fail_fast is False
        assert options.concurrency == 1
        assert options.skip_steps == ["step1"]
        assert options.keep_steps == []
        assert options.step_params == {"step1": {"--param1": ["value1"]}}

        # keep_steps alone is a valid configuration.
        options = RunStepOptions(keep_steps=["step2"])
        assert options.keep_steps == ["step2"]
        assert options.skip_steps == []

        # Declaring both options at once is rejected.
        with pytest.raises(ValueError, match="Cannot use both skip_steps and keep_steps"):
            RunStepOptions(skip_steps=["step1"], keep_steps=["step2"])

    @pytest.mark.parametrize(
        "step_tree, options, expected_skipped_ids",
        [
            # keep_steps: the kept step and its direct dependencies survive.
            (
                [
                    [StepToRun(id="step1", step=TestStep(test_context)), StepToRun(id="step2", step=TestStep(test_context))],
                    StepToRun(id="step3", step=TestStep(test_context)),
                    StepToRun(id="step4", step=TestStep(test_context), depends_on=["step3", "step1"]),
                    StepToRun(id="step5", step=TestStep(test_context)),
                ],
                RunStepOptions(keep_steps=["step4"]),
                {"step2", "step5"},
            ),
            # keep_steps: transitive dependencies (step6 -> step4 -> step1, step6 -> step5 -> step3) survive too.
            (
                [
                    [StepToRun(id="step1", step=TestStep(test_context)), StepToRun(id="step2", step=TestStep(test_context))],
                    StepToRun(id="step3", step=TestStep(test_context)),
                    [
                        StepToRun(id="step4", step=TestStep(test_context), depends_on=["step1"]),
                        StepToRun(id="step6", step=TestStep(test_context), depends_on=["step4", "step5"]),
                    ],
                    StepToRun(id="step5", step=TestStep(test_context), depends_on=["step3"]),
                ],
                RunStepOptions(keep_steps=["step6"]),
                {"step2"},
            ),
            # skip_steps: returned as is, whatever the dependencies are.
            (
                [
                    [StepToRun(id="step1", step=TestStep(test_context)), StepToRun(id="step2", step=TestStep(test_context))],
                    StepToRun(id="step3", step=TestStep(test_context)),
                    [
                        StepToRun(id="step4", step=TestStep(test_context), depends_on=["step1"]),
                        StepToRun(id="step6", step=TestStep(test_context), depends_on=["step4", "step5"]),
                    ],
                    StepToRun(id="step5", step=TestStep(test_context), depends_on=["step3"]),
                ],
                RunStepOptions(skip_steps=["step1"]),
                {"step1"},
            ),
            # Neither option set: nothing is skipped.
            (
                [
                    StepToRun(id="step1", step=TestStep(test_context)),
                    StepToRun(id="step2", step=TestStep(test_context), depends_on=["step1"]),
                ],
                RunStepOptions(),
                set(),
            ),
        ],
    )
    def test_get_step_ids_to_skip(self, step_tree, options, expected_skipped_ids):
        """The computed skip list matches the expected ids, regardless of order."""
        skipped_ids = options.get_step_ids_to_skip(step_tree)
        assert set(skipped_ids) == expected_skipped_ids