Skip to content

Commit

Permalink
Merge branch 'main' into additional-docs-for-manifest-path
Browse files Browse the repository at this point in the history
  • Loading branch information
dwreeves authored Dec 30, 2023
2 parents 236aeed + 7cfe715 commit e4b16a2
Show file tree
Hide file tree
Showing 13 changed files with 239 additions and 9 deletions.
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ repos:
- --py37-plus
- --keep-runtime-typing
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.7
rev: v0.1.9
hooks:
- id: ruff
args:
- --fix
- repo: https://github.com/psf/black
rev: 23.11.0
rev: 23.12.1
hooks:
- id: black
args: ["--config", "./pyproject.toml"]
Expand All @@ -71,7 +71,7 @@ repos:
alias: black
additional_dependencies: [black>=22.10.0]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.7.1"
rev: "v1.8.0"

hooks:
- id: mypy
Expand Down
11 changes: 10 additions & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class RenderConfig:
dbt_executable_path: str | Path = get_system_dbt()
env_vars: dict[str, str] | None = None
dbt_project_path: InitVar[str | Path | None] = None
dbt_ls_path: Path | None = None

project_path: Path | None = field(init=False)

Expand Down Expand Up @@ -91,6 +92,15 @@ def validate_dbt_command(self, fallback_cmd: str | Path = "") -> None:
f"<{self.dbt_executable_path}>" + (f" and <{fallback_cmd}>." if fallback_cmd else ".")
)

def is_dbt_ls_file_available(self) -> bool:
"""
Check if the `dbt ls` output is set and if the file exists.
"""
if not self.dbt_ls_path:
return False

return self.dbt_ls_path.exists()


class ProjectConfig:
"""
Expand Down Expand Up @@ -287,7 +297,6 @@ class ExecutionConfig:
dbt_executable_path: str | Path = field(default_factory=get_system_dbt)

dbt_project_path: InitVar[str | Path | None] = None

project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class LoadMode(Enum):
AUTOMATIC = "automatic"
CUSTOM = "custom"
DBT_LS = "dbt_ls"
DBT_LS_FILE = "dbt_ls_file"
DBT_MANIFEST = "dbt_manifest"


Expand Down
25 changes: 25 additions & 0 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def load(
load_method = {
LoadMode.CUSTOM: self.load_via_custom_parser,
LoadMode.DBT_LS: self.load_via_dbt_ls,
LoadMode.DBT_LS_FILE: self.load_via_dbt_ls_file,
LoadMode.DBT_MANIFEST: self.load_from_dbt_manifest,
}

Expand Down Expand Up @@ -280,6 +281,30 @@ def load_via_dbt_ls(self) -> None:
logger.info("Total nodes: %i", len(self.nodes))
logger.info("Total filtered nodes: %i", len(self.nodes))

def load_via_dbt_ls_file(self) -> None:
"""
This is between dbt ls and full manifest. It allows to use the output (needs to be json output) of the dbt ls as a
file stored in the image you run Cosmos on. The advantage is that you can use the parser from LoadMode.DBT_LS without
actually running dbt ls every time. BUT you will need one dbt ls file for each separate group.
This technically should increase performance and also removes the necessity to have your whole dbt project copied
to the airflow image.
"""
logger.info("Trying to parse the dbt project `%s` using a dbt ls output file...", self.project.project_name)

if not self.render_config.is_dbt_ls_file_available():
raise CosmosLoadDbtException(f"Unable to load dbt ls file using {self.render_config.dbt_ls_path}")

project_path = self.render_config.project_path
if not project_path:
raise CosmosLoadDbtException("Unable to load dbt ls file without RenderConfig.project_path")
with open(self.render_config.dbt_ls_path) as fp: # type: ignore[arg-type]
dbt_ls_output = fp.read()
nodes = parse_dbt_ls_output(project_path=project_path, ls_stdout=dbt_ls_output)

self.nodes = nodes
self.filtered_nodes = nodes

def load_via_custom_parser(self) -> None:
"""
This is the least accurate way of loading `dbt` projects and filtering them out, since it uses custom Cosmos
Expand Down
7 changes: 6 additions & 1 deletion cosmos/profiles/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ class BaseProfileMapping(ABC):

