From 74fbaa18cd7522161b8e0db97393c73d869a0498 Mon Sep 17 00:00:00 2001 From: Nathaniel May Date: Thu, 2 Dec 2021 15:04:52 -0500 Subject: [PATCH] change json override strategy (#4396) --- core/dbt/adapters/base/impl.py | 2 +- core/dbt/events/base_types.py | 36 +++----- core/dbt/events/functions.py | 31 ++++--- core/dbt/events/types.py | 163 ++++++++------------------------- test/unit/test_events.py | 14 +-- 5 files changed, 80 insertions(+), 166 deletions(-) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index f8ac6d5d4fa..deeb06d84dd 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -291,7 +291,7 @@ def _schema_is_cached(self, database: Optional[str], schema: str) -> bool: if (database, schema) not in self.cache: fire_event( CacheMiss( - conn_name=self.nice_connection_name, + conn_name=self.nice_connection_name(), database=database, schema=schema ) diff --git a/core/dbt/events/base_types.py b/core/dbt/events/base_types.py index cdf80a36628..c4c8bf3f591 100644 --- a/core/dbt/events/base_types.py +++ b/core/dbt/events/base_types.py @@ -1,7 +1,6 @@ from abc import ABCMeta, abstractmethod, abstractproperty from dataclasses import dataclass from datetime import datetime -import json import os import threading from typing import Any, Optional @@ -97,26 +96,6 @@ def level_tag(self) -> str: def message(self) -> str: raise Exception("msg not implemented for Event") - # override this method to convert non-json serializable fields to json. - # for override examples, see existing concrete types. - # - # there is no type-level mechanism to have mypy enforce json serializability, so we just try - # to serialize and raise an exception at runtime when that fails. This safety mechanism - # only works if we have attempted to serialize every concrete event type in our tests. - def fields_to_json(self, field_value: Any) -> Any: - try: - json.dumps(field_value, sort_keys=True) - return field_value - except TypeError: - val_type = type(field_value).__name__ - event_type = type(self).__name__ - return Exception( - f"type {val_type} is not serializable to json." - f" First make sure that the call sites for {event_type} match the type hints" - f" and if they do, you can override Event::fields_to_json in {event_type} in" - " types.py to define your own serialization function to any valid json type" - ) - # exactly one time stamp per concrete event def get_ts(self) -> datetime: if not self.ts: @@ -146,6 +125,21 @@ def get_invocation_id(cls) -> str: from dbt.events.functions import get_invocation_id return get_invocation_id() + # default dict factory for all events. can override on concrete classes. 
+ @classmethod + def asdict(cls, data: list) -> dict: + d = dict() + for k, v in data: + # stringify all exceptions + if isinstance(v, Exception) or isinstance(v, BaseException): + d[k] = str(v) + # skip all binary data + elif isinstance(v, bytes): + continue + else: + d[k] = v + return d + @dataclass # type: ignore class NodeInfo(Event, metaclass=ABCMeta): diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index d5cb62d5bc5..e749ae6b4a2 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -132,18 +132,25 @@ def event_to_serializable_dict( ) -> Dict[str, Any]: data = dict() node_info = dict() - if hasattr(e, '__dataclass_fields__'): - if isinstance(e, NodeInfo): - node_info = dataclasses.asdict(e.get_node_info()) - - for field, value in dataclasses.asdict(e).items(): # type: ignore[attr-defined] - if field not in ["code", "report_node_data"]: - _json_value = e.fields_to_json(value) - - if not isinstance(_json_value, Exception): - data[field] = _json_value - else: - data[field] = f"JSON_SERIALIZE_FAILED: {type(value).__name__, 'NA'}" + log_line = dict() + try: + log_line = dataclasses.asdict(e, dict_factory=type(e).asdict) + except AttributeError: + event_type = type(e).__name__ + raise Exception( # TODO this may hang async threads + f"type {event_type} is not serializable to json." + f" First make sure that the call sites for {event_type} match the type hints" + f" and if they do, you can override the dataclass method `asdict` in {event_type} in" + " types.py to define your own serialization function to a dictionary of valid json" + " types" + ) + + if isinstance(e, NodeInfo): + node_info = dataclasses.asdict(e.get_node_info()) + + for field, value in log_line.items(): # type: ignore[attr-defined] + if field not in ["code", "report_node_data"]: + data[field] = value event_dict = { 'type': 'log_line', diff --git a/core/dbt/events/types.py b/core/dbt/events/types.py index 87e85e04bf5..8df29c8bd2e 100644 --- a/core/dbt/events/types.py +++ b/core/dbt/events/types.py @@ -115,14 +115,6 @@ class MainEncounteredError(ErrorLevel, Cli): def message(self) -> str: return f"Encountered an error:\n{str(self.e)}" - # overriding default json serialization for this event - def fields_to_json(self, val: Any) -> Any: - # equality on BaseException is not good enough of a comparison here - if isinstance(val, BaseException): - return str(val) - - return val - @dataclass class MainStackTrace(DebugLevel, Cli): @@ -150,12 +142,9 @@ class MainReportArgs(DebugLevel, Cli, File): def message(self): return f"running dbt with arguments {str(self.args)}" - # overriding default json serialization for this event - def fields_to_json(self, val: Any) -> Any: - if isinstance(val, argparse.Namespace): - return str(val) - - return val + @classmethod + def asdict(cls, data: list) -> dict: + return dict((k, str(v)) for k, v in data) @dataclass @@ -354,13 +343,6 @@ def message(self) -> str: f"{self.reason}\nexception: {self.exc}" ) - # overriding default json serialization for this event - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class SystemExecutingCmd(DebugLevel, Cli, File): @@ -542,7 +524,7 @@ def message(self) -> str: @dataclass class CacheMiss(DebugLevel, Cli, File): - conn_name: Any # TODO mypy says this is `Callable[[], str]`?? 
¯\_(ツ)_/¯ + conn_name: str database: Optional[str] schema: str code: str = "E013" @@ -564,6 +546,14 @@ class ListRelations(DebugLevel, Cli, File): def message(self) -> str: return f"with database={self.database}, schema={self.schema}, relations={self.relations}" + @classmethod + def asdict(cls, data: list) -> dict: + d = dict() + for k, v in data: + if type(v) == list: + d[k] = [str(x) for x in v] + return d + @dataclass class ConnectionUsed(DebugLevel, Cli, File): @@ -623,6 +613,10 @@ class SchemaCreation(DebugLevel, Cli, File): def message(self) -> str: return f'Creating schema "{self.relation}"' + @classmethod + def asdict(cls, data: list) -> dict: + return dict((k, str(v)) for k, v in data) + @dataclass class SchemaDrop(DebugLevel, Cli, File): @@ -632,6 +626,10 @@ class SchemaDrop(DebugLevel, Cli, File): def message(self) -> str: return f'Dropping schema "{self.relation}".' + @classmethod + def asdict(cls, data: list) -> dict: + return dict((k, str(v)) for k, v in data) + # TODO pretty sure this is only ever called in dead code # see: core/dbt/adapters/cache.py _add_link vs add_link @@ -667,12 +665,9 @@ class AddRelation(DebugLevel, Cli, File, Cache): def message(self) -> str: return f"Adding relation: {str(self.relation)}" - # overriding default json serialization for this event - def fields_to_json(self, val: Any) -> Any: - if isinstance(val, _CachedRelation): - return str(val) - - return val + @classmethod + def asdict(cls, data: list) -> dict: + return dict((k, str(v)) for k, v in data) @dataclass @@ -693,6 +688,16 @@ class DropCascade(DebugLevel, Cli, File, Cache): def message(self) -> str: return f"drop {self.dropped} is cascading to {self.consequences}" + @classmethod + def asdict(cls, data: list) -> dict: + d = dict() + for k, v in data: + if isinstance(v, list): + d[k] = [str(x) for x in v] + else: + d[k] = str(v) # type: ignore + return d + @dataclass class DropRelation(DebugLevel, Cli, File, Cache): @@ -782,11 +787,9 @@ class AdapterImportError(InfoLevel, Cli, File): def message(self) -> str: return f"Error importing adapter: {self.exc}" - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val()) - - return val + @classmethod + def asdict(cls, data: list) -> dict: + return dict((k, str(v)) for k, v in data) @dataclass @@ -842,12 +845,6 @@ class ProfileLoadError(ShowException, DebugLevel, Cli, File): def message(self) -> str: return f"Profile not loaded due to error: {self.exc}" - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class ProfileNotFound(InfoLevel, Cli, File): @@ -885,12 +882,6 @@ def message(self) -> str: ) return error - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - # TODO: Remove? (appears to be uncalled) @dataclass @@ -901,12 +892,6 @@ class HandleInternalException(ShowException, DebugLevel, Cli, File): def message(self) -> str: return str(self.exc) - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - # TODO: Remove? (appears to be uncalled) @@ -927,12 +912,6 @@ def message(self) -> str: error=str(self.exc).strip() ) - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - # TODO: Remove? 
(appears to be uncalled) @@ -1110,12 +1089,6 @@ class ParsedFileLoadFailed(ShowException, DebugLevel, Cli, File): def message(self) -> str: return f"Failed to load parsed file from disk at {self.path}: {self.exc}" - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class PartialParseSaveFileNotFound(InfoLevel, Cli, File): @@ -1329,12 +1302,6 @@ class RunningOperationCaughtError(ErrorLevel, Cli, File): def message(self) -> str: return f'Encountered an error while running operation: {self.exc}' - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class RunningOperationUncaughtError(ErrorLevel, Cli, File): @@ -1344,12 +1311,6 @@ class RunningOperationUncaughtError(ErrorLevel, Cli, File): def message(self) -> str: return f'Encountered an error while running operation: {self.exc}' - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class DbtProjectError(ErrorLevel, Cli, File): @@ -1367,12 +1328,6 @@ class DbtProjectErrorException(ErrorLevel, Cli, File): def message(self) -> str: return f" ERROR: {str(self.exc)}" - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class DbtProfileError(ErrorLevel, Cli, File): @@ -1390,12 +1345,6 @@ class DbtProfileErrorException(ErrorLevel, Cli, File): def message(self) -> str: return f" ERROR: {str(self.exc)}" - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class ProfileListTitle(InfoLevel, Cli, File): @@ -1443,12 +1392,6 @@ class CatchableExceptionOnRun(ShowException, DebugLevel, Cli, File): def message(self) -> str: return str(self.exc) - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class InternalExceptionOnRun(DebugLevel, Cli, File): @@ -1469,12 +1412,6 @@ def message(self) -> str: note=INTERNAL_ERROR_STRING ) - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - # This prints the stack trace at the debug level while allowing just the nice exception message # at the error level - or whatever other level chosen. Used in multiple places. 
@@ -1503,12 +1440,6 @@ def message(self) -> str: error=str(self.exc).strip() ) - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class NodeConnectionReleaseError(ShowException, DebugLevel, Cli, File): @@ -1520,12 +1451,6 @@ def message(self) -> str: return ('Error releasing connection for node {}: {!s}' .format(self.node_name, self.exc)) - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class CheckCleanPath(InfoLevel, Cli): @@ -1846,12 +1771,6 @@ class SQlRunnerException(ShowException, DebugLevel, Cli, File): def message(self) -> str: return f"Got an exception: {self.exc}" - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class CheckNodeTestFailure(InfoLevel, Cli, File): @@ -2322,6 +2241,10 @@ class NodeFinished(DebugLevel, Cli, File, NodeInfo): def message(self) -> str: return f"Finished running node {self.unique_id}" + @classmethod + def asdict(cls, data: list) -> dict: + return dict((k, str(v)) for k, v in data) + @dataclass class QueryCancelationUnsupported(InfoLevel, Cli, File): @@ -2606,12 +2529,6 @@ def message(self) -> str: return self.log_fmt.format(str(self.exc)) return str(self.exc) - def fields_to_json(self, val: Any) -> Any: - if val == self.exc: - return str(val) - - return val - @dataclass class EventBufferFull(WarnLevel, Cli, File): diff --git a/test/unit/test_events.py b/test/unit/test_events.py index 34dcff3baf1..a31ac4ae1a4 100644 --- a/test/unit/test_events.py +++ b/test/unit/test_events.py @@ -108,10 +108,6 @@ def test_buffer_populates(self): # EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0 # ) -def dump_callable(): - return dict() - - def MockNode(): return ParsedModelNode( alias='model_one', @@ -221,10 +217,10 @@ def MockNode(): old_key=_ReferenceKey(database="", schema="", identifier=""), new_key=_ReferenceKey(database="", schema="", identifier="") ), - DumpBeforeAddGraph(dump_callable), - DumpAfterAddGraph(dump_callable), - DumpBeforeRenameSchema(dump_callable), - DumpAfterRenameSchema(dump_callable), + DumpBeforeAddGraph(dict()), + DumpAfterAddGraph(dict()), + DumpBeforeRenameSchema(dict()), + DumpAfterRenameSchema(dict()), AdapterImportError(ModuleNotFoundError()), PluginLoadError(), SystemReportReturnCode(returncode=0), @@ -429,7 +425,7 @@ def test_all_serializable(self): # if we have everything we need to test, try to serialize everything for event in sample_values: - d = event_to_serializable_dict(event, lambda dt: dt.isoformat(), lambda x: x.message()) + d = event_to_serializable_dict(event, lambda _: event.get_ts_rfc3339(), lambda x: x.message()) try: json.dumps(d) except TypeError as e:
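
The patch replaces the per-field fields_to_json hook with a per-class dict_factory: event_to_serializable_dict now calls dataclasses.asdict(e, dict_factory=type(e).asdict), which hands the event's own asdict classmethod the list of (field, value) pairs and lets it decide how to make them JSON-safe. Below is a minimal, self-contained sketch of that mechanism; DemoEvent and its fields are hypothetical stand-ins for illustration, not types from the diff.

import dataclasses
import json
from dataclasses import dataclass


@dataclass
class DemoEvent:
    msg: str
    exc: Exception
    raw: bytes

    # Default factory, as on the Event base class in the diff:
    # dataclasses.asdict() calls this with a list of (field, value) pairs
    # instead of building the dict itself.
    @classmethod
    def asdict(cls, data: list) -> dict:
        d = dict()
        for k, v in data:
            if isinstance(v, BaseException):
                d[k] = str(v)   # stringify exceptions so json.dumps() succeeds
            elif isinstance(v, bytes):
                continue        # skip binary data entirely
            else:
                d[k] = v
        return d


event = DemoEvent(msg="cache miss", exc=ValueError("boom"), raw=b"\x00")

# Mirrors event_to_serializable_dict(): the event's own classmethod is the factory.
log_line = dataclasses.asdict(event, dict_factory=type(event).asdict)
print(json.dumps(log_line, sort_keys=True))  # {"exc": "boom", "msg": "cache miss"}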
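
Concrete events whose fields are not JSON-native override the factory instead. Several events in the diff (MainReportArgs, SchemaCreation, AddRelation, NodeFinished) coerce every value with str(), while DropCascade additionally stringifies list elements one by one. Here is a sketch of that second pattern, with a hypothetical FakeRelation standing in for dbt's relation objects.

import dataclasses
import json
from dataclasses import dataclass, field
from typing import List


class FakeRelation:
    # Hypothetical stand-in for a relation object that json.dumps cannot handle.
    def __init__(self, name: str):
        self.name = name

    def __str__(self) -> str:
        return f'"analytics"."{self.name}"'


@dataclass
class DemoListEvent:
    schema: str
    relations: List[FakeRelation] = field(default_factory=list)

    # Same shape as the DropCascade override in the diff: lists are stringified
    # element by element, everything else is stringified wholesale.
    @classmethod
    def asdict(cls, data: list) -> dict:
        d = dict()
        for k, v in data:
            if isinstance(v, list):
                d[k] = [str(x) for x in v]
            else:
                d[k] = str(v)
        return d


e = DemoListEvent(schema="analytics", relations=[FakeRelation("orders")])
print(json.dumps(dataclasses.asdict(e, dict_factory=type(e).asdict)))
# {"schema": "analytics", "relations": ["\"analytics\".\"orders\""]}

Compared with the old fields_to_json hook, this keeps the serialization decision on the event class itself rather than re-checking each value at dump time, and event_to_serializable_dict raises a descriptive exception when a type cannot be converted.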