diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 9ea919118..6c2307596 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -342,6 +342,7 @@ def generate_task_or_group( normalize_task_id=normalize_task_id, test_behavior=test_behavior, on_warning_callback=on_warning_callback, + detached_from_parent=detached_from_parent, ) # In most cases, we'll map one DBT node to one Airflow task diff --git a/tests/test_converter.py b/tests/test_converter.py index 1868cc4de..f1fc6f7b3 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -7,7 +7,7 @@ from airflow.models import DAG from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import DbtResourceType, ExecutionMode, InvocationMode, LoadMode +from cosmos.constants import DbtResourceType, ExecutionMode, InvocationMode, LoadMode, TestBehavior from cosmos.converter import DbtToAirflowConverter, validate_arguments, validate_initial_user_config from cosmos.dbt.graph import DbtGraph, DbtNode from cosmos.exceptions import CosmosValueError @@ -205,6 +205,56 @@ def test_converter_creates_dag_with_test_with_multiple_parents(): assert args[1:] == ["test", "--select", "custom_test_combined_model_combined_model_.c6e4587380"] +@pytest.mark.integration +def test_converter_creates_dag_with_test_with_multiple_parents_and_build(): + """ + Validate topology of a project that uses the MULTIPLE_PARENTS_TEST_DBT_PROJECT project + """ + project_config = ProjectConfig(dbt_project_path=MULTIPLE_PARENTS_TEST_DBT_PROJECT) + execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL) + profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), + ) + with DAG("sample_dag", start_date=datetime(2024, 4, 16)) as dag: + converter = DbtToAirflowConverter( + dag=dag, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=RenderConfig(test_behavior=TestBehavior.BUILD), + ) + tasks = converter.tasks_map + + assert len(converter.tasks_map) == 4 + + # We exclude the test that depends on combined_model and model_a from their commands + args = tasks["model.my_dbt_project.combined_model"].build_cmd({})[0] + assert args[1:] == [ + "build", + "--exclude", + "custom_test_combined_model_combined_model_", + "--models", + "combined_model", + ] + + args = tasks["model.my_dbt_project.model_a"].build_cmd({})[0] + assert args[1:] == ["build", "--exclude", "custom_test_combined_model_combined_model_", "--models", "model_a"] + + # The test for model_b should not be changed, since it is not a parent of this test + args = tasks["model.my_dbt_project.model_b"].build_cmd({})[0] + assert args[1:] == ["build", "--models", "model_b"] + + # We should have a task dedicated to run the test with multiple parents + args = tasks["test.my_dbt_project.custom_test_combined_model_combined_model_.c6e4587380"].build_cmd({})[0] + assert args[1:] == ["test", "--select", "custom_test_combined_model_combined_model_.c6e4587380"] + + @pytest.mark.parametrize( "execution_mode,operator_args", [