Skip to content

Commit

Permalink
airbyte-ci: Introduce --only-step option for connector tests (#34276)
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored Jan 16, 2024
1 parent e0adbe8 commit 446eae3
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 6 deletions.
7 changes: 6 additions & 1 deletion airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ Test certified connectors:
Test connectors changed on the current branch:
`airbyte-ci connectors --modified test`

Run acceptance test only on the modified connectors, just run its full refresh tests:
`airbyte-ci connectors --modified test --only-step="acceptance" --acceptance.-k=test_full_refresh`

#### What it runs

```mermaid
Expand Down Expand Up @@ -261,11 +264,12 @@ flowchart TD
| Option | Multiple | Default value | Description |
| ------------------------------------------------------- | -------- | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `--skip-step/-x` | True | | Skip steps by id e.g. `-x unit -x acceptance` |
| `--only-step/-k` | True | | Only run specific steps by id e.g. `-k unit -k acceptance` |
| `--fail-fast` | False | False | Abort after any tests fail, rather than continuing to run additional tests. Use this setting to confirm a known bug is fixed (or not), or when you only require a pass/fail result. |
| `--code-tests-only` | True | False | Skip any tests not directly related to code updates. For instance, metadata checks, version bump checks, changelog verification, etc. Use this setting to help focus on code quality during development. |
| `--concurrent-cat` | False | False | Make CAT tests run concurrently using pytest-xdist. Be careful about source or destination API rate limits. |
| `--<step-id>.<extra-parameter>=<extra-parameter-value>` | True | | You can pass extra parameters for specific test steps. More details in the extra parameters section below |
| `--ci-requirements` | False | | | Output the CI requirements as a JSON payload. It is used to determine the CI runner to use.
| `--ci-requirements` | False | | | Output the CI requirements as a JSON payload. It is used to determine the CI runner to use.

Note:

Expand Down Expand Up @@ -539,6 +543,7 @@ E.G.: running `pytest` on a specific test folder:

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------- |
| 3.4.0 | [#34276](https://github.com/airbytehq/airbyte/pull/34276) | Introduce `--only-step` option for connector tests. |
| 3.3.0 | [#34218](https://github.com/airbytehq/airbyte/pull/34218) | Introduce `--ci-requirements` option for client defined CI runners. |
| 3.2.0 | [#34050](https://github.com/airbytehq/airbyte/pull/34050) | Connector test steps can take extra parameters |
| 3.1.3 | [#34136](https://github.com/airbytehq/airbyte/pull/34136) | Fix issue where dagger excludes were not being properly applied |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,19 @@
@click.option(
"--skip-step",
"-x",
"skip_steps",
multiple=True,
type=click.Choice([step_id.value for step_id in CONNECTOR_TEST_STEP_ID]),
help="Skip a step by name. Can be used multiple times to skip multiple steps.",
)
@click.option(
"--only-step",
"-k",
"only_steps",
multiple=True,
type=click.Choice([step_id.value for step_id in CONNECTOR_TEST_STEP_ID]),
help="Only run specific step by name. Can be used multiple times to keep multiple steps.",
)
@click.argument(
"extra_params", nargs=-1, type=click.UNPROCESSED, callback=argument_parsing.build_extra_params_mapping(CONNECTOR_TEST_STEP_ID)
)
Expand All @@ -66,14 +75,17 @@ async def test(
code_tests_only: bool,
fail_fast: bool,
concurrent_cat: bool,
skip_step: List[str],
skip_steps: List[str],
only_steps: List[str],
extra_params: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS],
) -> bool:
"""Runs a test pipeline for the selected connectors.
Args:
ctx (click.Context): The click context.
"""
if only_steps and skip_steps:
raise click.UsageError("Cannot use both --only-step and --skip-step at the same time.")
if ctx.obj["is_ci"]:
fail_if_missing_docker_hub_creds(ctx)
if ctx.obj["is_ci"] and ctx.obj["pull_request"] and ctx.obj["pull_request"].draft:
Expand All @@ -89,7 +101,8 @@ async def test(

run_step_options = RunStepOptions(
fail_fast=fail_fast,
skip_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in skip_step],
skip_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in skip_steps],
keep_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in only_steps],
step_params=extra_params,
)
connectors_tests_contexts = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import inspect
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple, Union
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple, Union

import anyio
import asyncer
Expand All @@ -27,16 +27,78 @@ class InvalidStepConfiguration(Exception):
pass


def _get_dependency_graph(steps: STEP_TREE) -> Dict[str, List[str]]:
"""
Get the dependency graph of a step tree.
"""
dependency_graph: Dict[str, List[str]] = {}
for step in steps:
if isinstance(step, StepToRun):
dependency_graph[step.id] = step.depends_on
elif isinstance(step, list):
nested_dependency_graph = _get_dependency_graph(list(step))
dependency_graph = {**dependency_graph, **nested_dependency_graph}
else:
raise Exception(f"Unexpected step type: {type(step)}")

return dependency_graph


def _get_transitive_dependencies_for_step_id(
dependency_graph: Dict[str, List[str]], step_id: str, visited: Optional[Set[str]] = None
) -> List[str]:
"""Get the transitive dependencies for a step id.
Args:
dependency_graph (Dict[str, str]): The dependency graph to use.
step_id (str): The step id to get the transitive dependencies for.
visited (Optional[Set[str]], optional): The set of visited step ids. Defaults to None.
Returns:
List[str]: List of transitive dependencies as step ids.
"""
if visited is None:
visited = set()

if step_id not in visited:
visited.add(step_id)

dependencies: List[str] = dependency_graph.get(step_id, [])
for dependency in dependencies:
dependencies.extend(_get_transitive_dependencies_for_step_id(dependency_graph, dependency, visited))

return dependencies
else:
return []


@dataclass
class RunStepOptions:
"""Options for the run_step function."""

fail_fast: bool = True
skip_steps: List[str] = field(default_factory=list)
keep_steps: List[str] = field(default_factory=list)
log_step_tree: bool = True
concurrency: int = 10
step_params: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS] = field(default_factory=dict)

def __post_init__(self) -> None:
if self.skip_steps and self.keep_steps:
raise ValueError("Cannot use both skip_steps and keep_steps at the same time")

def get_step_ids_to_skip(self, runnables: STEP_TREE) -> List[str]:
if self.skip_steps:
return self.skip_steps
if self.keep_steps:
step_ids_to_keep = set(self.keep_steps)
dependency_graph = _get_dependency_graph(runnables)
all_step_ids = set(dependency_graph.keys())
for step_id in self.keep_steps:
step_ids_to_keep.update(_get_transitive_dependencies_for_step_id(dependency_graph, step_id))
return list(all_step_ids - step_ids_to_keep)
return []


@dataclass(frozen=True)
class StepToRun:
Expand Down Expand Up @@ -217,6 +279,7 @@ async def run_steps(
if not runnables:
return results

step_ids_to_skip = options.get_step_ids_to_skip(runnables)
# Log the step tree
if options.log_step_tree:
main_logger.info(f"STEP TREE: {runnables}")
Expand All @@ -232,7 +295,7 @@ async def run_steps(
steps_to_evaluate, remaining_steps = _get_next_step_group(runnables)

# Remove any skipped steps
steps_to_run, results = _filter_skipped_steps(steps_to_evaluate, options.skip_steps, results)
steps_to_run, results = _filter_skipped_steps(steps_to_evaluate, step_ids_to_skip, results)

# Run all steps in list concurrently
semaphore = anyio.Semaphore(options.concurrency)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "3.3.0"
version = "3.4.0"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <contact@airbyte.io>"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,66 @@ async def test_run_steps_with_params():
TestStep.accept_extra_params = True
await run_steps(steps, options=options)
assert steps[0].step.params_as_cli_options == ["--param1=value1"]


class TestRunStepOptions:
def test_init(self):
options = RunStepOptions()
assert options.fail_fast is True
assert options.concurrency == 10
assert options.skip_steps == []
assert options.step_params == {}

options = RunStepOptions(fail_fast=False, concurrency=1, skip_steps=["step1"], step_params={"step1": {"--param1": ["value1"]}})
assert options.fail_fast is False
assert options.concurrency == 1
assert options.skip_steps == ["step1"]
assert options.step_params == {"step1": {"--param1": ["value1"]}}

with pytest.raises(ValueError):
RunStepOptions(skip_steps=["step1"], keep_steps=["step2"])

@pytest.mark.parametrize(
"step_tree, options, expected_skipped_ids",
[
(
[
[StepToRun(id="step1", step=TestStep(test_context)), StepToRun(id="step2", step=TestStep(test_context))],
StepToRun(id="step3", step=TestStep(test_context)),
StepToRun(id="step4", step=TestStep(test_context), depends_on=["step3", "step1"]),
StepToRun(id="step5", step=TestStep(test_context)),
],
RunStepOptions(keep_steps=["step4"]),
{"step2", "step5"},
),
(
[
[StepToRun(id="step1", step=TestStep(test_context)), StepToRun(id="step2", step=TestStep(test_context))],
StepToRun(id="step3", step=TestStep(test_context)),
[
StepToRun(id="step4", step=TestStep(test_context), depends_on=["step1"]),
StepToRun(id="step6", step=TestStep(test_context), depends_on=["step4", "step5"]),
],
StepToRun(id="step5", step=TestStep(test_context), depends_on=["step3"]),
],
RunStepOptions(keep_steps=["step6"]),
{"step2"},
),
(
[
[StepToRun(id="step1", step=TestStep(test_context)), StepToRun(id="step2", step=TestStep(test_context))],
StepToRun(id="step3", step=TestStep(test_context)),
[
StepToRun(id="step4", step=TestStep(test_context), depends_on=["step1"]),
StepToRun(id="step6", step=TestStep(test_context), depends_on=["step4", "step5"]),
],
StepToRun(id="step5", step=TestStep(test_context), depends_on=["step3"]),
],
RunStepOptions(skip_steps=["step1"]),
{"step1"},
),
],
)
def test_get_step_ids_to_skip(self, step_tree, options, expected_skipped_ids):
skipped_ids = options.get_step_ids_to_skip(step_tree)
assert set(skipped_ids) == expected_skipped_ids

0 comments on commit 446eae3

Please sign in to comment.