Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dbt clone operator #1326

Merged
merged 7 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from cosmos.operators.lazy_load import MissingPackage
from cosmos.operators.local import (
DbtBuildLocalOperator,
DbtCloneLocalOperator,
DbtDepsLocalOperator,
DbtLSLocalOperator,
DbtRunLocalOperator,
Expand All @@ -44,6 +45,7 @@
try:
from cosmos.operators.docker import (
DbtBuildDockerOperator,
DbtCloneDockerOperator,
DbtLSDockerOperator,
DbtRunDockerOperator,
DbtRunOperationDockerOperator,
Expand All @@ -65,6 +67,7 @@
try:
from cosmos.operators.kubernetes import (
DbtBuildKubernetesOperator,
DbtCloneKubernetesOperator,
DbtLSKubernetesOperator,
DbtRunKubernetesOperator,
DbtRunOperationKubernetesOperator,
Expand Down Expand Up @@ -106,6 +109,7 @@
try:
from cosmos.operators.azure_container_instance import (
DbtBuildAzureContainerInstanceOperator,
DbtCloneAzureContainerInstanceOperator,
DbtLSAzureContainerInstanceOperator,
DbtRunAzureContainerInstanceOperator,
DbtRunOperationAzureContainerInstanceOperator,
Expand Down Expand Up @@ -142,6 +146,7 @@
try:
from cosmos.operators.aws_eks import (
DbtBuildAwsEksOperator,
DbtCloneAwsEksOperator,
DbtLSAwsEksOperator,
DbtRunAwsEksOperator,
DbtRunOperationAwsEksOperator,
Expand Down Expand Up @@ -170,6 +175,7 @@
try:
from cosmos.operators.gcp_cloud_run_job import (
DbtBuildGcpCloudRunJobOperator,
DbtCloneGcpCloudRunJobOperator,
DbtLSGcpCloudRunJobOperator,
DbtRunGcpCloudRunJobOperator,
DbtRunOperationGcpCloudRunJobOperator,
Expand Down Expand Up @@ -217,6 +223,7 @@
"DbtResourceType",
# Local Execution Mode
"DbtBuildLocalOperator",
"DbtCloneLocalOperator",
"DbtDepsLocalOperator", # deprecated, to be deleted in Cosmos 2.x
"DbtLSLocalOperator",
"DbtRunLocalOperator",
Expand All @@ -226,6 +233,7 @@
"DbtTestLocalOperator",
# Docker Execution Mode
"DbtBuildDockerOperator",
"DbtCloneDockerOperator",
"DbtLSDockerOperator",
"DbtRunDockerOperator",
"DbtRunOperationDockerOperator",
Expand All @@ -234,6 +242,7 @@
"DbtTestDockerOperator",
# Kubernetes Execution Mode
"DbtBuildKubernetesOperator",
"DbtCloneKubernetesOperator",
"DbtLSKubernetesOperator",
"DbtRunKubernetesOperator",
"DbtRunOperationKubernetesOperator",
Expand All @@ -242,6 +251,7 @@
"DbtTestKubernetesOperator",
# Azure Container Instance Execution Mode
"DbtBuildAzureContainerInstanceOperator",
"DbtCloneAzureContainerInstanceOperator",
"DbtLSAzureContainerInstanceOperator",
"DbtRunAzureContainerInstanceOperator",
"DbtRunOperationAzureContainerInstanceOperator",
Expand All @@ -250,6 +260,7 @@
"DbtTestAzureContainerInstanceOperator",
# AWS EKS Execution Mode
"DbtBuildAwsEksOperator",
"DbtCloneAwsEksOperator",
"DbtLSAwsEksOperator",
"DbtRunAwsEksOperator",
"DbtRunOperationAwsEksOperator",
Expand All @@ -258,6 +269,7 @@
"DbtTestAwsEksOperator",
# GCP Cloud Run Job Execution Mode
"DbtBuildGcpCloudRunJobOperator",
"DbtCloneGcpCloudRunJobOperator",
"DbtLSGcpCloudRunJobOperator",
"DbtRunGcpCloudRunJobOperator",
"DbtRunOperationGcpCloudRunJobOperator",
Expand Down
5 changes: 5 additions & 0 deletions cosmos/operators/airflow_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from cosmos.operators.base import AbstractDbtBaseOperator
from cosmos.operators.local import (
DbtBuildLocalOperator,
DbtCloneLocalOperator,
DbtCompileLocalOperator,
DbtLocalBaseOperator,
DbtLSLocalOperator,
Expand Down Expand Up @@ -188,3 +189,7 @@ class DbtRunOperationAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtRunOpe

class DbtCompileAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtCompileLocalOperator):  # type: ignore
    """
    Compile operator built from ``DbtBaseAirflowAsyncOperator`` and ``DbtCompileLocalOperator``.

    The class adds no attributes or methods of its own.
    """


class DbtCloneAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtCloneLocalOperator):
    """
    Clone operator built from ``DbtBaseAirflowAsyncOperator`` and ``DbtCloneLocalOperator``.

    The class adds no attributes or methods of its own.
    """
10 changes: 10 additions & 0 deletions cosmos/operators/aws_eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from cosmos.operators.kubernetes import (
DbtBuildKubernetesOperator,
DbtCloneKubernetesOperator,
DbtKubernetesBaseOperator,
DbtLSKubernetesOperator,
DbtRunKubernetesOperator,
Expand Down Expand Up @@ -160,3 +161,12 @@ class DbtRunOperationAwsEksOperator(DbtAwsEksBaseOperator, DbtRunOperationKubern

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtCloneAwsEksOperator(DbtAwsEksBaseOperator, DbtCloneKubernetesOperator):
    """
    Executes a dbt core clone command.

    Built from ``DbtAwsEksBaseOperator`` and ``DbtCloneKubernetesOperator``; this class
    only wires the two together.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Nothing clone-specific to set up here: hand every argument to the parents.
        super().__init__(*args, **kwargs)
tatiana marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 10 additions & 0 deletions cosmos/operators/azure_container_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from cosmos.operators.base import (
AbstractDbtBaseOperator,
DbtBuildMixin,
DbtCloneMixin,
DbtLSMixin,
DbtRunMixin,
DbtRunOperationMixin,
Expand Down Expand Up @@ -167,3 +168,12 @@ class DbtRunOperationAzureContainerInstanceOperator(DbtRunOperationMixin, DbtAzu

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtCloneAzureContainerInstanceOperator(DbtCloneMixin, DbtAzureContainerInstanceBaseOperator):
    """
    Executes a dbt core clone command.

    The clone-specific pieces come from ``DbtCloneMixin``; everything else is inherited
    from ``DbtAzureContainerInstanceBaseOperator``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Pure pass-through: every argument is forwarded unchanged to the parent initialisers.
        super().__init__(*args, **kwargs)
25 changes: 25 additions & 0 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,3 +438,28 @@

base_cmd = ["compile"]
ui_color = "#877c7c"


class DbtCloneMixin:
    """
    Mixin for dbt clone command.

    :param full_refresh: When ``True``, ``--full-refresh`` is added to the command flags.
        A string value is also accepted and is converted with ``to_boolean`` when the
        flags are built.
    """

    base_cmd = ["clone"]
    ui_color = "#83a300"

    def __init__(self, full_refresh: bool | str = False, **kwargs: Any) -> None:
        self.full_refresh = full_refresh
        super().__init__(**kwargs)

    def add_cmd_flags(self) -> list[str]:
        """Return the extra command-line flags derived from ``full_refresh``."""
        refresh = self.full_refresh
        if isinstance(refresh, str):
            # Handle template fields when render_template_as_native_obj=False
            refresh = to_boolean(refresh)

        return ["--full-refresh"] if refresh is True else []
10 changes: 10 additions & 0 deletions cosmos/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from cosmos.operators.base import (
AbstractDbtBaseOperator,
DbtBuildMixin,
DbtCloneMixin,
DbtLSMixin,
DbtRunMixin,
DbtRunOperationMixin,
Expand Down Expand Up @@ -148,3 +149,12 @@ class DbtRunOperationDockerOperator(DbtRunOperationMixin, DbtDockerBaseOperator)

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtCloneDockerOperator(DbtCloneMixin, DbtDockerBaseOperator):
    """
    Executes a dbt core clone command.

    The clone-specific pieces come from ``DbtCloneMixin``; everything else is inherited
    from ``DbtDockerBaseOperator``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Pure pass-through: every argument is forwarded unchanged to the parent initialisers.
        super().__init__(*args, **kwargs)
10 changes: 10 additions & 0 deletions cosmos/operators/gcp_cloud_run_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from cosmos.operators.base import (
AbstractDbtBaseOperator,
DbtBuildMixin,
DbtCloneMixin,
DbtLSMixin,
DbtRunMixin,
DbtRunOperationMixin,
Expand Down Expand Up @@ -180,3 +181,12 @@ class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRun

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtCloneGcpCloudRunJobOperator(DbtCloneMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core clone command.

    The clone-specific pieces come from ``DbtCloneMixin``; everything else is inherited
    from ``DbtGcpCloudRunJobBaseOperator``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Pure pass-through: every argument is forwarded unchanged to the parent initialisers.
        super().__init__(*args, **kwargs)
8 changes: 8 additions & 0 deletions cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from cosmos.operators.base import (
AbstractDbtBaseOperator,
DbtBuildMixin,
DbtCloneMixin,
DbtLSMixin,
DbtRunMixin,
DbtRunOperationMixin,
Expand Down Expand Up @@ -260,3 +261,10 @@ class DbtRunOperationKubernetesOperator(DbtRunOperationMixin, DbtKubernetesBaseO

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtCloneKubernetesOperator(DbtCloneMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt core clone command.

    The clone-specific pieces come from ``DbtCloneMixin``; everything else is inherited
    from ``DbtKubernetesBaseOperator``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Pure pass-through: every argument is forwarded unchanged to the parent initialisers.
        super().__init__(*args, **kwargs)
10 changes: 10 additions & 0 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from cosmos.operators.base import (
AbstractDbtBaseOperator,
DbtBuildMixin,
DbtCloneMixin,
DbtCompileMixin,
DbtLSMixin,
DbtRunMixin,
Expand Down Expand Up @@ -1009,3 +1010,12 @@ class DbtCompileLocalOperator(DbtCompileMixin, DbtLocalBaseOperator):
def __init__(self, *args: Any, **kwargs: Any) -> None:
kwargs["should_upload_compiled_sql"] = True
super().__init__(*args, **kwargs)


class DbtCloneLocalOperator(DbtCloneMixin, DbtLocalBaseOperator):
    """
    Executes a dbt core clone command.

    Built from ``DbtCloneMixin`` and ``DbtLocalBaseOperator``; this class only wires
    the two together.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Nothing clone-specific to set up here: hand every argument to the parents.
        super().__init__(*args, **kwargs)
10 changes: 10 additions & 0 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from cosmos.log import get_logger
from cosmos.operators.local import (
DbtBuildLocalOperator,
DbtCloneLocalOperator,
DbtDocsLocalOperator,
DbtLocalBaseOperator,
DbtLSLocalOperator,
Expand Down Expand Up @@ -286,3 +287,12 @@ class DbtDocsVirtualenvOperator(DbtVirtualenvBaseOperator, DbtDocsLocalOperator)

def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)


class DbtCloneVirtualenvOperator(DbtVirtualenvBaseOperator, DbtCloneLocalOperator):
    """
    Executes a dbt core clone command.

    Built from ``DbtVirtualenvBaseOperator`` and ``DbtCloneLocalOperator``; this class
    only wires the two together.
    """

    def __init__(self, *args: Any, **kwargs: Any):
        # Nothing clone-specific to set up here: hand every argument to the parents.
        super().__init__(*args, **kwargs)
50 changes: 50 additions & 0 deletions dev/dags/example_operators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import os
from datetime import datetime
from pathlib import Path

from airflow import DAG

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig

# Location of the example dbt projects; can be overridden with the DBT_ROOT_PATH environment variable.
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
# Passed to ``dbt clone`` through the ``--state`` flag in the clone task below.
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)

with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
    )
    run_operator = DbtRunLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        # ``--select`` is used throughout this file; ``--models`` is its legacy alias.
        dbt_cmd_flags=["--select", "stg_customers"],
        install_deps=True,
        append_env=True,
    )

    # [START clone_example]
    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        # Command flags are strings, so the state directory is converted explicitly.
        dbt_cmd_flags=["--select", "stg_customers", "--state", str(DBT_ARTIFACT)],
        install_deps=True,
        append_env=True,
    )
    # [END clone_example]

    # Seed the raw data, build the staging model, then clone it.
    seed_operator >> run_operator >> clone_operator
1 change: 1 addition & 0 deletions docs/getting_started/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Azure Container Instance Execution Mode <azure-container-instance>
GCP Cloud Run Job Execution Mode <gcp-cloud-run-job>
dbt and Airflow Similar Concepts <dbt-airflow-concepts>
Operators <operators>


Getting Started
Expand Down
24 changes: 24 additions & 0 deletions docs/getting_started/operators.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
.. _operators:

Operators
=========

Cosmos exposes individual operators that correspond to specific dbt commands, which can be used just like traditional
`Apache Airflow® <https://airflow.apache.org/>`_ operators. Cosmos names these operators using the format ``Dbt<dbt-command><execution-mode>Operator``. For example, ``DbtBuildLocalOperator``.

Clone
-----

Requirements

* Cosmos >= 1.8.0
* dbt-core >= 1.6.0

The ``DbtCloneLocalOperator`` implements the `dbt clone <https://docs.getdbt.com/reference/commands/clone>`_ command.

Example usage:

.. literalinclude:: ../../dev/dags/example_operators.py
:language: python
:start-after: [START clone_example]
:end-before: [END clone_example]
2 changes: 2 additions & 0 deletions tests/operators/test_aws_eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from cosmos.operators.aws_eks import (
DbtBuildAwsEksOperator,
DbtCloneAwsEksOperator,
DbtLSAwsEksOperator,
DbtRunAwsEksOperator,
DbtSeedAwsEksOperator,
Expand Down Expand Up @@ -44,6 +45,7 @@ def test_dbt_kubernetes_build_command():
"test": DbtTestAwsEksOperator(**base_kwargs),
"build": DbtBuildAwsEksOperator(**base_kwargs),
"seed": DbtSeedAwsEksOperator(**base_kwargs),
"clone": DbtCloneAwsEksOperator(**base_kwargs),
}

for command_name, command_operator in result_map.items():
Expand Down
Loading