Skip to content

Commit

Permalink
Create protobuf message representations of graph nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank committed Feb 10, 2023
1 parent 311a57a commit 7d5a50d
Show file tree
Hide file tree
Showing 17 changed files with 2,182 additions and 19 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230131-212702.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Crate protobuf representation of nodes
time: 2023-01-31T21:27:02.209758-05:00
custom:
Author: gshank
Issue: "6391"
2 changes: 2 additions & 0 deletions core/dbt/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@
PIN_PACKAGE_URL = (
"https://docs.getdbt.com/docs/package-management#section-specifying-package-versions"
)

MANIFEST_FILE_NAME = "manifest.json"
28 changes: 28 additions & 0 deletions core/dbt/contracts/graph/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# nodes.proto messages

For new fields in a node or node config to be included in the protobuf serialized output,
the messages in nodes.proto need to be updated.

Then proto.nodes need to be compiled: ```protoc --python_betterproto_out . nodes.proto```

In order to use optional fields (really necessary for nodes and configs) we had to use
a beta version of betterproto. This version has a bug in the way that it writes the
names of some generated classes, so we will have to update the name of the rpc node from
RpcNode to RPCNode in the generated file.

In addition, betterproto now always creates the generated python file as an __init__.py
file in a subdirectory. For now, I'm moving it up from proto_nodes/__init__.py to proto_nodes.py.

# updating nodes.py and model_config.py for nodes.proto changes

Protobuf python messages objects are created to "to_msg" methods. There is often a list
of attributes to set in a "msg_attributes" method, but this isn't entirely consistent.
If a class has a small number of additional attributes they are sometimes set directly.
Some attributes aren't handled well by the "get_msg_attribute_value" utility function,
in which case they are set directly. This is particularly true of lists or dictionaries
of objects, which need to be converted using a "to_msg" method.

The utility class "get_msg_attribute_value" does a couple of common conversions of
attribute values, such as getting the string value of an enum or converting dictionaries to
dictionaries of strings. A few common more elaborate conversions are also performed, such as
"columns".
91 changes: 90 additions & 1 deletion core/dbt/contracts/graph/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
register_pattern,
)
from dbt.contracts.graph.unparsed import AdditionalPropertiesAllowed, Docs
from dbt.contracts.graph.utils import validate_color
from dbt.contracts.graph.utils import validate_color, get_msg_attribute_value
from dbt.exceptions import DbtInternalError, CompilationError
from dbt.contracts.util import Replaceable, list_str
from dbt import hooks
from dbt.node_types import NodeType
from dbt.contracts.graph import proto_nodes


M = TypeVar("M", bound="Metadata")
Expand Down Expand Up @@ -195,6 +196,9 @@ class Hook(dbtClassMixin, Replaceable):
transaction: bool = True
index: Optional[int] = None

def to_msg(self):
return proto_nodes.Hook(sql=self.sql, transaction=self.transaction, index=self.index)


T = TypeVar("T", bound="BaseConfig")

Expand Down Expand Up @@ -404,9 +408,15 @@ class NodeAndTestConfig(BaseConfig):
metadata=MergeBehavior.Update.meta(),
)

@classmethod
def msg_attributes(self):
return ["enabled", "alias", "schema", "database", "tags", "meta"]


@dataclass
class NodeConfig(NodeAndTestConfig):
"""This config is used by ModelNode, AnalysisNode, RPCNode, SqlNode, HookNode"""

# Note: if any new fields are added with MergeBehavior, also update the
# 'mergebehavior' dictionary
materialized: str = "view"
Expand Down Expand Up @@ -447,6 +457,35 @@ class NodeConfig(NodeAndTestConfig):
metadata=MergeBehavior.Update.meta(),
)

@classmethod
def msg_attributes(self):
return [
"materialized",
"incremental_strategy",
"persist_docs",
"quoting",
"full_refresh",
"unique_key",
"on_schema_change",
"grants",
"packages",
"docs",
]

def to_msg(self):
# Get matching msg config class
config_name = type(self).__name__
msg_cls = getattr(proto_nodes, config_name)
msg_config = msg_cls()

for cls in [NodeAndTestConfig, NodeConfig]:
for attribute in cls.msg_attributes():
value = get_msg_attribute_value(self, attribute)
setattr(msg_config, attribute, value)
msg_config.post_hook = [hk.to_msg() for hk in self.post_hook]
msg_config.pre_hook = [hk.to_msg() for hk in self.pre_hook]
return msg_config

# we validate that node_color has a suitable value to prevent dbt-docs from crashing
def __post_init__(self):
if self.docs.node_color:
Expand Down Expand Up @@ -495,15 +534,23 @@ class SeedConfig(NodeConfig):
materialized: str = "seed"
quote_columns: Optional[bool] = None

def to_msg(self):
msg = super().to_msg()
msg.quote_columns = self.quote_columns
return msg

@classmethod
def validate(cls, data):
super().validate(data)
if data.get("materialized") and data.get("materialized") != "seed":
raise ValidationError("A seed must have a materialized value of 'seed'")


# This is used in both GenericTestNode and SingularTestNode, but some
# of these attributes seem specific to GenericTestNode.
@dataclass
class TestConfig(NodeAndTestConfig):
__test__ = False
# this is repeated because of a different default
schema: Optional[str] = field(
default="dbt_test__audit",
Expand All @@ -518,6 +565,27 @@ class TestConfig(NodeAndTestConfig):
warn_if: str = "!= 0"
error_if: str = "!= 0"

@classmethod
def msg_attributes(self):
return [
"materialized",
"severity",
"store_failures",
"where",
"limit",
"fail_calc",
"warn_if",
"error_if",
]

def to_msg(self):
msg_config = proto_nodes.TestConfig()
for cls in [NodeAndTestConfig, TestConfig]:
for attribute in cls.msg_attributes():
value = get_msg_attribute_value(self, attribute)
setattr(msg_config, attribute, value)
return msg_config

@classmethod
def same_contents(cls, unrendered: Dict[str, Any], other: Dict[str, Any]) -> bool:
"""This is like __eq__, except it explicitly checks certain fields."""
Expand Down Expand Up @@ -563,6 +631,27 @@ class SnapshotConfig(EmptySnapshotConfig):
# Not using Optional because of serialization issues with a Union of str and List[str]
check_cols: Union[str, List[str], None] = None

@classmethod
def msg_attributes(self):
return ["strategy", "unique_key", "target_schema", "target_database"]

def to_msg(self):
msg_config = super().to_msg() # Uses NodeConfig to_msg
for attribute in self.msg_attributes():
value = get_msg_attribute_value(self, attribute)
setattr(msg_config, attribute, value)
msg_config.check_cols = self.normalize_check_cols()
return msg_config

def normalize_check_cols(self):
"""Ensure that check_cols is always a list"""
if self.check_cols is None:
return []
elif isinstance(self.check_cols, str):
return [self.check_cols]
else:
return self.check_cols

@classmethod
def validate(cls, data):
super().validate(data)
Expand Down
Loading

0 comments on commit 7d5a50d

Please sign in to comment.