Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-rogers-dbt committed Sep 26, 2024
2 parents 90552ca + b590045 commit 45339c0
Show file tree
Hide file tree
Showing 52 changed files with 1,239 additions and 136 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240925-165002.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable `retry` support for microbatch models
time: 2024-09-25T16:50:02.105069-05:00
custom:
Author: QMalcolm MichelleArk
Issue: "10624"
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20240925-131028.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Ignore rendered jinja in configs for state:modified, behind state_modified_compare_more_unrendered_values
behaviour flag
time: 2024-09-25T13:10:28.490042+01:00
custom:
Author: michelleark
Issue: "9564"
3 changes: 3 additions & 0 deletions core/dbt/artifacts/resources/v1/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,16 @@ class ParsedResource(ParsedResourceMandatory):
unrendered_config: Dict[str, Any] = field(default_factory=dict)
created_at: float = field(default_factory=lambda: time.time())
config_call_dict: Dict[str, Any] = field(default_factory=dict)
unrendered_config_call_dict: Dict[str, Any] = field(default_factory=dict)
relation_name: Optional[str] = None
raw_code: str = ""

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if context and context.get("artifact") and "config_call_dict" in dct:
del dct["config_call_dict"]
if context and context.get("artifact") and "unrendered_config_call_dict" in dct:
del dct["unrendered_config_call_dict"]
return dct


Expand Down
21 changes: 21 additions & 0 deletions core/dbt/artifacts/schemas/batch_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Tuple

from dbt_common.dataclass_schema import dbtClassMixin

BatchType = Tuple[datetime, datetime]


@dataclass
class BatchResults(dbtClassMixin):
successful: List[BatchType] = field(default_factory=list)
failed: List[BatchType] = field(default_factory=list)

def __add__(self, other: BatchResults) -> BatchResults:
return BatchResults(
successful=self.successful + other.successful,
failed=self.failed + other.failed,
)
2 changes: 2 additions & 0 deletions core/dbt/artifacts/schemas/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class NodeStatus(StrEnum):
Fail = "fail"
Warn = "warn"
Skipped = "skipped"
PartialSuccess = "partial success"
Pass = "pass"
RuntimeErr = "runtime error"

Expand All @@ -63,6 +64,7 @@ class RunStatus(StrEnum):
Success = NodeStatus.Success
Error = NodeStatus.Error
Skipped = NodeStatus.Skipped
PartialSuccess = NodeStatus.PartialSuccess


class TestStatus(StrEnum):
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/artifacts/schemas/run/v5/run.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import copy
import threading
from dataclasses import dataclass, field
Expand All @@ -17,6 +19,7 @@
get_artifact_schema_version,
schema_version,
)
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.artifacts.schemas.results import (
BaseResult,
ExecutionResult,
Expand All @@ -34,6 +37,7 @@ class RunResult(NodeResult):
agate_table: Optional["agate.Table"] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
batch_results: Optional[BatchResults] = None

@property
def skipped(self):
Expand All @@ -51,6 +55,7 @@ def from_node(cls, node: ResultNode, status: RunStatus, message: Optional[str]):
node=node,
adapter_response={},
failures=None,
batch_results=None,
)


Expand All @@ -67,6 +72,7 @@ class RunResultOutput(BaseResult):
compiled: Optional[bool]
compiled_code: Optional[str]
relation_name: Optional[str]
batch_results: Optional[BatchResults] = None


def process_run_result(result: RunResult) -> RunResultOutput:
Expand All @@ -82,6 +88,7 @@ def process_run_result(result: RunResult) -> RunResultOutput:
message=result.message,
adapter_response=result.adapter_response,
failures=result.failures,
batch_results=result.batch_results,
compiled=result.node.compiled if compiled else None, # type:ignore
compiled_code=result.node.compiled_code if compiled else None, # type:ignore
relation_name=result.node.relation_name if compiled else None, # type:ignore
Expand Down
54 changes: 54 additions & 0 deletions core/dbt/clients/jinja_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,57 @@ def statically_parse_ref_or_source(expression: str) -> Union[RefArgs, List[str]]
raise ParsingError(f"Invalid ref or source expression: {expression}")

return ref_or_source


def statically_parse_unrendered_config(string: str) -> Optional[Dict[str, Any]]:
"""
Given a string with jinja, extract an unrendered config call.
If no config call is present, returns None.
For example, given:
"{{ config(materialized=env_var('DBT_TEST_STATE_MODIFIED')) }}\nselect 1 as id"
returns: {'materialized': "Keyword(key='materialized', value=Call(node=Name(name='env_var', ctx='load'), args=[Const(value='DBT_TEST_STATE_MODIFIED')], kwargs=[], dyn_args=None, dyn_kwargs=None))"}
No config call:
"select 1 as id"
returns: None
"""
# Return early to avoid creating jinja environemt if no config call in input string
if "config(" not in string:
return None

# set 'capture_macros' to capture undefined
env = get_environment(None, capture_macros=True)

