Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix running models that use alias while supporting dbt versions #662

Merged
merged 3 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ def create_test_task_metadata(
task_args["indirect_selection"] = test_indirect_selection.value
if node is not None:
if node.resource_type == DbtResourceType.MODEL:
task_args["models"] = node.name
task_args["models"] = node.resource_name
elif node.resource_type == DbtResourceType.SOURCE:
task_args["select"] = f"source:{node.unique_id[len('source.'):]}"
task_args["select"] = f"source:{node.resource_name}"
else: # tested with node.resource_type == DbtResourceType.SEED or DbtResourceType.SNAPSHOT
task_args["select"] = node.name
task_args["select"] = node.resource_name
return TaskMetadata(
id=test_task_name,
operator_class=calculate_operator_class(
Expand All @@ -108,8 +108,8 @@ def create_task_metadata(
:param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.LOCAL, ExecutionMode.KUBERNETES).
Default is ExecutionMode.LOCAL.
:param args: Arguments to be used to instantiate an Airflow Task
:param use_name_as_task_id_prefix: If resource_type is DbtResourceType.MODEL, it determines whether
using name as task id prefix or not. If it is True task_id = <node.name>_run, else task_id=run.
:param use_task_group: It determines whether to use the name as a prefix for the task id or not.
If it is False, then use the name as a prefix for the task id, otherwise do not.
:returns: The metadata necessary to instantiate the source dbt node as an Airflow task.
"""
dbt_resource_to_class = {
Expand All @@ -118,7 +118,7 @@ def create_task_metadata(
DbtResourceType.SEED: "DbtSeed",
DbtResourceType.TEST: "DbtTest",
}
args = {**args, **{"models": node.name}}
args = {**args, **{"models": node.resource_name}}

if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class:
if node.resource_type == DbtResourceType.MODEL:
Expand Down
29 changes: 18 additions & 11 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class DbtNode:
Metadata related to a dbt node (e.g. model, seed, snapshot).
"""

name: str
unique_id: str
resource_type: DbtResourceType
depends_on: list[str]
Expand All @@ -51,6 +50,23 @@ class DbtNode:
config: dict[str, Any] = field(default_factory=lambda: {})
has_test: bool = False

@property
def resource_name(self) -> str:
    """
    Return the resource name to pass to dbt when building a command, e.g.
    ["dbt", "run", "--models", f"{resource_name}"].

    The name is derived from ``unique_id``, whose documented layout is
    <resource_type>.<package>.<resource_name>
    (https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details).
    Versioned models use model.<package>.<resource_name>.<version>
    (https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/contracts/graph/node_args.py#L26C3-L31),
    so only the first two periods are treated as separators and the version
    suffix stays part of the returned value.
    """
    # Split at most twice: anything after the package segment is kept intact.
    segments = self.unique_id.split(".", maxsplit=2)
    return segments[2]

@property
def name(self) -> str:
    """
    Return the identifier to use as the task name or task group name.

    This is ``resource_name`` with every period (.) swapped for an
    underscore (_); periods appear in the resource name of versioned models.
    """
    pieces = self.resource_name.split(".")
    return "_".join(pieces)
tatiana marked this conversation as resolved.
Show resolved Hide resolved


def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
"""Run a command in a subprocess, returning the stdout."""
Expand Down Expand Up @@ -89,7 +105,6 @@ def parse_dbt_ls_output(project_path: Path, ls_stdout: str) -> dict[str, DbtNode
logger.debug("Skipped dbt ls line: %s", line)
else:
node = DbtNode(
name=node_dict.get("alias", node_dict["name"]),
unique_id=node_dict["unique_id"],
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
Expand Down Expand Up @@ -195,9 +210,6 @@ def load_via_dbt_ls(self) -> None:
This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the `dbt` command
line for both parsing and filtering the nodes.

Note that if the dbt project contains versioned models, dbt>=1.6.0 is required, because with dbt<1.6.0
the dbt CLI doesn't support selecting a specific versioned model such as stg_customers_v1, customers_v1, ...

Updates in-place:
* self.nodes
* self.filtered_nodes
Expand Down Expand Up @@ -291,8 +303,7 @@ def load_via_custom_parser(self) -> None:
for model_name, model in models:
config = {item.split(":")[0]: item.split(":")[-1] for item in model.config.config_selectors}
node = DbtNode(
name=model_name,
unique_id=model_name,
unique_id=f"{model.type.value}.{self.project.project_name}.{model_name}",
resource_type=DbtResourceType(model.type.value),
depends_on=list(model.config.upstream_models),
file_path=Path(
Expand Down Expand Up @@ -325,9 +336,6 @@ def load_from_dbt_manifest(self) -> None:
However, since the Manifest does not represent filters, it relies on the Custom Cosmos implementation
to filter out the nodes relevant to the user (based on self.exclude and self.select).

Note that if the dbt project contains versioned models, dbt>=1.6.0 is required, because with dbt<1.6.0
the dbt CLI doesn't support selecting a specific versioned model such as stg_customers_v1, customers_v1, ...

Updates in-place:
* self.nodes
* self.filtered_nodes
Expand All @@ -347,7 +355,6 @@ def load_from_dbt_manifest(self) -> None:
resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
for unique_id, node_dict in resources.items():
node = DbtNode(
name=node_dict.get("alias", node_dict["name"]),
unique_id=unique_id,
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
Expand Down
4 changes: 4 additions & 0 deletions dev/dags/dbt/model_version/models/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ models:
- include: all
exclude:
- full_name
config:
alias: '{{ "customers_" ~ var("division", "USA") ~ "_v1" }}'
- v: 2
config:
alias: '{{ "customers_" ~ var("division", "USA") ~ "_v2" }}'

- name: orders
description: This table has basic information about orders, as well as some derived facts based on payments
Expand Down
Loading
Loading