_conn: Connection | None = None

def __init__(self, conn_id: str, profile_args: dict[str, Any] | None = None):
def __init__(self, conn_id: str, profile_args: dict[str, Any] | None = None, disable_event_tracking: bool = False):
self.conn_id = conn_id
self.profile_args = profile_args or {}
self._validate_profile_args()
self.disable_event_tracking = disable_event_tracking

def _validate_profile_args(self) -> None:
"""
Expand Down Expand Up @@ -178,6 +179,10 @@ def get_profile_file_contents(
"outputs": {target_name: profile_vars},
}
}

if self.disable_event_tracking:
profile_contents["config"] = {"send_anonymous_usage_stats": "False"}

return str(yaml.dump(profile_contents, indent=4))

def get_dbt_value(self, name: str) -> Any:
Expand Down
9 changes: 9 additions & 0 deletions dev/dags/dbt/jaffle_shop/dbt_ls_models_staging.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
14:26:04 Running with dbt=1.6.9
14:26:04 Registered adapter: exasol=1.6.2
14:26:04 Found 5 models, 3 seeds, 20 tests, 0 sources, 0 exposures, 0 metrics, 366 macros, 0 groups, 0 semantic models
{"name": "stg_customers", "resource_type": "model", "package_name": "jaffle_shop", "original_file_path": "models/staging/stg_customers.sql", "unique_id": "model.jaffle_shop.stg_customers", "alias": "stg_customers", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": [], "nodes": ["seed.jaffle_shop.raw_customers"]}}
{"name": "stg_orders", "resource_type": "model", "package_name": "jaffle_shop", "original_file_path": "models/staging/stg_orders.sql", "unique_id": "model.jaffle_shop.stg_orders", "alias": "stg_orders", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": [], "nodes": ["seed.jaffle_shop.raw_orders"]}}
{"name": "stg_payments", "resource_type": "model", "package_name": "jaffle_shop", "original_file_path": "models/staging/stg_payments.sql", "unique_id": "model.jaffle_shop.stg_payments", "alias": "stg_payments", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": [], "nodes": ["seed.jaffle_shop.raw_payments"]}}
{"name": "raw_customers", "resource_type": "seed", "package_name": "jaffle_shop", "original_file_path": "seeds/raw_customers.csv", "unique_id": "seed.jaffle_shop.raw_customers", "alias": "raw_customers", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "seed", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "quote_columns": null, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": []}}
{"name": "raw_orders", "resource_type": "seed", "package_name": "jaffle_shop", "original_file_path": "seeds/raw_orders.csv", "unique_id": "seed.jaffle_shop.raw_orders", "alias": "raw_orders", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "seed", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "quote_columns": null, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": []}}
{"name": "raw_payments", "resource_type": "seed", "package_name": "jaffle_shop", "original_file_path": "seeds/raw_payments.csv", "unique_id": "seed.jaffle_shop.raw_payments", "alias": "raw_payments", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "seed", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "quote_columns": null, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": []}}
7 changes: 6 additions & 1 deletion dev/dags/user_defined_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig, LoadMode

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
PROFILES_FILE_PATH = Path(DBT_ROOT_PATH, "jaffle_shop", "profiles.yml")
DBT_LS_PATH = Path(DBT_ROOT_PATH, "jaffle_shop", "dbt_ls_models_staging.txt")


@dag(
Expand All @@ -35,6 +36,10 @@ def user_defined_profile() -> None:
target_name="dev",
profiles_yml_filepath=PROFILES_FILE_PATH,
),
render_config=RenderConfig(
load_method=LoadMode.DBT_LS_FILE,
dbt_ls_path=DBT_LS_PATH,
),
operator_args={"append_env": True, "install_deps": True},
default_args={"retries": 2},
)
Expand Down
22 changes: 22 additions & 0 deletions docs/configuration/parsing-methods.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ Cosmos offers several options to parse your dbt project:
- ``automatic``. Tries to find a user-supplied ``manifest.json`` file. If it can't find one, it will run ``dbt ls`` to generate one. If that fails, it will use Cosmos' dbt parser.
- ``dbt_manifest``. Parses a user-supplied ``manifest.json`` file. This can be generated manually with dbt commands or via a CI/CD process.
- ``dbt_ls``. Parses a dbt project directory using the ``dbt ls`` command.
- ``dbt_ls_file``. Parses a dbt project directory using the output of ``dbt ls`` command from a file.
- ``custom``. Uses Cosmos' custom dbt parser, which extracts dependencies from your dbt's model code.

There are benefits and drawbacks to each method:

- ``dbt_manifest``: You have to generate the manifest file on your own. When using the manifest, Cosmos gets a complete set of metadata about your models. However, Cosmos uses its own selecting & excluding logic to determine which models to run, which may not be as robust as dbt's.
- ``dbt_ls``: Cosmos will generate the manifest file for you. This method uses dbt's metadata AND dbt's selecting/excluding logic. This is the most robust method. However, this requires the dbt executable to be installed on your machine (either on the host directly or in a virtual environment).
- ``dbt_ls_file`` (new in 1.3): Path to a file containing the ``dbt ls`` output. To use this method, run ``dbt ls`` using ``--output json`` and store the output in a file. ``RenderConfig.select`` and ``RenderConfig.exclude`` will not work using this method.
- ``custom``: Cosmos will parse your project and model files for you. This means that Cosmos will not have access to dbt's metadata. However, this method does not require the dbt executable to be installed on your machine.

If you're using the ``local`` mode, you should use the ``dbt_ls`` method.
Expand Down Expand Up @@ -78,6 +80,26 @@ To use this:
# ...,
)
``dbt_ls_file``
----------------

.. note::
New in Cosmos 1.3.

If you provide the output of ``dbt ls --output json`` as a file, you can use this to parse similar to ``dbt_ls``.
You can supply a ``dbt_ls_path`` parameter on the DbtDag / DbtTaskGroup with a path to a ``dbt_ls_output.txt`` file.
Check `this Dag <https://github.com/astronomer/astronomer-cosmos/blob/main/dev/dags/user_defined_profile.py>`_ for an example.

To use this:

.. code-block:: python
DbtDag(
render_config=RenderConfig(
load_method=LoadMode.DBT_MANIFEST, dbt_ls_path="/path/to/dbt_ls_file.txt"
)
# ...,
)
``custom``
----------
Expand Down
31 changes: 31 additions & 0 deletions docs/templates/index.rst.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,37 @@ but override the ``database`` and ``schema`` values:
Note that when using a profile mapping, the profiles.yml file gets generated with the profile name and target name
you specify in ``ProfileConfig``.

Disabling dbt event tracking
--------------------------------
.. versionadded:: 1.3

By default `dbt will track events <https://docs.getdbt.com/reference/global-configs/usage-stats>`_ by sending anonymous usage data
when dbt commands are invoked. Users have an option to opt out of event tracking by updating their ``profiles.yml`` file.

If you'd like to disable this behavior in the Cosmos generated profile, you can pass ``disable_event_tracking=True`` to the profile mapping like in
the example below:

.. code-block:: python

from cosmos.profiles import SnowflakeUserPasswordProfileMapping

profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="my_snowflake_conn_id",
profile_args={
"database": "my_snowflake_database",
"schema": "my_snowflake_schema",
},
disable_event_tracking=True,
),
)

dag = DbtDag(profile_config=profile_config, ...)




Using your own profiles.yml file
++++++++++++++++++++++++++++++++++++
Expand Down
88 changes: 87 additions & 1 deletion tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
SAMPLE_MANIFEST_PY = Path(__file__).parent.parent / "sample/manifest_python.json"
SAMPLE_MANIFEST_MODEL_VERSION = Path(__file__).parent.parent / "sample/manifest_model_version.json"
SAMPLE_MANIFEST_SOURCE = Path(__file__).parent.parent / "sample/manifest_source.json"
SAMPLE_DBT_LS_OUTPUT = Path(__file__).parent.parent / "sample/sample_dbt_ls.txt"


@pytest.fixture
Expand Down Expand Up @@ -124,6 +125,52 @@ def test_load_automatic_manifest_is_available(mock_load_from_dbt_manifest):
assert mock_load_from_dbt_manifest.called


@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_file", return_value=None)
def test_load_automatic_dbt_ls_file_is_available(mock_load_via_dbt_ls_file):
project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml",
)
render_config = RenderConfig(dbt_ls_path=SAMPLE_DBT_LS_OUTPUT)
dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, render_config=render_config)
dbt_graph.load(method=LoadMode.DBT_LS_FILE, execution_mode=ExecutionMode.LOCAL)
assert mock_load_via_dbt_ls_file.called


def test_load_dbt_ls_file_without_file():
project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml",
)
render_config = RenderConfig(dbt_ls_path=None)
dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, render_config=render_config)
with pytest.raises(CosmosLoadDbtException) as err_info:
dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_LS_FILE)
assert err_info.value.args[0] == "Unable to load dbt ls file using None"


def test_load_dbt_ls_file_without_project_path():
project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml",
)
render_config = RenderConfig(dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, dbt_project_path=None)
dbt_graph = DbtGraph(
project=project_config,
profile_config=profile_config,
render_config=render_config,
)
with pytest.raises(CosmosLoadDbtException) as err_info:
dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_LS_FILE)
assert err_info.value.args[0] == "Unable to load dbt ls file without RenderConfig.project_path"


@patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", side_effect=None)
@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", return_value=None)
def test_load_automatic_without_manifest_with_profile_yml(mock_load_via_dbt_ls, mock_load_via_custom_parser):
Expand Down Expand Up @@ -214,8 +261,15 @@ def test_load_manifest_with_manifest(mock_load_from_dbt_manifest):
@patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", return_value=None)
@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", return_value=None)
@patch("cosmos.dbt.graph.DbtGraph.load_from_dbt_manifest", return_value=None)
@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_file", return_value=None)
def test_load(
mock_load_from_dbt_manifest, mock_load_via_dbt_ls, mock_load_via_custom_parser, exec_mode, method, expected_function
mock_load_from_dbt_manifest,
mock_load_via_dbt_ls_file,
mock_load_via_dbt_ls,
mock_load_via_custom_parser,
exec_mode,
method,
expected_function,
):
project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
profile_config = ProfileConfig(
Expand Down Expand Up @@ -678,6 +732,38 @@ def test_load_dbt_ls_and_manifest_with_model_version(load_method, postgres_profi
} == set(dbt_graph.nodes["model.jaffle_shop.orders"].depends_on)


@pytest.mark.integration
def test_load_via_dbt_ls_file():
project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml",
)
render_config = RenderConfig(
dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME
)
dbt_graph = DbtGraph(
project=project_config,
profile_config=profile_config,
render_config=render_config,
)
dbt_graph.load(method=LoadMode.DBT_LS_FILE, execution_mode=ExecutionMode.LOCAL)

expected_dbt_nodes = {
"model.jaffle_shop.stg_customers": "stg_customers",
"model.jaffle_shop.stg_orders": "stg_orders",
"model.jaffle_shop.stg_payments": "stg_payments",
}
for unique_id, name in expected_dbt_nodes.items():
assert unique_id in dbt_graph.nodes
assert name == dbt_graph.nodes[unique_id].name
# Test dependencies
assert {"seed.jaffle_shop.raw_customers"} == set(dbt_graph.nodes["model.jaffle_shop.stg_customers"].depends_on)
assert {"seed.jaffle_shop.raw_orders"} == set(dbt_graph.nodes["model.jaffle_shop.stg_orders"].depends_on)
assert {"seed.jaffle_shop.raw_payments"} == set(dbt_graph.nodes["model.jaffle_shop.stg_payments"].depends_on)


@pytest.mark.parametrize(
"stdout,returncode",
[
Expand Down
Loading

0 comments on commit e4b16a2

Please sign in to comment.