From 8dcf654b415471172127e8d45462651631cace88 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Fri, 15 Nov 2024 13:11:34 +0530 Subject: [PATCH 1/7] Add basic docs --- cosmos/__init__.py | 12 +++++ cosmos/operators/airflow_async.py | 5 ++ cosmos/operators/aws_eks.py | 10 ++++ cosmos/operators/azure_container_instance.py | 10 ++++ cosmos/operators/base.py | 7 +++ cosmos/operators/docker.py | 10 ++++ cosmos/operators/gcp_cloud_run_job.py | 10 ++++ cosmos/operators/kubernetes.py | 8 +++ cosmos/operators/local.py | 10 ++++ cosmos/operators/virtualenv.py | 10 ++++ dev/dags/example_clone.py | 51 +++++++++++++++++++ docs/getting_started/index.rst | 1 + docs/getting_started/operators.rst | 24 +++++++++ tests/operators/test_aws_eks.py | 2 + .../test_azure_container_instance.py | 2 + tests/operators/test_docker.py | 2 + tests/operators/test_gcp_cloud_run_job.py | 2 + tests/operators/test_kubernetes.py | 2 + tests/operators/test_local.py | 14 +++++ tests/operators/test_virtualenv.py | 15 +++++- tests/test_example_dags.py | 2 + tests/test_example_dags_no_connections.py | 1 + 22 files changed, 209 insertions(+), 1 deletion(-) create mode 100644 dev/dags/example_clone.py create mode 100644 docs/getting_started/operators.rst diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 546ffefde..884a90659 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -30,6 +30,7 @@ from cosmos.operators.lazy_load import MissingPackage from cosmos.operators.local import ( DbtBuildLocalOperator, + DbtCloneLocalOperator, DbtDepsLocalOperator, DbtLSLocalOperator, DbtRunLocalOperator, @@ -44,6 +45,7 @@ try: from cosmos.operators.docker import ( DbtBuildDockerOperator, + DbtCloneDockerOperator, DbtLSDockerOperator, DbtRunDockerOperator, DbtRunOperationDockerOperator, @@ -65,6 +67,7 @@ try: from cosmos.operators.kubernetes import ( DbtBuildKubernetesOperator, + DbtCloneKubernetesOperator, DbtLSKubernetesOperator, DbtRunKubernetesOperator, DbtRunOperationKubernetesOperator, @@ -106,6 +109,7 @@ try: from cosmos.operators.azure_container_instance import ( DbtBuildAzureContainerInstanceOperator, + DbtCloneAzureContainerInstanceOperator, DbtLSAzureContainerInstanceOperator, DbtRunAzureContainerInstanceOperator, DbtRunOperationAzureContainerInstanceOperator, @@ -142,6 +146,7 @@ try: from cosmos.operators.aws_eks import ( DbtBuildAwsEksOperator, + DbtCloneAwsEksOperator, DbtLSAwsEksOperator, DbtRunAwsEksOperator, DbtRunOperationAwsEksOperator, @@ -170,6 +175,7 @@ try: from cosmos.operators.gcp_cloud_run_job import ( DbtBuildGcpCloudRunJobOperator, + DbtCloneGcpCloudRunJobOperator, DbtLSGcpCloudRunJobOperator, DbtRunGcpCloudRunJobOperator, DbtRunOperationGcpCloudRunJobOperator, @@ -217,6 +223,7 @@ "DbtResourceType", # Local Execution Mode "DbtBuildLocalOperator", + "DbtCloneLocalOperator", "DbtDepsLocalOperator", # deprecated, to be delete in Cosmos 2.x "DbtLSLocalOperator", "DbtRunLocalOperator", @@ -226,6 +233,7 @@ "DbtTestLocalOperator", # Docker Execution Mode "DbtBuildDockerOperator", + "DbtCloneDockerOperator", "DbtLSDockerOperator", "DbtRunDockerOperator", "DbtRunOperationDockerOperator", @@ -234,6 +242,7 @@ "DbtTestDockerOperator", # Kubernetes Execution Mode "DbtBuildKubernetesOperator", + "DbtCloneKubernetesOperator", "DbtLSKubernetesOperator", "DbtRunKubernetesOperator", "DbtRunOperationKubernetesOperator", @@ -242,6 +251,7 @@ "DbtTestKubernetesOperator", # Azure Container Instance Execution Mode "DbtBuildAzureContainerInstanceOperator", + "DbtCloneAzureContainerInstanceOperator", 
"DbtLSAzureContainerInstanceOperator", "DbtRunAzureContainerInstanceOperator", "DbtRunOperationAzureContainerInstanceOperator", @@ -250,6 +260,7 @@ "DbtTestAzureContainerInstanceOperator", # AWS EKS Execution Mode "DbtBuildAwsEksOperator", + "DbtCloneAwsEksOperator", "DbtLSAwsEksOperator", "DbtRunAwsEksOperator", "DbtRunOperationAwsEksOperator", @@ -258,6 +269,7 @@ "DbtTestAwsEksOperator", # GCP Cloud Run Job Execution Mode "DbtBuildGcpCloudRunJobOperator", + "DbtCloneGcpCloudRunJobOperator", "DbtLSGcpCloudRunJobOperator", "DbtRunGcpCloudRunJobOperator", "DbtRunOperationGcpCloudRunJobOperator", diff --git a/cosmos/operators/airflow_async.py b/cosmos/operators/airflow_async.py index a7f30a330..ac5b774c4 100644 --- a/cosmos/operators/airflow_async.py +++ b/cosmos/operators/airflow_async.py @@ -14,6 +14,7 @@ from cosmos.operators.base import AbstractDbtBaseOperator from cosmos.operators.local import ( DbtBuildLocalOperator, + DbtCloneLocalOperator, DbtCompileLocalOperator, DbtLocalBaseOperator, DbtLSLocalOperator, @@ -188,3 +189,7 @@ class DbtRunOperationAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtRunOpe class DbtCompileAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtCompileLocalOperator): # type: ignore pass + + +class DbtCloneAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtCloneLocalOperator): + pass diff --git a/cosmos/operators/aws_eks.py b/cosmos/operators/aws_eks.py index 1c194a3e4..7f20eda9a 100644 --- a/cosmos/operators/aws_eks.py +++ b/cosmos/operators/aws_eks.py @@ -8,6 +8,7 @@ from cosmos.operators.kubernetes import ( DbtBuildKubernetesOperator, + DbtCloneKubernetesOperator, DbtKubernetesBaseOperator, DbtLSKubernetesOperator, DbtRunKubernetesOperator, @@ -160,3 +161,12 @@ class DbtRunOperationAwsEksOperator(DbtAwsEksBaseOperator, DbtRunOperationKubern def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + + +class DbtCloneAwsEksOperator(DbtAwsEksBaseOperator, DbtCloneKubernetesOperator): + """ + Executes a dbt core clone command. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) diff --git a/cosmos/operators/azure_container_instance.py b/cosmos/operators/azure_container_instance.py index d3c8ebfc3..7f335bd99 100644 --- a/cosmos/operators/azure_container_instance.py +++ b/cosmos/operators/azure_container_instance.py @@ -8,6 +8,7 @@ from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, + DbtCloneMixin, DbtLSMixin, DbtRunMixin, DbtRunOperationMixin, @@ -167,3 +168,12 @@ class DbtRunOperationAzureContainerInstanceOperator(DbtRunOperationMixin, DbtAzu def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + + +class DbtCloneAzureContainerInstanceOperator(DbtCloneMixin, DbtAzureContainerInstanceBaseOperator): + """ + Executes a dbt core clone command. 
+ """ + + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index ed7969ebd..0751f7892 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -438,3 +438,10 @@ class DbtCompileMixin: base_cmd = ["compile"] ui_color = "#877c7c" + + +class DbtCloneMixin: + """Mixin for dbt clone command.""" + + base_cmd = ["clone"] + ui_color = "#83a300" diff --git a/cosmos/operators/docker.py b/cosmos/operators/docker.py index 6f0956237..05671b4d0 100644 --- a/cosmos/operators/docker.py +++ b/cosmos/operators/docker.py @@ -7,6 +7,7 @@ from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, + DbtCloneMixin, DbtLSMixin, DbtRunMixin, DbtRunOperationMixin, @@ -148,3 +149,12 @@ class DbtRunOperationDockerOperator(DbtRunOperationMixin, DbtDockerBaseOperator) def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + + +class DbtCloneDockerOperator(DbtCloneMixin, DbtDockerBaseOperator): + """ + Executes a dbt core clone command. + """ + + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) diff --git a/cosmos/operators/gcp_cloud_run_job.py b/cosmos/operators/gcp_cloud_run_job.py index 76570d56a..ef47db2cc 100644 --- a/cosmos/operators/gcp_cloud_run_job.py +++ b/cosmos/operators/gcp_cloud_run_job.py @@ -10,6 +10,7 @@ from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, + DbtCloneMixin, DbtLSMixin, DbtRunMixin, DbtRunOperationMixin, @@ -180,3 +181,12 @@ class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRun def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + + +class DbtCloneGcpCloudRunJobOperator(DbtCloneMixin, DbtGcpCloudRunJobBaseOperator): + """ + Executes a dbt core clone command. + """ + + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index 452932f07..f86925fde 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -11,6 +11,7 @@ from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, + DbtCloneMixin, DbtLSMixin, DbtRunMixin, DbtRunOperationMixin, @@ -260,3 +261,10 @@ class DbtRunOperationKubernetesOperator(DbtRunOperationMixin, DbtKubernetesBaseO def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + + +class DbtCloneKubernetesOperator(DbtCloneMixin, DbtKubernetesBaseOperator): + """Executes a dbt core clone command.""" + + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 618d9e944..bf47ab4aa 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -70,6 +70,7 @@ from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, + DbtCloneMixin, DbtCompileMixin, DbtLSMixin, DbtRunMixin, @@ -1009,3 +1010,12 @@ class DbtCompileLocalOperator(DbtCompileMixin, DbtLocalBaseOperator): def __init__(self, *args: Any, **kwargs: Any) -> None: kwargs["should_upload_compiled_sql"] = True super().__init__(*args, **kwargs) + + +class DbtCloneLocalOperator(DbtCloneMixin, DbtLocalBaseOperator): + """ + Executes a dbt core clone command. 
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py
index 0b06b2a81..3bd54da99 100644
--- a/cosmos/operators/virtualenv.py
+++ b/cosmos/operators/virtualenv.py
@@ -17,6 +17,7 @@
 from cosmos.log import get_logger
 from cosmos.operators.local import (
     DbtBuildLocalOperator,
+    DbtCloneLocalOperator,
     DbtDocsLocalOperator,
     DbtLocalBaseOperator,
     DbtLSLocalOperator,
@@ -286,3 +287,12 @@ class DbtDocsVirtualenvOperator(DbtVirtualenvBaseOperator, DbtDocsLocalOperator)
     def __init__(self, *args: Any, **kwargs: Any):
         super().__init__(*args, **kwargs)
+
+
+class DbtCloneVirtualenvOperator(DbtVirtualenvBaseOperator, DbtCloneLocalOperator):
+    """
+    Executes a dbt core clone command.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any):
+        super().__init__(*args, **kwargs)
diff --git a/dev/dags/example_clone.py b/dev/dags/example_clone.py
new file mode 100644
index 000000000..d3d238758
--- /dev/null
+++ b/dev/dags/example_clone.py
@@ -0,0 +1,51 @@
+from datetime import datetime
+
+from airflow import DAG
+
+from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
+
+DBT_PROJ_DIR = "/usr/local/airflow/dbt/jaffle_shop"
+
+profile_config1 = ProfileConfig(
+    profile_name="bigquery_dev",
+    target_name="dev",
+    profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml",
+)
+
+profile_config2 = ProfileConfig(
+    profile_name="bigquery_clone",
+    target_name="dev",
+    profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml",
+)
+
+
+with DAG("test-id-1", start_date=datetime(2024, 1, 1), catchup=False) as dag:
+    seed_operator = DbtSeedLocalOperator(
+        profile_config=profile_config1,
+        project_dir=DBT_PROJ_DIR,
+        task_id="seed",
+        dbt_cmd_flags=["--select", "raw_customers"],
+        install_deps=True,
+        append_env=True,
+    )
+    run_operator = DbtRunLocalOperator(
+        profile_config=profile_config1,
+        project_dir=DBT_PROJ_DIR,
+        task_id="run",
+        dbt_cmd_flags=["--models", "stg_customers"],
+        install_deps=True,
+        append_env=True,
+    )
+
+    # [START clone_example]
+    clone_operator = DbtCloneLocalOperator(
+        profile_config=profile_config2,
+        project_dir=DBT_PROJ_DIR,
+        task_id="clone",
+        dbt_cmd_flags=["--models", "stg_customers", "--state", "/usr/local/airflow/dbt/jaffle_shop/target"],
+        install_deps=True,
+        append_env=True,
+    )
+    # [END clone_example]
+
+    seed_operator >> run_operator >> clone_operator
diff --git a/docs/getting_started/index.rst b/docs/getting_started/index.rst
index ed1952793..958f115e1 100644
--- a/docs/getting_started/index.rst
+++ b/docs/getting_started/index.rst
@@ -14,6 +14,7 @@
     Azure Container Instance Execution Mode
     GCP Cloud Run Job Execution Mode
     dbt and Airflow Similar Concepts
+    Operators
 Getting Started
diff --git a/docs/getting_started/operators.rst b/docs/getting_started/operators.rst
new file mode 100644
index 000000000..9f955047d
--- /dev/null
+++ b/docs/getting_started/operators.rst
@@ -0,0 +1,24 @@
+.. _operators:
+
+Operators
+=========
+
+Cosmos exposes individual operators that correspond to specific dbt commands, which can be used just like traditional
+`Apache Airflow® <https://airflow.apache.org/>`_ operators. Cosmos names these operators using the format ``Dbt<Command><ExecutionMode>Operator``. For example, ``DbtBuildLocalOperator``.
+
+Clone
+-----
+
+Requirements
+
+* Cosmos >= 1.8.0
+* dbt-core >= 1.6.0
+
+The ``DbtCloneLocalOperator`` implements the `dbt clone <https://docs.getdbt.com/reference/commands/clone>`_ command.
+
+Example of how to use it:
+
+.. literalinclude:: ../../dev/dags/example_clone.py
+   :language: python
+   :start-after: [START clone_example]
+   :end-before: [END clone_example]
diff --git a/tests/operators/test_aws_eks.py b/tests/operators/test_aws_eks.py
index 35717a061..bca007c4d 100644
--- a/tests/operators/test_aws_eks.py
+++ b/tests/operators/test_aws_eks.py
@@ -5,6 +5,7 @@
 from cosmos.operators.aws_eks import (
     DbtBuildAwsEksOperator,
+    DbtCloneAwsEksOperator,
     DbtLSAwsEksOperator,
     DbtRunAwsEksOperator,
     DbtSeedAwsEksOperator,
@@ -44,6 +45,7 @@ def test_dbt_kubernetes_build_command():
         "test": DbtTestAwsEksOperator(**base_kwargs),
         "build": DbtBuildAwsEksOperator(**base_kwargs),
         "seed": DbtSeedAwsEksOperator(**base_kwargs),
+        "clone": DbtCloneAwsEksOperator(**base_kwargs),
     }
     for command_name, command_operator in result_map.items():
diff --git a/tests/operators/test_azure_container_instance.py b/tests/operators/test_azure_container_instance.py
index c57466619..4f1bdfaee 100644
--- a/tests/operators/test_azure_container_instance.py
+++ b/tests/operators/test_azure_container_instance.py
@@ -7,6 +7,7 @@
 from cosmos.operators.azure_container_instance import (
     DbtAzureContainerInstanceBaseOperator,
     DbtBuildAzureContainerInstanceOperator,
+    DbtCloneAzureContainerInstanceOperator,
     DbtLSAzureContainerInstanceOperator,
     DbtRunAzureContainerInstanceOperator,
     DbtSeedAzureContainerInstanceOperator,
@@ -127,6 +128,7 @@ def test_dbt_azure_container_instance_operator_check_environment_variables(
         "run": DbtRunAzureContainerInstanceOperator(**base_kwargs),
         "test": DbtTestAzureContainerInstanceOperator(**base_kwargs),
         "seed": DbtSeedAzureContainerInstanceOperator(**base_kwargs),
+        "clone": DbtCloneAzureContainerInstanceOperator(**base_kwargs),
     }
diff --git a/tests/operators/test_docker.py b/tests/operators/test_docker.py
index 2cfb6b835..ba2ed43c9 100644
--- a/tests/operators/test_docker.py
+++ b/tests/operators/test_docker.py
@@ -7,6 +7,7 @@
 from cosmos.operators.docker import (
     DbtBuildDockerOperator,
+    DbtCloneDockerOperator,
     DbtLSDockerOperator,
     DbtRunDockerOperator,
     DbtSeedDockerOperator,
@@ -113,6 +114,7 @@ def test_dbt_docker_operator_get_env(p_context_to_airflow_vars: MagicMock, base_
         "test": DbtTestDockerOperator(**base_kwargs),
         "build": DbtBuildDockerOperator(**base_kwargs),
         "seed": DbtSeedDockerOperator(**base_kwargs),
+        "clone": DbtCloneDockerOperator(**base_kwargs),
     }
diff --git a/tests/operators/test_gcp_cloud_run_job.py b/tests/operators/test_gcp_cloud_run_job.py
index 08b7ba999..9cdd96bdb 100644
--- a/tests/operators/test_gcp_cloud_run_job.py
+++ b/tests/operators/test_gcp_cloud_run_job.py
@@ -10,6 +10,7 @@
 try:
     from cosmos.operators.gcp_cloud_run_job import (
         DbtBuildGcpCloudRunJobOperator,
+        DbtCloneGcpCloudRunJobOperator,
         DbtGcpCloudRunJobBaseOperator,
         DbtLSGcpCloudRunJobOperator,
         DbtRunGcpCloudRunJobOperator,
@@ -173,6 +174,7 @@ def test_dbt_gcp_cloud_run_job_build_command():
         "build": DbtBuildGcpCloudRunJobOperator(**BASE_KWARGS),
         "snapshot": DbtSnapshotGcpCloudRunJobOperator(**BASE_KWARGS),
         "source": DbtSourceGcpCloudRunJobOperator(**BASE_KWARGS),
+        "clone": DbtCloneGcpCloudRunJobOperator(**BASE_KWARGS),
         "run-operation": DbtRunOperationGcpCloudRunJobOperator(macro_name="some-macro", **BASE_KWARGS),
     }
diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py
index 51375f66b..e6ccdc4d7 100644
--- a/tests/operators/test_kubernetes.py
+++ b/tests/operators/test_kubernetes.py
@@ -10,6 +10,7 @@
 from cosmos.operators.kubernetes import (
     DbtBuildKubernetesOperator,
+    DbtCloneKubernetesOperator,
DbtLSKubernetesOperator, DbtRunKubernetesOperator, DbtSeedKubernetesOperator, @@ -128,6 +129,7 @@ def test_dbt_kubernetes_operator_get_env(p_context_to_airflow_vars: MagicMock, b "test": DbtTestKubernetesOperator(**base_kwargs), "build": DbtBuildKubernetesOperator(**base_kwargs), "seed": DbtSeedKubernetesOperator(**base_kwargs), + "clone": DbtCloneKubernetesOperator(**base_kwargs), } diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 2de6ca1e3..6f4948804 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -29,6 +29,7 @@ from cosmos.hooks.subprocess import FullOutputSubprocessResult from cosmos.operators.local import ( DbtBuildLocalOperator, + DbtCloneLocalOperator, DbtCompileLocalOperator, DbtDocsAzureStorageLocalOperator, DbtDocsGCSLocalOperator, @@ -1161,6 +1162,19 @@ def test_dbt_compile_local_operator_initialisation(): assert "compile" in operator.base_cmd +def test_dbt_clone_local_operator_initialisation(): + operator = DbtCloneLocalOperator( + profile_config=profile_config, + project_dir=DBT_PROJ_DIR, + task_id="clone", + dbt_cmd_flags=["--state", "/usr/local/airflow/dbt/jaffle_shop/target"], + install_deps=True, + append_env=True, + ) + + assert "clone" in operator.base_cmd + + @patch("cosmos.operators.local.remote_target_path", new="s3://some-bucket/target") @patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", new=False) def test_configure_remote_target_path_object_storage_unavailable_on_earlier_airflow_versions(): diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index fdc76f321..5c950f478 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -15,7 +15,7 @@ from cosmos.config import ProfileConfig from cosmos.constants import InvocationMode from cosmos.exceptions import CosmosValueError -from cosmos.operators.virtualenv import DbtVirtualenvBaseOperator +from cosmos.operators.virtualenv import DbtCloneVirtualenvOperator, DbtVirtualenvBaseOperator from cosmos.profiles import PostgresUserPasswordProfileMapping AIRFLOW_VERSION = Version(airflow.__version__) @@ -376,3 +376,16 @@ def test_integration_virtualenv_operator(caplog): assert "Trying to run the command:\n ['/tmp/persistent-venv2/bin/dbt', 'deps'" in caplog.text assert "Trying to run the command:\n ['/tmp/persistent-venv2/bin/dbt', 'seed'" in caplog.text + + +def test_dbt_clone_virtualenv_operator_initialisation(): + operator = DbtCloneVirtualenvOperator( + profile_config=profile_config, + project_dir=DBT_PROJ_DIR, + task_id="clone", + dbt_cmd_flags=["--state", "/usr/local/airflow/dbt/jaffle_shop/target"], + install_deps=True, + append_env=True, + ) + + assert "clone" in operator.base_cmd diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 4be51a176..e647bc2fa 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -79,6 +79,8 @@ def get_dag_bag() -> DagBag: file.writelines(["example_cosmos_sources.py\n"]) if DBT_VERSION < Version("1.6.0"): file.writelines(["example_model_version.py\n"]) + file.writelines(["example_clone.py\n"]) + if DBT_VERSION < Version("1.5.0"): file.writelines(["example_source_rendering.py\n"]) diff --git a/tests/test_example_dags_no_connections.py b/tests/test_example_dags_no_connections.py index 70cfbc041..d18d21730 100644 --- a/tests/test_example_dags_no_connections.py +++ b/tests/test_example_dags_no_connections.py @@ -55,6 +55,7 @@ def get_dag_bag() -> DagBag: if DBT_VERSION < Version("1.6.0"): 
file.writelines(["example_model_version.py\n"]) + file.writelines(["example_clone.py\n"]) # cosmos_profile_mapping uses the automatic profile rendering from an Airflow connection. # so we can't parse that without live connections for file_name in ["cosmos_profile_mapping.py"]: From 0fd3cac2dd449504d6cb128504a1188358ff3493 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sun, 17 Nov 2024 19:34:02 +0530 Subject: [PATCH 2/7] Add tests --- dev/dags/dbt/jaffle_shop/profiles.yml | 25 +++++++++++++++++++ ...{example_clone.py => example_operators.py} | 14 ++++++++--- docs/getting_started/operators.rst | 2 +- 3 files changed, 36 insertions(+), 5 deletions(-) rename dev/dags/{example_clone.py => example_operators.py} (72%) diff --git a/dev/dags/dbt/jaffle_shop/profiles.yml b/dev/dags/dbt/jaffle_shop/profiles.yml index 3960ca84d..503f49c49 100644 --- a/dev/dags/dbt/jaffle_shop/profiles.yml +++ b/dev/dags/dbt/jaffle_shop/profiles.yml @@ -22,3 +22,28 @@ postgres_profile: port: 5432 # "{{ env_var('POSTGRES_PORT') | as_number }}" schema: "{{ env_var('POSTGRES_SCHEMA') }}" user: "{{ env_var('POSTGRES_USER') }}" + +bigquery_dev: + target: dev + outputs: + dev: + type: bigquery + method: service-account + project: astronomer-dag-authoring + dataset: bq_dev + threads: 4 # Must be a value of 1 or greater + keyfile: /usr/local/airflow/include/key.json + location: US + + +bigquery_clone: + target: dev + outputs: + dev: + type: bigquery + method: service-account + project: astronomer-dag-authoring + dataset: bq_clone + threads: 4 # Must be a value of 1 or greater + keyfile: /usr/local/airflow/include/key.json + location: US diff --git a/dev/dags/example_clone.py b/dev/dags/example_operators.py similarity index 72% rename from dev/dags/example_clone.py rename to dev/dags/example_operators.py index d3d238758..0fdbd1fc5 100644 --- a/dev/dags/example_clone.py +++ b/dev/dags/example_operators.py @@ -1,21 +1,27 @@ +import os from datetime import datetime +from pathlib import Path from airflow import DAG from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig -DBT_PROJ_DIR = "/usr/local/airflow/dbt/jaffle_shop" +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) +DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop" +DBT_PROFILE_PATH = DBT_ROOT_PATH / "profiles.yml" +DBT_ARTIFACT = DBT_ROOT_PATH / "target" profile_config1 = ProfileConfig( profile_name="bigquery_dev", target_name="dev", - profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml", + profiles_yml_filepath=DBT_PROFILE_PATH, ) profile_config2 = ProfileConfig( profile_name="bigquery_clone", target_name="dev", - profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml", + profiles_yml_filepath=DBT_PROFILE_PATH, ) @@ -42,7 +48,7 @@ profile_config=profile_config2, project_dir=DBT_PROJ_DIR, task_id="clone", - dbt_cmd_flags=["--models", "stg_customers", "--state", "/usr/local/airflow/dbt/jaffle_shop/target"], + dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT], install_deps=True, append_env=True, ) diff --git a/docs/getting_started/operators.rst b/docs/getting_started/operators.rst index 9f955047d..691a0eb31 100644 --- a/docs/getting_started/operators.rst +++ b/docs/getting_started/operators.rst @@ -18,7 +18,7 @@ The ``DbtCloneLocalOperator`` implement `dbt clone Date: Sun, 17 Nov 2024 19:43:28 +0530 Subject: [PATCH 3/7] Add tests --- dev/dags/example_operators.py | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/dev/dags/example_operators.py b/dev/dags/example_operators.py index 0fdbd1fc5..6935f9972 100644 --- a/dev/dags/example_operators.py +++ b/dev/dags/example_operators.py @@ -25,7 +25,7 @@ ) -with DAG("test-id-1", start_date=datetime(2024, 1, 1), catchup=False) as dag: +with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag: seed_operator = DbtSeedLocalOperator( profile_config=profile_config1, project_dir=DBT_PROJ_DIR, From 5b3de19b194f9adb7fe2ed528f5a7e6cbef8fd55 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 18 Nov 2024 14:42:18 +0530 Subject: [PATCH 4/7] Fix tests --- dev/dags/dbt/jaffle_shop/profiles.yml | 1 - dev/dags/example_operators.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dev/dags/dbt/jaffle_shop/profiles.yml b/dev/dags/dbt/jaffle_shop/profiles.yml index 503f49c49..e3420b203 100644 --- a/dev/dags/dbt/jaffle_shop/profiles.yml +++ b/dev/dags/dbt/jaffle_shop/profiles.yml @@ -35,7 +35,6 @@ bigquery_dev: keyfile: /usr/local/airflow/include/key.json location: US - bigquery_clone: target: dev outputs: diff --git a/dev/dags/example_operators.py b/dev/dags/example_operators.py index 6935f9972..dccdd0d8b 100644 --- a/dev/dags/example_operators.py +++ b/dev/dags/example_operators.py @@ -9,8 +9,8 @@ DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop" -DBT_PROFILE_PATH = DBT_ROOT_PATH / "profiles.yml" -DBT_ARTIFACT = DBT_ROOT_PATH / "target" +DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml" +DBT_ARTIFACT = DBT_PROJ_DIR / "target" profile_config1 = ProfileConfig( profile_name="bigquery_dev", From 25e9af7cbdc749cd5516b51d4359ae79bf462f46 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 18 Nov 2024 18:26:23 +0530 Subject: [PATCH 5/7] Fix tests --- dev/dags/dbt/jaffle_shop/profiles.yml | 24 ------------------------ dev/dags/example_operators.py | 17 +++++------------ 2 files changed, 5 insertions(+), 36 deletions(-) diff --git a/dev/dags/dbt/jaffle_shop/profiles.yml b/dev/dags/dbt/jaffle_shop/profiles.yml index e3420b203..3960ca84d 100644 --- a/dev/dags/dbt/jaffle_shop/profiles.yml +++ b/dev/dags/dbt/jaffle_shop/profiles.yml @@ -22,27 +22,3 @@ postgres_profile: port: 5432 # "{{ env_var('POSTGRES_PORT') | as_number }}" schema: "{{ env_var('POSTGRES_SCHEMA') }}" user: "{{ env_var('POSTGRES_USER') }}" - -bigquery_dev: - target: dev - outputs: - dev: - type: bigquery - method: service-account - project: astronomer-dag-authoring - dataset: bq_dev - threads: 4 # Must be a value of 1 or greater - keyfile: /usr/local/airflow/include/key.json - location: US - -bigquery_clone: - target: dev - outputs: - dev: - type: bigquery - method: service-account - project: astronomer-dag-authoring - dataset: bq_clone - threads: 4 # Must be a value of 1 or greater - keyfile: /usr/local/airflow/include/key.json - location: US diff --git a/dev/dags/example_operators.py b/dev/dags/example_operators.py index dccdd0d8b..0fd50016d 100644 --- a/dev/dags/example_operators.py +++ b/dev/dags/example_operators.py @@ -12,22 +12,15 @@ DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml" DBT_ARTIFACT = DBT_PROJ_DIR / "target" -profile_config1 = ProfileConfig( - profile_name="bigquery_dev", +profile_config = ProfileConfig( + profile_name="postgres_profile", target_name="dev", profiles_yml_filepath=DBT_PROFILE_PATH, ) -profile_config2 = ProfileConfig( - profile_name="bigquery_clone", - target_name="dev", - 
profiles_yml_filepath=DBT_PROFILE_PATH,
-)
-
-
 with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
     seed_operator = DbtSeedLocalOperator(
-        profile_config=profile_config1,
+        profile_config=profile_config,
         project_dir=DBT_PROJ_DIR,
         task_id="seed",
         dbt_cmd_flags=["--select", "raw_customers"],
@@ -35,7 +28,7 @@
         append_env=True,
     )
     run_operator = DbtRunLocalOperator(
-        profile_config=profile_config1,
+        profile_config=profile_config,
         project_dir=DBT_PROJ_DIR,
         task_id="run",
         dbt_cmd_flags=["--models", "stg_customers"],
@@ -45,7 +38,7 @@
     # [START clone_example]
     clone_operator = DbtCloneLocalOperator(
-        profile_config=profile_config2,
+        profile_config=profile_config,
         project_dir=DBT_PROJ_DIR,
         task_id="clone",
         dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],

From 0ee79dfb552e540f0bb9f4baba46717fc4b21587 Mon Sep 17 00:00:00 2001
From: pankajastro
Date: Mon, 18 Nov 2024 18:39:42 +0530
Subject: [PATCH 6/7] Add full-refresh in mixin class

---
 cosmos/operators/base.py      | 18 ++++++++++++++++++
 tests/operators/test_local.py |  5 +++++
 2 files changed, 23 insertions(+)

diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py
index 0751f7892..52fb98bac 100644
--- a/cosmos/operators/base.py
+++ b/cosmos/operators/base.py
@@ -445,3 +445,21 @@ class DbtCloneMixin:
     base_cmd = ["clone"]
     ui_color = "#83a300"
+
+    def __init__(self, full_refresh: bool | str = False, **kwargs: Any) -> None:
+        self.full_refresh = full_refresh
+        super().__init__(**kwargs)
+
+    def add_cmd_flags(self) -> list[str]:
+        flags = []
+
+        if isinstance(self.full_refresh, str):
+            # Handle template fields when render_template_as_native_obj=False
+            full_refresh = to_boolean(self.full_refresh)
+        else:
+            full_refresh = self.full_refresh
+
+        if full_refresh is True:
+            flags.append("--full-refresh")
+
+        return flags
diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py
index 6f4948804..1f065fd3e 100644
--- a/tests/operators/test_local.py
+++ b/tests/operators/test_local.py
@@ -805,6 +805,11 @@ def test_store_compiled_sql() -> None:
         {"full_refresh": True},
         {"context": {}, "env": {}, "cmd_flags": ["run", "--full-refresh"]},
     ),
+    (
+        DbtCloneLocalOperator,
+        {"full_refresh": True},
+        {"context": {}, "env": {}, "cmd_flags": ["clone", "--full-refresh"]},
+    ),
     (
         DbtTestLocalOperator,
         {},

From 090e095151bd26e6819f294087dd2fb0ea783686 Mon Sep 17 00:00:00 2001
From: pankajastro
Date: Mon, 18 Nov 2024 18:42:54 +0530
Subject: [PATCH 7/7] Add full-refresh in mixin class

---
 dev/dags/example_operators.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dev/dags/example_operators.py b/dev/dags/example_operators.py
index 0fd50016d..1c8624a34 100644
--- a/dev/dags/example_operators.py
+++ b/dev/dags/example_operators.py
@@ -13,7 +13,7 @@
 DBT_ARTIFACT = DBT_PROJ_DIR / "target"
 
 profile_config = ProfileConfig(
-    profile_name="postgres_profile",
+    profile_name="default",
     target_name="dev",
     profiles_yml_filepath=DBT_PROFILE_PATH,
 )
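
Taken together, the last two commits mean that every DbtClone*Operator now accepts a ``full_refresh`` argument and, when it is truthy, appends ``--full-refresh`` to the generated ``dbt clone`` command. The following is a minimal, self-contained sketch of that flag handling, written as an illustration rather than a copy of the operator code; the ``to_boolean`` helper below is a stand-in assumption for the helper the patch relies on in ``cosmos.operators.base``.

from __future__ import annotations

from typing import Any


def to_boolean(value: str) -> bool:
    # Stand-in assumption: Airflow template fields arrive as strings when
    # render_template_as_native_obj=False, e.g. "True" or "false".
    return value.strip().lower() in ("true", "1", "yes")


class DbtCloneSketch:
    """Illustrative stand-in for an operator composed with DbtCloneMixin."""

    base_cmd = ["clone"]

    def __init__(self, full_refresh: bool | str = False, **kwargs: Any) -> None:
        self.full_refresh = full_refresh

    def add_cmd_flags(self) -> list[str]:
        flags: list[str] = []
        if isinstance(self.full_refresh, str):
            # Templated values come through as strings and are coerced first.
            full_refresh = to_boolean(self.full_refresh)
        else:
            full_refresh = self.full_refresh
        if full_refresh is True:
            flags.append("--full-refresh")
        return flags

    def build_cmd(self) -> list[str]:
        # The real operators also prepend the dbt executable path and add
        # project/profile flags; only the clone-specific part is shown here.
        return ["dbt", *self.base_cmd, *self.add_cmd_flags()]


if __name__ == "__main__":
    print(DbtCloneSketch(full_refresh=True).build_cmd())     # ['dbt', 'clone', '--full-refresh']
    print(DbtCloneSketch(full_refresh="False").build_cmd())  # ['dbt', 'clone']

Running the sketch prints ``['dbt', 'clone', '--full-refresh']`` for ``full_refresh=True`` and ``['dbt', 'clone']`` for the templated string ``"False"``, which matches the parametrised expectation added to ``tests/operators/test_local.py``.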
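For a user-facing view of the same feature, the sketch below adapts the final ``example_operators.py`` DAG so the clone task also passes the new ``full_refresh`` flag next to ``--state``. The profile name, paths and operator arguments are copied from the example DAG in the patch; the DAG id and the single-task layout are illustrative assumptions, not part of the patch.

import os
from datetime import datetime
from pathlib import Path

from airflow import DAG

from cosmos import DbtCloneLocalOperator, ProfileConfig

DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", Path(__file__).parent / "dbt"))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROJ_DIR / "profiles.yml",
)

with DAG("example_clone_full_refresh", start_date=datetime(2024, 1, 1), catchup=False):
    # Clone stg_customers from the artifacts in target/, rebuilding the object
    # even if one with the same name already exists (--full-refresh).
    DbtCloneLocalOperator(
        task_id="clone_full_refresh",
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        dbt_cmd_flags=["--models", "stg_customers", "--state", str(DBT_PROJ_DIR / "target")],
        full_refresh=True,
        install_deps=True,
        append_env=True,
    )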