global _TESTING_MACRO_CACHE
if test_caching_enabled() and _TESTING_MACRO_CACHE and string in _TESTING_MACRO_CACHE:
parsed = _TESTING_MACRO_CACHE.get(string, None)
func_calls = getattr(parsed, "_dbt_cached_calls")
else:
parsed = env.parse(string)
func_calls = tuple(parsed.find_all(jinja2.nodes.Call))

config_func_calls = list(
filter(
lambda f: hasattr(f, "node") and hasattr(f.node, "name") and f.node.name == "config",
func_calls,
)
)
# There should only be one {{ config(...) }} call per input
config_func_call = config_func_calls[0] if config_func_calls else None

if not config_func_call:
return None

unrendered_config = {}
for kwarg in config_func_call.kwargs:
unrendered_config[kwarg.key] = construct_static_kwarg_value(kwarg)

return unrendered_config


def construct_static_kwarg_value(kwarg):
# Instead of trying to re-assemble complex kwarg value, simply stringify the value
# This is still useful to be able to detect changes in unrendered configs, even if it is
# not an exact representation of the user input.
return str(kwarg)
20 changes: 19 additions & 1 deletion core/dbt/context/context_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dbt.config import IsFQNResource, Project, RuntimeConfig
from dbt.contracts.graph.model_config import get_config_for
from dbt.exceptions import SchemaConfigError
from dbt.flags import get_flags
from dbt.node_types import NodeType
from dbt.utils import fqn_search
from dbt_common.contracts.config.base import BaseConfig, merge_config_dicts
Expand Down Expand Up @@ -292,6 +293,7 @@ def __init__(
project_name: str,
) -> None:
self._config_call_dict: Dict[str, Any] = {}
self._unrendered_config_call_dict: Dict[str, Any] = {}
self._active_project = active_project
self._fqn = fqn
self._resource_type = resource_type
Expand All @@ -301,6 +303,10 @@ def add_config_call(self, opts: Dict[str, Any]) -> None:
dct = self._config_call_dict
merge_config_dicts(dct, opts)

def add_unrendered_config_call(self, opts: Dict[str, Any]) -> None:
# Cannot perform complex merge behaviours on unrendered configs as they may not be appropriate types.
self._unrendered_config_call_dict.update(opts)

