Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-388] Add group metadata info to LogModelResult and LogTestResult #10775

Merged
merged 21 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240926-143448.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add group metadata info to LogModelResult and LogTestResult
time: 2024-09-26T14:34:48.334703+01:00
custom:
Author: aranke
Issue: "10775"
2 changes: 1 addition & 1 deletion core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,7 @@ def to_logging_dict(self) -> Dict[str, Union[str, Dict[str, str]]]:
return {
"name": self.name,
"package_name": self.package_name,
"owner": self.owner.to_dict(),
"owner": self.owner.to_dict(omit_none=True),
}


Expand Down
17 changes: 10 additions & 7 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,12 @@ message SQLRunnerExceptionMsg {
SQLRunnerException data = 2;
}

message Group {
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}

// Q007
message LogTestResult {
NodeInfo node_info = 1;
Expand All @@ -1280,6 +1286,8 @@ message LogTestResult {
int32 num_models = 5;
float execution_time = 6;
int32 num_failures = 7;
Group group = 8;
string attached_node = 9;
}

message LogTestResultMsg {
Expand Down Expand Up @@ -1312,6 +1320,7 @@ message LogModelResult {
int32 index = 4;
int32 total = 5;
float execution_time = 6;
Group group = 7;
}

message LogModelResultMsg {
Expand Down Expand Up @@ -1373,7 +1382,7 @@ message LogFreshnessResultMsg {
LogFreshnessResult data = 2;
}

// Q018
// Q019
message LogNodeNoOpResult {
NodeInfo node_info = 1;
string description = 2;
Expand Down Expand Up @@ -1820,12 +1829,6 @@ message ServingDocsExitInfoMsg {
ServingDocsExitInfo data = 2;
}

message Group {
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}

// Z021
message RunResultWarning {
string resource_type = 1;
Expand Down
574 changes: 287 additions & 287 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions core/dbt/task/group_lookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import AbstractSet, Dict, Optional, Union

from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import Group

_node_id_to_group_name_map: Dict[str, str] = {}
_group_name_to_group_map: Dict[str, Group] = {}


def init(manifest: Optional[Manifest], selected_ids: AbstractSet[str]) -> None:
if not manifest:
return

Check warning on line 12 in core/dbt/task/group_lookup.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/group_lookup.py#L12

Added line #L12 was not covered by tests

_every_group_name_to_group_map = {v.name: v for v in manifest.groups.values()}

for group_name, node_ids in manifest.group_map.items():
for node_id in node_ids:
# only add node to lookup if it's selected
if node_id in selected_ids:
_node_id_to_group_name_map[node_id] = group_name

# only add group to lookup if it's not already there and if node is selected
if group_name not in _group_name_to_group_map:
_group_name_to_group_map[group_name] = _every_group_name_to_group_map[
group_name
]


def get(node_id: str) -> Optional[Dict[str, Union[str, Dict[str, str]]]]:
group_name = _node_id_to_group_name_map.get(node_id)

if group_name is None:
return None

group = _group_name_to_group_map.get(group_name)

if group is None:
return None

Check warning on line 38 in core/dbt/task/group_lookup.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/group_lookup.py#L38

Added line #L38 was not covered by tests

return group.to_logging_dict()
31 changes: 13 additions & 18 deletions core/dbt/task/printer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Dict, Optional
from typing import Dict, Optional, Union

from dbt.artifacts.schemas.results import NodeStatus
from dbt.contracts.graph.nodes import Group
from dbt.events.types import (
CheckNodeTestFailure,
EndOfRunSummary,
Expand All @@ -14,6 +13,7 @@
StatsLine,
)
from dbt.node_types import NodeType
from dbt.task import group_lookup
from dbt_common.events.base_types import EventLevel
from dbt_common.events.format import pluralize
from dbt_common.events.functions import fire_event
Expand Down Expand Up @@ -70,7 +70,10 @@ def print_run_status_line(results) -> None:


def print_run_result_error(
result, newline: bool = True, is_warning: bool = False, group: Optional[Group] = None
result,
newline: bool = True,
is_warning: bool = False,
group: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
) -> None:
# set node_info for logging events
node_info = None
Expand All @@ -80,36 +83,31 @@ def print_run_result_error(
if newline:
fire_event(Formatting(""))
if is_warning:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultWarning(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
group=group,
)
)
else:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultFailure(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
group=group,
)
)

if result.message:
if is_warning:
fire_event(RunResultWarningMessage(msg=result.message, node_info=node_info))
else:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultError(msg=result.message, node_info=node_info, group=group_dict)
)
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group))
else:
fire_event(RunResultErrorNoMessage(status=result.status, node_info=node_info))

Expand All @@ -129,13 +127,10 @@ def print_run_result_error(
elif result.message is not None:
if newline:
fire_event(Formatting(""))
group_dict = group.to_logging_dict() if group else None
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group_dict))
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group))


def print_run_end_messages(
results, keyboard_interrupt: bool = False, groups: Optional[Dict[str, Group]] = None
) -> None:
def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
errors, warnings, partial_successes = [], [], []
for r in results:
if r.status in (NodeStatus.RuntimeErr, NodeStatus.Error, NodeStatus.Fail):
Expand All @@ -160,11 +155,11 @@ def print_run_end_messages(
)

for error in errors:
group = groups.get(error.node.unique_id) if groups and hasattr(error, "node") else None
group = group_lookup.get(error.node.unique_id) if hasattr(error, "node") else None
print_run_result_error(error, is_warning=False, group=group)

