Skip to content

Commit

Permalink
CT 1998 use google protobuf to enable more flexible dictionaries (#7190)
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank authored Mar 22, 2023
1 parent 73ff497 commit ae485f9
Show file tree
Hide file tree
Showing 37 changed files with 1,463 additions and 3,793 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20230318-164326.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: Switch from betterproto to google protobuf and enable more flexible meta dictionary
in logs
time: 2023-03-18T16:43:26.782738-04:00
custom:
Author: gshank
Issue: "6832"
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
core/dbt/include/index.html binary
tests/functional/artifacts/data/state/*/manifest.json binary
core/dbt/events/types_pb2.py binary
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Configuration for pre-commit hooks (see https://pre-commit.com/).
# Eventually the hooks described here will be run as tests before merging each PR.

exclude: ^(core/dbt/docs/build/)
exclude: ^(core/dbt/docs/build/|core/dbt/events/types_pb2.py)

# Force all unspecified python hooks to run python 3.8
default_language_version:
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ dev: dev_req ## Installs dbt-* packages in develop mode along with development d
@\
pre-commit install

.PHONY: proto_types
proto_types: ## generates google protobuf python file from types.proto
protoc -I=./core/dbt/events --python_out=./core/dbt/events ./core/dbt/events/types.proto

.PHONY: mypy
mypy: .env ## Runs mypy against staged changes for static type checking.
@\
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
)
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.base import Credentials
from dbt.adapters.cache import RelationsCache, _make_ref_key_msg
from dbt.adapters.cache import RelationsCache, _make_ref_key_dict


GET_CATALOG_MACRO_NAME = "get_catalog"
Expand Down Expand Up @@ -722,7 +722,7 @@ def list_relations(self, database: Optional[str], schema: str) -> List[BaseRelat
ListRelations(
database=cast_to_str(database),
schema=schema,
relations=[_make_ref_key_msg(x) for x in relations],
relations=[_make_ref_key_dict(x) for x in relations],
)
)

Expand Down
33 changes: 15 additions & 18 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

from dbt.adapters.reference_keys import (
_make_ref_key,
_make_ref_key_msg,
_make_msg_from_ref_key,
_make_ref_key_dict,
_ReferenceKey,
)
from dbt.exceptions import (
Expand Down Expand Up @@ -230,7 +229,7 @@ def dump_graph(self):
# self.relations or any cache entry's referenced_by during iteration
# it's a runtime error!
with self.lock:
return {dot_separated(k): v.dump_graph_entry() for k, v in self.relations.items()}
return {dot_separated(k): str(v.dump_graph_entry()) for k, v in self.relations.items()}

def _setdefault(self, relation: _CachedRelation):
"""Add a relation to the cache, or return it if it already exists.
Expand Down Expand Up @@ -290,8 +289,8 @@ def add_link(self, referenced, dependent):
# a link - we will never drop the referenced relation during a run.
fire_event(
CacheAction(
ref_key=_make_msg_from_ref_key(ref_key),
ref_key_2=_make_msg_from_ref_key(dep_key),
ref_key=ref_key._asdict(),
ref_key_2=dep_key._asdict(),
)
)
return
Expand All @@ -306,8 +305,8 @@ def add_link(self, referenced, dependent):
fire_event(
CacheAction(
action="add_link",
ref_key=_make_msg_from_ref_key(dep_key),
ref_key_2=_make_msg_from_ref_key(ref_key),
ref_key=dep_key._asdict(),
ref_key_2=ref_key._asdict(),
)
)
with self.lock:
Expand All @@ -325,7 +324,7 @@ def add(self, relation):
flags.LOG_CACHE_EVENTS,
lambda: CacheDumpGraph(before_after="before", action="adding", dump=self.dump_graph()),
)
fire_event(CacheAction(action="add_relation", ref_key=_make_ref_key_msg(cached)))
fire_event(CacheAction(action="add_relation", ref_key=_make_ref_key_dict(cached)))

with self.lock:
self._setdefault(cached)
Expand Down Expand Up @@ -359,15 +358,15 @@ def drop(self, relation):
:param str identifier: The identifier of the relation to drop.
"""
dropped_key = _make_ref_key(relation)
dropped_key_msg = _make_ref_key_msg(relation)
dropped_key_msg = _make_ref_key_dict(relation)
fire_event(CacheAction(action="drop_relation", ref_key=dropped_key_msg))
with self.lock:
if dropped_key not in self.relations:
fire_event(CacheAction(action="drop_missing_relation", ref_key=dropped_key_msg))
return
consequences = self.relations[dropped_key].collect_consequences()
# convert from a list of _ReferenceKeys to a list of ReferenceKeyMsgs
consequence_msgs = [_make_msg_from_ref_key(key) for key in consequences]
consequence_msgs = [key._asdict() for key in consequences]
fire_event(
CacheAction(
action="drop_cascade", ref_key=dropped_key_msg, ref_list=consequence_msgs
Expand Down Expand Up @@ -397,9 +396,9 @@ def _rename_relation(self, old_key, new_relation):
fire_event(
CacheAction(
action="update_reference",
ref_key=_make_ref_key_msg(old_key),
ref_key_2=_make_ref_key_msg(new_key),
ref_key_3=_make_ref_key_msg(cached.key()),
ref_key=_make_ref_key_dict(old_key),
ref_key_2=_make_ref_key_dict(new_key),
ref_key_3=_make_ref_key_dict(cached.key()),
)
)

Expand Down Expand Up @@ -430,9 +429,7 @@ def _check_rename_constraints(self, old_key, new_key):
raise TruncatedModelNameCausedCollisionError(new_key, self.relations)

if old_key not in self.relations:
fire_event(
CacheAction(action="temporary_relation", ref_key=_make_msg_from_ref_key(old_key))
)
fire_event(CacheAction(action="temporary_relation", ref_key=old_key._asdict()))
return False
return True

Expand All @@ -453,8 +450,8 @@ def rename(self, old, new):
fire_event(
CacheAction(
action="rename_relation",
ref_key=_make_msg_from_ref_key(old_key),
ref_key_2=_make_msg_from_ref_key(new),
ref_key=old_key._asdict(),
ref_key_2=new_key._asdict(),
)
)
flags = get_flags()
Expand Down
15 changes: 6 additions & 9 deletions core/dbt/adapters/reference_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from collections import namedtuple
from typing import Any, Optional
from dbt.events.proto_types import ReferenceKeyMsg


_ReferenceKey = namedtuple("_ReferenceKey", "database schema identifier")
Expand Down Expand Up @@ -30,11 +29,9 @@ def _make_ref_key(relation: Any) -> _ReferenceKey:
)


def _make_ref_key_msg(relation: Any):
return _make_msg_from_ref_key(_make_ref_key(relation))


def _make_msg_from_ref_key(ref_key: _ReferenceKey) -> ReferenceKeyMsg:
return ReferenceKeyMsg(
database=ref_key.database, schema=ref_key.schema, identifier=ref_key.identifier
)
def _make_ref_key_dict(relation: Any):
return {
"database": relation.database,
"schema": relation.schema,
"identifier": relation.identifier,
}
8 changes: 4 additions & 4 deletions core/dbt/adapters/sql/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dbt.contracts.connection import Connection
from dbt.exceptions import RelationTypeNullError
from dbt.adapters.base import BaseAdapter, available
from dbt.adapters.cache import _make_ref_key_msg
from dbt.adapters.cache import _make_ref_key_dict
from dbt.adapters.sql import SQLConnectionManager
from dbt.events.functions import fire_event
from dbt.events.types import ColTypeChange, SchemaCreation, SchemaDrop
Expand Down Expand Up @@ -109,7 +109,7 @@ def expand_column_types(self, goal, current):
ColTypeChange(
orig_type=target_column.data_type,
new_type=new_type,
table=_make_ref_key_msg(current),
table=_make_ref_key_dict(current),
)
)

Expand Down Expand Up @@ -152,7 +152,7 @@ def get_columns_in_relation(self, relation):

def create_schema(self, relation: BaseRelation) -> None:
relation = relation.without_identifier()
fire_event(SchemaCreation(relation=_make_ref_key_msg(relation)))
fire_event(SchemaCreation(relation=_make_ref_key_dict(relation)))
kwargs = {
"relation": relation,
}
Expand All @@ -163,7 +163,7 @@ def create_schema(self, relation: BaseRelation) -> None:

def drop_schema(self, relation: BaseRelation) -> None:
relation = relation.without_identifier()
fire_event(SchemaDrop(relation=_make_ref_key_msg(relation)))
fire_event(SchemaDrop(relation=_make_ref_key_dict(relation)))
kwargs = {
"relation": relation,
}
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/clients/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ def run_cmd(cwd: str, cmd: List[str], env: Optional[Dict[str, Any]] = None) -> T
except OSError as exc:
_interpret_oserror(exc, cwd, cmd)

fire_event(SystemStdOut(bmsg=out))
fire_event(SystemStdErr(bmsg=err))
fire_event(SystemStdOut(bmsg=str(out)))
fire_event(SystemStdErr(bmsg=str(err)))

if proc.returncode != 0:
fire_event(SystemReportReturnCode(returncode=proc.returncode))
Expand Down
9 changes: 2 additions & 7 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
MetricTime,
)
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.events.proto_types import NodeInfo
from dbt.events.functions import warn_or_error
from dbt.exceptions import ParsingError, InvalidAccessTypeError
from dbt.events.types import (
Expand All @@ -50,7 +49,6 @@
from dbt.events.contextvars import set_contextvars
from dbt.flags import get_flags
from dbt.node_types import ModelLanguage, NodeType, AccessType
from dbt.utils import cast_dict_to_dict_of_strings


from .model_config import (
Expand Down Expand Up @@ -243,8 +241,6 @@ class NodeInfoMixin:

@property
def node_info(self):
meta = getattr(self, "meta", {})
meta_stringified = cast_dict_to_dict_of_strings(meta)
node_info = {
"node_path": getattr(self, "path", None),
"node_name": getattr(self, "name", None),
Expand All @@ -254,10 +250,9 @@ def node_info(self):
"node_status": str(self._event_status.get("node_status")),
"node_started_at": self._event_status.get("started_at"),
"node_finished_at": self._event_status.get("finished_at"),
"meta": meta_stringified,
"meta": getattr(self, "meta", {}),
}
node_info_msg = NodeInfo(**node_info)
return node_info_msg
return node_info

def update_event_status(self, **kwargs):
for k, v in kwargs.items():
Expand Down
39 changes: 21 additions & 18 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
from dbt.exceptions import DbtInternalError
from dbt.events.functions import fire_event
from dbt.events.types import TimingInfoCollected
from dbt.events.proto_types import RunResultMsg, TimingInfoMsg
from dbt.events.contextvars import get_node_info
from dbt.events.helpers import datetime_to_json_string
from dbt.logger import TimingProcessor
from dbt.utils import lowercase, cast_to_str, cast_to_int, cast_dict_to_dict_of_strings
from dbt.utils import lowercase, cast_to_str, cast_to_int
from dbt.dataclass_schema import dbtClassMixin, StrEnum

import agate
Expand Down Expand Up @@ -45,11 +45,13 @@ def begin(self):
def end(self):
self.completed_at = datetime.utcnow()

def to_msg(self):
timsg = TimingInfoMsg(
name=self.name, started_at=self.started_at, completed_at=self.completed_at
)
return timsg
def to_msg_dict(self):
msg_dict = {"name": self.name}
if self.started_at:
msg_dict["started_at"] = datetime_to_json_string(self.started_at)
if self.completed_at:
msg_dict["completed_at"] = datetime_to_json_string(self.completed_at)
return msg_dict


# This is a context manager
Expand All @@ -67,7 +69,7 @@ def __exit__(self, exc_type, exc_value, traceback):
with TimingProcessor(self.timing_info):
fire_event(
TimingInfoCollected(
timing_info=self.timing_info.to_msg(), node_info=get_node_info()
timing_info=self.timing_info.to_msg_dict(), node_info=get_node_info()
)
)

Expand Down Expand Up @@ -129,16 +131,17 @@ def __pre_deserialize__(cls, data):
data["failures"] = None
return data

def to_msg(self):
msg = RunResultMsg()
msg.status = str(self.status)
msg.message = cast_to_str(self.message)
msg.thread = self.thread_id
msg.execution_time = self.execution_time
msg.num_failures = cast_to_int(self.failures)
msg.timing_info = [ti.to_msg() for ti in self.timing]
msg.adapter_response = cast_dict_to_dict_of_strings(self.adapter_response)
return msg
def to_msg_dict(self):
msg_dict = {
"status": str(self.status),
"message": cast_to_str(self.message),
"thread": self.thread_id,
"execution_time": self.execution_time,
"num_failures": cast_to_int(self.failures),
"timing_info": [ti.to_msg_dict() for ti in self.timing],
"adapter_response": self.adapter_response,
}
return msg_dict


@dataclass
Expand Down
22 changes: 11 additions & 11 deletions core/dbt/events/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ The event module provides types that represent what is happening in dbt in `even
When events are processed via `fire_event`, nearly everything is logged. Whether or not the user has enabled the debug flag, all debug messages are still logged to the file. However, some events are particularly time consuming to construct because they return a huge amount of data. Today, the only messages in this category are cache events and are only logged if the `--log-cache-events` flag is on. This is important because these messages should not be created unless they are going to be logged, because they cause a noticable performance degredation. These events use a "fire_event_if" functions.

# Adding a New Event
* Add a new message in types.proto with an EventInfo field first
* run the protoc compiler to update proto_types.py: ```protoc --python_betterproto_out . types.proto```
* Add a wrapping class in core/dbt/event/types.py with a Level superclass and the superclass from proto_types.py, plus code and message methods
* Add a new message in types.proto, and a second message with the same name + "Msg". The "Msg" message should have two fields, an "info" field of EventInfo, and a "data" field referring to the message name without "Msg"
* run the protoc compiler to update types_pb2.py: make proto_types
* Add a wrapping class in core/dbt/event/types.py with a Level superclass plus code and message methods
* Add the class to tests/unit/test_events.py

Note that no attributes can exist in these event classes except for fields defined in the protobuf definitions, because the betterproto metaclass will throw an error. Betterproto provides a to_dict() method to convert the generated classes to a dictionary and from that to json. However some attributes will successfully convert to dictionaries but not to serialized protobufs, so we need to test both output formats.
We have switched from using betterproto to using google protobuf, because of a lack of support for Struct fields in betterproto.

The google protobuf interface is janky and very much non-Pythonic. The "generated" classes in types_pb2.py do not resemble regular Python classes. They do not have normal constructors; they can only be constructed empty. They can be "filled" by setting fields individually or using a json_format method like ParseDict. We have wrapped the logging events with a class (in types.py) which allows using a constructor -- keywords only, no positional parameters.

## Required for Every Event

Expand All @@ -24,8 +25,7 @@ Note that no attributes can exist in these event classes except for fields defin

Example
```
@dataclass
class PartialParsingDeletedExposure(DebugLevel, pt.PartialParsingDeletedExposure):
class PartialParsingDeletedExposure(DebugLevel):
def code(self):
return "I049"
Expand All @@ -50,8 +50,8 @@ logger = AdapterLogger("<database name>")

## Compiling types.proto

After adding a new message in types.proto:
```
cd core/dbt/events
protoc --python_betterproto_out . types.proto
```
After adding a new message in types.proto, execute Makefile target:

make proto_types in the repository root directory, or
`protoc -I=. --python_out=. types.proto`
in the core/dbt/events directory
Loading

0 comments on commit ae485f9

Please sign in to comment.