From 3947238d235426113569884059124804360854a4 Mon Sep 17 00:00:00 2001 From: MrBones757 Date: Thu, 16 Nov 2023 12:00:28 +0000 Subject: [PATCH 1/3] Delayed validation of profiles yml existence, added validation if both profile_mapping and profiles_yml_filepath are defined --- cosmos/config.py | 18 ++++++++++++------ cosmos/converter.py | 27 +++++++++++++++++++-------- tests/test_config.py | 23 ++++++++++++++++------- tests/test_converter.py | 9 +++++++-- 4 files changed, 54 insertions(+), 23 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index a33e96830..5c64193c1 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -41,8 +41,8 @@ class RenderConfig: :param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly') :param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing :param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. - :param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. Mutually Exclusive with ProjectConfig.dbt_project_path - :param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` + :param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. + :param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``. 
""" emit_datasets: bool = True @@ -195,15 +195,21 @@ class ProfileConfig: profile_mapping: BaseProfileMapping | None = None def __post_init__(self) -> None: - "Validates that we have enough information to render a profile." - # if using a user-supplied profiles.yml, validate that it exists - if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists(): - raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.") + self.validate_profile() def validate_profile(self) -> None: "Validates that we have enough information to render a profile." if not self.profiles_yml_filepath and not self.profile_mapping: raise CosmosValueError("Either profiles_yml_filepath or profile_mapping must be set to render a profile") + if self.profiles_yml_filepath and self.profile_mapping: + raise CosmosValueError( + "Both profiles_yml_filepath and profile_mapping are defined and are mutually exclusive. Ensure only one of these is defined." + ) + + def validate_profiles_yml(self) -> None: + "Validates a user-supplied profiles.yml is present" + if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists(): + raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.") @contextlib.contextmanager def ensure_profile( diff --git a/cosmos/converter.py b/cosmos/converter.py index 559b7ea69..27039c4d1 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -11,6 +11,7 @@ from airflow.utils.task_group import TaskGroup from cosmos.airflow.graph import build_airflow_graph +from cosmos.constants import ExecutionMode from cosmos.dbt.graph import DbtGraph from cosmos.dbt.selector import retrieve_by_label from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ProfileConfig @@ -49,7 +50,11 @@ def airflow_kwargs(**kwargs: dict[str, Any]) -> dict[str, Any]: def validate_arguments( - select: list[str], exclude: list[str], profile_args: dict[str, Any], task_args: dict[str, Any] + select: list[str], + exclude: 
list[str], + profile_config: ProfileConfig, + task_args: dict[str, Any], + execution_mode: ExecutionMode, ) -> None: """ Validate that mutually exclusive selectors filters have not been given. @@ -57,8 +62,9 @@ def validate_arguments( :param select: A list of dbt select arguments (e.g. 'config.materialized:incremental') :param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly') - :param profile_args: Arguments to pass to the dbt profile + :param profile_config: ProfileConfig Object :param task_args: Arguments to be used to instantiate an Airflow Task + :param execution_mode: the current execution mode """ for field in ("tags", "paths"): select_items = retrieve_by_label(select, field) @@ -69,9 +75,12 @@ def validate_arguments( # if task_args has a schema, add it to the profile args and add a deprecated warning if "schema" in task_args: - profile_args["schema"] = task_args["schema"] + profile_config.profile_mapping.profile_args["schema"] = task_args["schema"] logger.warning("Specifying a schema in the `task_args` is deprecated. Please use the `profile_args` instead.") + if execution_mode in [ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV]: + profile_config.validate_profiles_yml() + class DbtToAirflowConverter: """ @@ -139,10 +148,6 @@ def __init__( "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." 
) - profile_args = {} - if profile_config.profile_mapping: - profile_args = profile_config.profile_mapping.profile_args - if not operator_args: operator_args = {} @@ -174,7 +179,13 @@ def __init__( if execution_config.dbt_executable_path: task_args["dbt_executable_path"] = execution_config.dbt_executable_path - validate_arguments(render_config.select, render_config.exclude, profile_args, task_args) + validate_arguments( + render_config.select, + render_config.exclude, + profile_config, + task_args, + execution_mode=execution_config.execution_mode, + ) build_airflow_graph( nodes=dbt_graph.filtered_nodes, diff --git a/tests/test_config.py b/tests/test_config.py index cc0711043..d7cd2db2e 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,5 +1,6 @@ from pathlib import Path from unittest.mock import patch +from cosmos.profiles.postgres.user_pass import PostgresUserPasswordProfileMapping import pytest @@ -8,6 +9,7 @@ DBT_PROJECTS_ROOT_DIR = Path(__file__).parent / "sample/" +SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" PIPELINE_FOLDER = "jaffle_shop" @@ -111,17 +113,24 @@ def test_project_name(): assert dbt_project.project_name == "sample" -def test_profile_config_post_init(): +def test_profile_config_validate_none(): with pytest.raises(CosmosValueError) as err_info: - ProfileConfig(profiles_yml_filepath="/tmp/some-profile", profile_name="test", target_name="test") - assert err_info.value.args[0] == "The file /tmp/some-profile does not exist." 
+ ProfileConfig(profile_name="test", target_name="test") + assert err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile" -def test_profile_config_validate(): +def test_profile_config_validate_both(): with pytest.raises(CosmosValueError) as err_info: - profile_config = ProfileConfig(profile_name="test", target_name="test") - assert profile_config.validate_profile() is None - assert err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile" + ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), + ) + assert ( + err_info.value.args[0] + == "Both profiles_yml_filepath and profile_mapping are defined and are mutually exclusive. Ensure only one of these is defined." + ) @patch("cosmos.config.shutil.which", return_value=None) diff --git a/tests/test_converter.py b/tests/test_converter.py index 4210b24d6..b6def0bf1 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1,6 +1,7 @@ from datetime import datetime from pathlib import Path from unittest.mock import patch +from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping import pytest from airflow.models import DAG @@ -22,10 +23,14 @@ def test_validate_arguments_tags(argument_key): selector_name = argument_key[:-1] select = [f"{selector_name}:a,{selector_name}:b"] exclude = [f"{selector_name}:b,{selector_name}:c"] - profile_args = {} + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), + ) task_args = {} with pytest.raises(CosmosValueError) as err: - validate_arguments(select, exclude, profile_args, task_args) + validate_arguments(select, exclude, profile_config, task_args, execution_mode=ExecutionMode.LOCAL) expected = f"Can't specify the same 
{selector_name} in `select` and `exclude`: {{'b'}}" assert err.value.args[0] == expected From dfdddd729bfbc832676b413ee6e9db8ea4dc9c51 Mon Sep 17 00:00:00 2001 From: MrBones757 Date: Thu, 16 Nov 2023 14:40:07 +0000 Subject: [PATCH 2/3] updated tests for coverage --- tests/test_config.py | 8 ++++++++ tests/test_converter.py | 13 +++++++++++++ 2 files changed, 21 insertions(+) diff --git a/tests/test_config.py b/tests/test_config.py index d7cd2db2e..578a68f76 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -133,6 +133,14 @@ def test_profile_config_validate_both(): ) +def test_profile_config_validate_profiles_yml(): + profile_config = ProfileConfig(profile_name="test", target_name="test", profiles_yml_filepath="/tmp/no-exists") + with pytest.raises(CosmosValueError) as err_info: + profile_config.validate_profiles_yml() + + assert err_info.value.args[0] == "The file /tmp/no-exists does not exist." + + @patch("cosmos.config.shutil.which", return_value=None) def test_render_config_without_dbt_cmd(mock_which): render_config = RenderConfig() diff --git a/tests/test_converter.py b/tests/test_converter.py index b6def0bf1..f8a4645e5 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -35,6 +35,19 @@ def test_validate_arguments_tags(argument_key): assert err.value.args[0] == expected +def test_validate_arguments_schema_in_task_args(): + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), + ) + task_args = {"schema": "abcd"} + validate_arguments( + select=[], exclude=[], profile_config=profile_config, task_args=task_args, execution_mode=ExecutionMode.LOCAL + ) + assert profile_config.profile_mapping.profile_args["schema"] == "abcd" + + parent_seed = DbtNode( name="seed_parent", unique_id="seed_parent", From 0e6de62f311db27af824461ef9001afa5d2a235e Mon Sep 17 00:00:00 2001 From: MrBones757 Date: Fri, 17 Nov 2023 01:10:18 +0000 Subject: 
[PATCH 3/3] add additional validation to ensure value exists --- cosmos/converter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 27039c4d1..2142cc6e4 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -75,8 +75,9 @@ def validate_arguments( # if task_args has a schema, add it to the profile args and add a deprecated warning if "schema" in task_args: - profile_config.profile_mapping.profile_args["schema"] = task_args["schema"] logger.warning("Specifying a schema in the `task_args` is deprecated. Please use the `profile_args` instead.") + if profile_config.profile_mapping: + profile_config.profile_mapping.profile_args["schema"] = task_args["schema"] if execution_mode in [ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV]: profile_config.validate_profiles_yml()