Skip to content

Commit

Permalink
Fix running models that use alias while supporting dbt versions (astr…
Browse files Browse the repository at this point in the history
…onomer#662)

Current version, cosmos will got bug `Not found node` because it run
with alias selection as: `--models customers_abc_v1 ` and `--models
customers_abc_v2` .
I propose to parsing node selection in `unique_id` instead of using
`alias` .
So node selection should be: `unique_id.split('.', 2)[2]` , reference to
[function](https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/contracts/graph/node_args.py#L26)
and [resource-details
document](https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details).
In addition, with this change help cosmos also support versioned models
on dbt-core `>=1.5.0` instead `>=1.6.0` as current version.

Cosmos will support dynamic aliases and versioned models

Closes: astronomer#636
  • Loading branch information
binhnq94 authored and arojasb3 committed Jul 14, 2024
1 parent 79aff93 commit 2f65657
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 75 deletions.
12 changes: 6 additions & 6 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ def create_test_task_metadata(
task_args["indirect_selection"] = test_indirect_selection.value
if node is not None:
if node.resource_type == DbtResourceType.MODEL:
task_args["models"] = node.name
task_args["models"] = node.resource_name
elif node.resource_type == DbtResourceType.SOURCE:
task_args["select"] = f"source:{node.unique_id[len('source.'):]}"
task_args["select"] = f"source:{node.resource_name}"
else: # tested with node.resource_type == DbtResourceType.SEED or DbtResourceType.SNAPSHOT
task_args["select"] = node.name
task_args["select"] = node.resource_name
return TaskMetadata(
id=test_task_name,
operator_class=calculate_operator_class(
Expand All @@ -108,8 +108,8 @@ def create_task_metadata(
:param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.LOCAL, ExecutionMode.KUBERNETES).
Default is ExecutionMode.LOCAL.
:param args: Arguments to be used to instantiate an Airflow Task
:param use_name_as_task_id_prefix: If resource_type is DbtResourceType.MODEL, it determines whether
using name as task id prefix or not. If it is True task_id = <node.name>_run, else task_id=run.
:param use_task_group: It determines whether to use the name as a prefix for the task id or not.
If it is False, then use the name as a prefix for the task id, otherwise do not.
:returns: The metadata necessary to instantiate the source dbt node as an Airflow task.
"""
dbt_resource_to_class = {
Expand All @@ -119,7 +119,7 @@ def create_task_metadata(
DbtResourceType.TEST: "DbtTest",
DbtResourceType.SOURCE: "DbtSource",
}
args = {**args, **{"models": node.name}}
args = {**args, **{"models": node.resource_name}}

if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class:
if node.resource_type == DbtResourceType.MODEL:
Expand Down
29 changes: 18 additions & 11 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class DbtNode:
Metadata related to a dbt node (e.g. model, seed, snapshot, source).
"""

name: str
unique_id: str
resource_type: DbtResourceType
depends_on: list[str]
Expand All @@ -52,6 +51,23 @@ class DbtNode:
has_freshness: bool = False
has_test: bool = False

@property
def resource_name(self) -> str:
"""
Use this property to retrieve the resource name for command generation, for instance: ["dbt", "run", "--models", f"{resource_name}"].
The unique_id format is defined as [<resource_type>.<package>.<resource_name>](https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details).
For a special case like a versioned model, the unique_id follows this pattern: [model.<package>.<resource_name>.<version>](https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/contracts/graph/node_args.py#L26C3-L31)
"""
return self.unique_id.split(".", 2)[2]

@property
def name(self) -> str:
"""
Use this property as the task name or task group name.
Replace period (.) with underscore (_) due to versioned models.
"""
return self.resource_name.replace(".", "_")


def is_freshness_effective(freshness: dict[str, Any]) -> bool:
"""Function to find if a source has null freshness. Scenarios where freshness
Expand Down Expand Up @@ -116,7 +132,6 @@ def parse_dbt_ls_output(project_path: Path, ls_stdout: str) -> dict[str, DbtNode
logger.debug("Skipped dbt ls line: %s", line)
else:
node = DbtNode(
name=node_dict.get("alias", node_dict["name"]),
unique_id=node_dict["unique_id"],
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
Expand Down Expand Up @@ -232,9 +247,6 @@ def load_via_dbt_ls(self) -> None:
This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the `dbt` command
line for both parsing and filtering the nodes.
Noted that if dbt project contains versioned models, need to use dbt>=1.6.0 instead. Because, as dbt<1.6.0,
dbt cli doesn't support select a specific versioned models as stg_customers_v1, customers_v1, ...
Updates in-place:
* self.nodes
* self.filtered_nodes
Expand Down Expand Up @@ -328,8 +340,7 @@ def load_via_custom_parser(self) -> None:
for model_name, model in models:
config = {item.split(":")[0]: item.split(":")[-1] for item in model.config.config_selectors}
node = DbtNode(
name=model_name,
unique_id=model_name,
unique_id=f"{model.type.value}.{self.project.project_name}.{model_name}",
resource_type=DbtResourceType(model.type.value),
depends_on=list(model.config.upstream_models),
file_path=Path(
Expand Down Expand Up @@ -362,9 +373,6 @@ def load_from_dbt_manifest(self) -> None:
However, since the Manifest does not represent filters, it relies on the Custom Cosmos implementation
to filter out the nodes relevant to the user (based on self.exclude and self.select).
Noted that if dbt project contains versioned models, need to use dbt>=1.6.0 instead. Because, as dbt<1.6.0,
dbt cli doesn't support select a specific versioned models as stg_customers_v1, customers_v1, ...
Updates in-place:
* self.nodes
* self.filtered_nodes
Expand All @@ -384,7 +392,6 @@ def load_from_dbt_manifest(self) -> None:
resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
for unique_id, node_dict in resources.items():
node = DbtNode(
name=node_dict.get("alias", node_dict["name"]),
unique_id=unique_id,
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
Expand Down
4 changes: 4 additions & 0 deletions dev/dags/dbt/model_version/models/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ models:
- include: all
exclude:
- full_name
config:
alias: '{{ "customers_" ~ var("division", "USA") ~ "_v1" }}'
- v: 2
config:
alias: '{{ "customers_" ~ var("division", "USA") ~ "_v2" }}'

- name: orders
description: This table has basic information about orders, as well as some derived facts based on payments
Expand Down
Loading

0 comments on commit 2f65657

Please sign in to comment.