for warning in warnings:
group = groups.get(warning.node.unique_id) if groups and hasattr(warning, "node") else None
group = group_lookup.get(warning.node.unique_id) if hasattr(warning, "node") else None
print_run_result_error(warning, is_warning=True, group=group)

print_run_status_line(results)
19 changes: 7 additions & 12 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from dbt_common.events.types import Formatting
from dbt_common.exceptions import DbtValidationError

from . import group_lookup
from .compile import CompileRunner, CompileTask
from .printer import get_counts, print_run_end_messages

Expand Down Expand Up @@ -215,6 +216,7 @@ def print_start_line(self):

def print_result_line(self, result):
description = self.describe_node()
group = group_lookup.get(self.node.unique_id)
if result.status == NodeStatus.Error:
status = result.status
level = EventLevel.ERROR
Expand All @@ -229,6 +231,7 @@ def print_result_line(self, result):
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
group=group,
),
level=level,
)
Expand All @@ -242,6 +245,7 @@ def print_batch_result_line(
exception: Optional[Exception],
):
description = self.describe_batch(batch_start)
group = group_lookup.get(self.node.unique_id)
if result.status == NodeStatus.Error:
status = result.status
level = EventLevel.ERROR
Expand All @@ -256,6 +260,7 @@ def print_batch_result_line(
total=batch_total,
execution_time=result.execution_time,
node_info=self.node.node_info,
group=group,
),
level=level,
)
Expand Down Expand Up @@ -716,6 +721,7 @@ def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None:
self.populate_adapter_cache(adapter, required_schemas)
self.populate_microbatch_batches(selected_uids)
self.safe_run_hooks(adapter, RunHookType.Start, {})
group_lookup.init(self.manifest, selected_uids)

def after_run(self, adapter, results) -> None:
# in on-run-end hooks, provide the value 'database_schemas', which is a
Expand Down Expand Up @@ -753,17 +759,6 @@ def get_node_selector(self) -> ResourceTypeSelector:
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return ModelRunner

def get_groups_for_nodes(self, nodes):
node_to_group_name_map = {i: k for k, v in self.manifest.group_map.items() for i in v}
group_name_to_group_map = {v.name: v for v in self.manifest.groups.values()}

return {
node.unique_id: group_name_to_group_map.get(node_to_group_name_map.get(node.unique_id))
for node in nodes
}

def task_end_messages(self, results) -> None:
groups = self.get_groups_for_nodes([r.node for r in results if hasattr(r, "node")])

if results:
print_run_end_messages(results, groups=groups)
print_run_end_messages(results)
12 changes: 9 additions & 3 deletions core/dbt/task/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from dbt_common.exceptions import DbtBaseException, DbtRuntimeError
from dbt_common.ui import green, red

from . import group_lookup
from .compile import CompileRunner
from .run import RunTask

Expand Down Expand Up @@ -93,7 +94,6 @@ class UnitTestResultData(dbtClassMixin):

class TestRunner(CompileRunner):
_ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
_LOG_TEST_RESULT_EVENTS = LogTestResult

def describe_node_name(self) -> str:
if self.node.resource_type == NodeType.Unit:
Expand All @@ -107,16 +107,22 @@ def describe_node(self) -> str:

def print_result_line(self, result):
model = result.node
group = group_lookup.get(model.unique_id)
attached_node = (
result.node.attached_node if isinstance(result.node, GenericTestNode) else None
)

fire_event(
self._LOG_TEST_RESULT_EVENTS(
LogTestResult(
name=self.describe_node_name(),
status=str(result.status),
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
num_failures=result.failures,
group=group,
attached_node=attached_node,
),
level=LogTestResult.status_to_level(str(result.status)),
)
Expand Down Expand Up @@ -298,7 +304,7 @@ def build_test_run_result(self, test: TestNode, result: TestResultData) -> RunRe
failures = result.failures
elif result.should_warn:
if get_flags().WARN_ERROR or get_flags().WARN_ERROR_OPTIONS.includes(
self._LOG_TEST_RESULT_EVENTS.__name__
LogTestResult.__name__
):
status = TestStatus.Fail
message = f"Got {num_errors}, configured to fail if {test.config.warn_if}"
Expand Down
7 changes: 7 additions & 0 deletions tests/functional/adapter/basic/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,15 @@

generic_test_view_yml = """
version: 2

groups:
- name: my_group
owner:
name: group_owner

models:
- name: view_model
group: my_group
columns:
- name: id
data_tests:
Expand Down
12 changes: 10 additions & 2 deletions tests/functional/adapter/basic/test_generic_tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from dbt.tests.util import run_dbt
from dbt.tests.util import run_dbt, run_dbt_and_capture
from tests.functional.adapter.basic.files import (
base_table_sql,
base_view_sql,
Expand Down Expand Up @@ -58,9 +58,17 @@ def test_generic_tests(self, project):
assert len(results) == 2

# test command, all tests
results = run_dbt(["test"])
results, log_output = run_dbt_and_capture(["test", "--log-format", "json"])
assert len(results) == 3

result_log_lines = [
line for line in log_output.split("\n") if "LogTestResult" in line and "group" in line
]
assert len(result_log_lines) == 1
assert "my_group" in result_log_lines[0]
assert "group_owner" in result_log_lines[0]
assert "model.generic_tests.view_model" in result_log_lines[0]


class TestGenericTests(BaseGenericTests):
pass
Loading
Loading