def build_config_dict(
self,
base: bool = False,
Expand All @@ -311,12 +317,24 @@ def build_config_dict(
if rendered:
# TODO CT-211
src = ContextConfigGenerator(self._active_project) # type: ignore[var-annotated]
config_call_dict = self._config_call_dict
else:
# TODO CT-211
src = UnrenderedConfigGenerator(self._active_project) # type: ignore[assignment]

# preserve legacy behaviour - using unreliable (potentially rendered) _config_call_dict
if get_flags().state_modified_compare_more_unrendered_values is False:
config_call_dict = self._config_call_dict
else:
# Prefer _config_call_dict if it is available and _unrendered_config_call_dict is not,
# as _unrendered_config_call_dict is unreliable for non-sql nodes (e.g. no jinja config block rendered for python models, etc)
if self._config_call_dict and not self._unrendered_config_call_dict:
config_call_dict = self._config_call_dict
else:
config_call_dict = self._unrendered_config_call_dict

return src.calculate_node_config_dict(
config_call_dict=self._config_call_dict,
config_call_dict=config_call_dict,
fqn=self._fqn,
resource_type=self._resource_type,
project_name=self._project_name,
Expand Down
10 changes: 10 additions & 0 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
UnitTestMacroGenerator,
get_rendered,
)
from dbt.clients.jinja_static import statically_parse_unrendered_config
from dbt.config import IsFQNResource, Project, RuntimeConfig
from dbt.constants import DEFAULT_ENV_PLACEHOLDER
from dbt.context.base import Var, contextmember, contextproperty
Expand Down Expand Up @@ -78,6 +79,7 @@
SecretEnvVarLocationError,
TargetNotFoundError,
)
from dbt.flags import get_flags
from dbt.materializations.incremental.microbatch import MicrobatchBuilder
from dbt.node_types import ModelLanguage, NodeType
from dbt.utils import MultiDict, args_to_dict
Expand Down Expand Up @@ -395,6 +397,14 @@ def __call__(self, *args, **kwargs):
# not call it!
if self.context_config is None:
raise DbtRuntimeError("At parse time, did not receive a context config")

# Track unrendered opts to build parsed node unrendered_config later on
if get_flags().state_modified_compare_more_unrendered_values:
unrendered_config = statically_parse_unrendered_config(self.model.raw_code)
if unrendered_config:
self.context_config.add_unrendered_config_call(unrendered_config)

# Use rendered opts to populate context_config
self.context_config.add_config_call(opts)
return ""

Expand Down
36 changes: 36 additions & 0 deletions core/dbt/contracts/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ class SchemaSourceFile(BaseSourceFile):
# created too, but those are in 'sources'
sop: List[SourceKey] = field(default_factory=list)
env_vars: Dict[str, Any] = field(default_factory=dict)
unrendered_configs: Dict[str, Any] = field(default_factory=dict)
pp_dict: Optional[Dict[str, Any]] = None
pp_test_index: Optional[Dict[str, Any]] = None

Expand Down Expand Up @@ -317,6 +318,41 @@ def get_all_test_ids(self):
test_ids.extend(self.data_tests[key][name])
return test_ids

def add_unrendered_config(self, unrendered_config, yaml_key, name, version=None):
versioned_name = f"{name}_v{version}" if version is not None else name

if yaml_key not in self.unrendered_configs:
self.unrendered_configs[yaml_key] = {}

if versioned_name not in self.unrendered_configs[yaml_key]:
self.unrendered_configs[yaml_key][versioned_name] = unrendered_config

def get_unrendered_config(self, yaml_key, name, version=None) -> Optional[Dict[str, Any]]:
versioned_name = f"{name}_v{version}" if version is not None else name

if yaml_key not in self.unrendered_configs:
return None
if versioned_name not in self.unrendered_configs[yaml_key]:
return None

return self.unrendered_configs[yaml_key][versioned_name]

def delete_from_unrendered_configs(self, yaml_key, name):
# We delete all unrendered_configs for this yaml_key/name because the
# entry has been scheduled for reparsing.
if self.get_unrendered_config(yaml_key, name):
del self.unrendered_configs[yaml_key][name]
# Delete all versioned keys associated with name
version_names_to_delete = []
for potential_version_name in self.unrendered_configs[yaml_key]:
if potential_version_name.startswith(f"{name}_v"):
version_names_to_delete.append(potential_version_name)
for version_name in version_names_to_delete:
del self.unrendered_configs[yaml_key][version_name]

if not self.unrendered_configs[yaml_key]:
del self.unrendered_configs[yaml_key]

def add_env_var(self, var, yaml_key, name):
if yaml_key not in self.env_vars:
self.env_vars[yaml_key] = {}
Expand Down
3 changes: 3 additions & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.artifacts.schemas.batch_results import BatchType
from dbt.contracts.graph.model_config import UnitTestNodeConfig
from dbt.contracts.graph.node_args import ModelNodeArgs
from dbt.contracts.graph.unparsed import (
Expand Down Expand Up @@ -442,6 +443,8 @@ def resource_class(cls) -> Type[HookNodeResource]:

@dataclass
class ModelNode(ModelResource, CompiledNode):
batches: Optional[List[BatchType]] = None

@classmethod
def resource_class(cls) -> Type[ModelResource]:
return ModelResource
Expand Down
4 changes: 3 additions & 1 deletion core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,17 +337,19 @@ class ProjectFlags(ExtensibleDbtClassMixin):
warn_error_options: Optional[Dict[str, Union[str, List[str]]]] = None
write_json: Optional[bool] = None

# legacy behaviors
# legacy behaviors - https://github.com/dbt-labs/dbt-core/blob/main/docs/guides/behavior-change-flags.md
require_explicit_package_overrides_for_builtin_materializations: bool = True
require_resource_names_without_spaces: bool = False
source_freshness_run_project_hooks: bool = False
state_modified_compare_more_unrendered_values: bool = False

@property
def project_only_flags(self) -> Dict[str, Any]:
return {
"require_explicit_package_overrides_for_builtin_materializations": self.require_explicit_package_overrides_for_builtin_materializations,
"require_resource_names_without_spaces": self.require_resource_names_without_spaces,
"source_freshness_run_project_hooks": self.source_freshness_run_project_hooks,
"state_modified_compare_more_unrendered_values": self.state_modified_compare_more_unrendered_values,
}


Expand Down
1 change: 1 addition & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1916,6 +1916,7 @@ message EndOfRunSummary {
int32 num_errors = 1;
int32 num_warnings = 2;
bool keyboard_interrupt = 3;
int32 num_partial_success = 4;
}

message EndOfRunSummaryMsg {
Expand Down
136 changes: 68 additions & 68 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,9 @@ def message(self) -> str:
if self.status == "error":
info = "ERROR creating"
status = red(self.status.upper())
elif "PARTIAL SUCCESS" in self.status:
info = "PARTIALLY created"
status = yellow(self.status.upper())
else:
info = "OK created"
status = green(self.status)
Expand Down Expand Up @@ -1860,10 +1863,16 @@ def code(self) -> str:
def message(self) -> str:
error_plural = pluralize(self.num_errors, "error")
warn_plural = pluralize(self.num_warnings, "warning")
partial_success_plural = pluralize(self.num_partial_success, "partial success")

if self.keyboard_interrupt:
message = yellow("Exited because of keyboard interrupt")
elif self.num_errors > 0:
message = red(f"Completed with {error_plural} and {warn_plural}:")
message = red(
f"Completed with {error_plural}, {partial_success_plural}, and {warn_plural}:"
)
elif self.num_partial_success > 0:
message = yellow(f"Completed with {partial_success_plural} and {warn_plural}")
elif self.num_warnings > 0:
message = yellow(f"Completed with {warn_plural}:")
else:
Expand Down
Loading

0 comments on commit 45339c0

Please sign in to comment.