Skip to content

Commit

Permalink
Add integration test to confirm the behaviour for build works as expe…
Browse files Browse the repository at this point in the history
…cted
  • Loading branch information
tatiana committed Dec 27, 2024
1 parent ca9e9b3 commit 1c97e32
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
1 change: 1 addition & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ def generate_task_or_group(
normalize_task_id=normalize_task_id,
test_behavior=test_behavior,
on_warning_callback=on_warning_callback,
detached_from_parent=detached_from_parent,
)

# In most cases, we'll map one DBT node to one Airflow task
Expand Down
52 changes: 51 additions & 1 deletion tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from airflow.models import DAG

from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import DbtResourceType, ExecutionMode, InvocationMode, LoadMode
from cosmos.constants import DbtResourceType, ExecutionMode, InvocationMode, LoadMode, TestBehavior
from cosmos.converter import DbtToAirflowConverter, validate_arguments, validate_initial_user_config
from cosmos.dbt.graph import DbtGraph, DbtNode
from cosmos.exceptions import CosmosValueError
Expand Down Expand Up @@ -205,6 +205,56 @@ def test_converter_creates_dag_with_test_with_multiple_parents():
assert args[1:] == ["test", "--select", "custom_test_combined_model_combined_model_.c6e4587380"]


@pytest.mark.integration
def test_converter_creates_dag_with_test_with_multiple_parents_and_build():
"""
Validate topology of a project that uses the MULTIPLE_PARENTS_TEST_DBT_PROJECT project
"""
project_config = ProjectConfig(dbt_project_path=MULTIPLE_PARENTS_TEST_DBT_PROJECT)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="example_conn",
profile_args={"schema": "public"},
disable_event_tracking=True,
),
)
with DAG("sample_dag", start_date=datetime(2024, 4, 16)) as dag:
converter = DbtToAirflowConverter(
dag=dag,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=RenderConfig(test_behavior=TestBehavior.BUILD),
)
tasks = converter.tasks_map

assert len(converter.tasks_map) == 4

# We exclude the test that depends on combined_model and model_a from their commands
args = tasks["model.my_dbt_project.combined_model"].build_cmd({})[0]
assert args[1:] == [
"build",
"--exclude",
"custom_test_combined_model_combined_model_",
"--models",
"combined_model",
]

args = tasks["model.my_dbt_project.model_a"].build_cmd({})[0]
assert args[1:] == ["build", "--exclude", "custom_test_combined_model_combined_model_", "--models", "model_a"]

# The test for model_b should not be changed, since it is not a parent of this test
args = tasks["model.my_dbt_project.model_b"].build_cmd({})[0]
assert args[1:] == ["build", "--models", "model_b"]

# We should have a task dedicated to run the test with multiple parents
args = tasks["test.my_dbt_project.custom_test_combined_model_combined_model_.c6e4587380"].build_cmd({})[0]
assert args[1:] == ["test", "--select", "custom_test_combined_model_combined_model_.c6e4587380"]


@pytest.mark.parametrize(
"execution_mode,operator_args",
[
Expand Down

0 comments on commit 1c97e32

Please sign in to comment.