Skip to content

Transactional json logging #1806

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 10, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
@@ -2,7 +2,12 @@
from dbt.contracts.graph.unparsed import Time, FreshnessStatus
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.contracts.util import Writable, Replaceable
from dbt.logger import LogMessage
from dbt.logger import (
LogMessage,
TimingProcessor,
JsonOnly,
GLOBAL_LOGGER as logger,
)
from hologram.helpers import StrEnum
from hologram import JsonSchemaMixin

@@ -28,7 +33,7 @@ def end(self):


class collect_timing_info:
def __init__(self, name):
def __init__(self, name: str):
self.timing_info = TimingInfo(name=name)

def __enter__(self):
@@ -37,6 +42,8 @@ def __enter__(self):

def __exit__(self, exc_type, exc_value, traceback):
self.timing_info.end()
with JsonOnly(), TimingProcessor(self.timing_info):
logger.debug('finished collecting timing info')


@dataclass
25 changes: 14 additions & 11 deletions core/dbt/loader.py
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
import dbt.exceptions
import dbt.flags

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.logger import GLOBAL_LOGGER as logger, DbtProcessState
from dbt.node_types import NodeType
from dbt.clients.system import make_directory
from dbt.config import Project, RuntimeConfig
@@ -31,6 +31,7 @@


PARTIAL_PARSE_FILE_NAME = 'partial_parse.pickle'
PARSING_STATE = DbtProcessState('parsing')


_parser_types = [
@@ -284,19 +285,21 @@ def load_all(
root_config: RuntimeConfig,
internal_manifest: Optional[Manifest] = None
) -> Manifest:
projects = load_all_projects(root_config)
loader = cls(root_config, projects)
loader.load(internal_manifest=internal_manifest)
loader.write_parse_results()
manifest = loader.create_manifest()
_check_manifest(manifest, root_config)
return manifest
with PARSING_STATE:
projects = load_all_projects(root_config)
loader = cls(root_config, projects)
loader.load(internal_manifest=internal_manifest)
loader.write_parse_results()
manifest = loader.create_manifest()
_check_manifest(manifest, root_config)
return manifest

@classmethod
def load_internal(cls, root_config: RuntimeConfig) -> Manifest:
projects = load_internal_projects(root_config)
loader = cls(root_config, projects)
return loader.load_only_macros()
with PARSING_STATE:
projects = load_internal_projects(root_config)
loader = cls(root_config, projects)
return loader.load_only_macros()


def _check_resource_uniqueness(manifest):
119 changes: 118 additions & 1 deletion core/dbt/logger.py
Original file line number Diff line number Diff line change
@@ -93,8 +93,10 @@ def format_record(self, record, handler):
class JsonFormatter(LogMessageFormatter):
def __call__(self, record, handler):
"""Return the record converted to LogMessage's JSON form"""
# utils imports exceptions which imports logger...
import dbt.utils
log_message = super().__call__(record, handler)
return json.dumps(log_message.to_dict())
return json.dumps(log_message.to_dict(), cls=dbt.utils.JSONEncoder)


class FormatterMixin:
@@ -152,6 +154,17 @@ def reset(self):
self._text_format_string = self._default_format
self.format_text()

def should_handle(self, record):
    """Accept *record* unless it is below our level or flagged for the
    other output format (json_only vs. text_only in record.extra)."""
    if record.level < self.level:
        return False
    is_text_handler = self.formatter_class is logbook.StringFormatter
    if is_text_handler:
        # text handlers drop records that were marked JSON-only
        return not record.extra.get('json_only', False)
    # JSON handlers drop records that were marked text-only
    return not record.extra.get('text_only', False)


def _redirect_std_logging():
logbook.compat.redirect_logging()
@@ -186,6 +199,108 @@ def process(self, record):
record.level = self.target_level


class JsonOnly(logbook.Processor):
    """Processor that flags records so only JSON handlers emit them."""

    def process(self, record):
        record.extra.update(json_only=True)


class TextOnly(logbook.Processor):
    """Processor that flags records so only text handlers emit them."""

    def process(self, record):
        record.extra.update(text_only=True)


class TimingProcessor(logbook.Processor):
    """Attach serialized timing information to log records, when available."""

    def __init__(self, timing_info: Optional[JsonSchemaMixin] = None):
        super().__init__()
        self.timing_info = timing_info

    def process(self, record):
        if self.timing_info is None:
            return
        record.extra['timing_info'] = self.timing_info.to_dict()


class DbtProcessState(logbook.Processor):
    """Tag records with the current dbt process state (e.g. 'parsing').

    An existing state is preserved unless it is the default 'internal'.
    """

    def __init__(self, value: str):
        super().__init__()
        self.value = value

    def process(self, record):
        extra = record.extra
        if 'run_state' in extra and extra['run_state'] != 'internal':
            # a more specific state was already set; leave it alone
            return
        extra['run_state'] = self.value


class DbtModelState(logbook.Processor):
    """Merge a fixed state mapping into every record's extra dict."""

    def __init__(self, state: Dict[str, str]):
        super().__init__()
        self.state = state

    def process(self, record):
        for key, value in self.state.items():
            record.extra[key] = value


class DbtStatusMessage(logbook.Processor):
    """Mark records as status messages for downstream consumers."""

    def process(self, record):
        record.extra.update(is_status_message=True)


class UniqueID(logbook.Processor):
    """Stamp every record with the unique id of the node in scope."""

    def __init__(self, unique_id: str):
        super().__init__()
        self.unique_id = unique_id

    def process(self, record):
        record.extra.update(unique_id=self.unique_id)


class NodeCount(logbook.Processor):
    """Stamp every record with the total number of nodes being run."""

    def __init__(self, node_count: int):
        super().__init__()
        self.node_count = node_count

    def process(self, record):
        record.extra.update(node_count=self.node_count)


class NodeMetadata(logbook.Processor):
    """Copy identifying attributes of a node onto each log record."""

    # node attribute -> record.extra key
    _ATTR_KEYS = {
        'alias': 'node_alias',
        'schema': 'node_schema',
        'database': 'node_database',
        'name': 'node_name',
        'original_file_path': 'node_path',
        'resource_type': 'resource_type',
    }

    def __init__(self, node, node_index):
        self.node = node
        self.node_index = node_index
        super().__init__()

    def process(self, record):
        for attr, key in self._ATTR_KEYS.items():
            value = getattr(self.node, attr, None)
            if value is not None:
                record.extra[key] = value
        record.extra['node_index'] = self.node_index
        # nodes without a config (or without a materialization) are skipped
        config = getattr(self.node, 'config', None)
        materialized = getattr(config, 'materialized', None)
        if materialized is not None:
            record.extra['node_materialized'] = materialized


class TimestampNamed(JsonOnly):
    """JSON-only processor that adds a UTC ISO-8601 timestamp under *name*."""

    def __init__(self, name: str):
        super().__init__()
        self.name = name

    def process(self, record):
        # mark the record json_only first, then stamp it
        super().process(record)
        timestamp = datetime.utcnow().isoformat()
        record.extra[self.name] = timestamp


logger = logbook.Logger('dbt')
# provide this for the cache, disabled by default
CACHE_LOGGER = logbook.Logger('dbt.cache')
@@ -318,6 +433,7 @@ def __init__(self, stdout=colorama_stdout, stderr=sys.stderr):
self._output_handler = OutputHandler(self.stdout)
self._file_handler = DelayedFileHandler()
self._relevel_processor = Relevel(allowed=['dbt', 'werkzeug'])
self._state_processor = DbtProcessState('internal')
# keep track of whether we've already entered to decide if we should
# be actually pushing. This allows us to log in main() and also
# support entering dbt execution via handle_and_check.
@@ -327,6 +443,7 @@ def __init__(self, stdout=colorama_stdout, stderr=sys.stderr):
self._output_handler,
self._file_handler,
self._relevel_processor,
self._state_processor,
])

def push_application(self):
18 changes: 18 additions & 0 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
@@ -70,6 +70,18 @@ def __init__(self, config, adapter, node, node_index, num_nodes):
self.skip = False
self.skip_cause = None

def get_result_status(self, result) -> Dict[str, str]:
    """Summarize *result* as a status mapping for structured logging.

    Checks error, skip, fail, and warn flags in priority order; an error
    additionally records the stringified error message.
    """
    if result.error:
        return {'node_status': 'error', 'node_error': str(result.error)}
    if result.skip:
        return {'node_status': 'skipped'}
    if result.fail:
        return {'node_status': 'failed'}
    if result.warn:
        return {'node_status': 'warn'}
    return {'node_status': 'passed'}

def run_with_hooks(self, manifest):
if self.skip:
return self.on_skip()
@@ -436,6 +448,12 @@ def on_skip(self):
'Freshness: nodes cannot be skipped!'
)

def get_result_status(self, result) -> Dict[str, str]:
    """Summarize a freshness *result* as a status mapping for logging."""
    if result.error:
        status = {'node_status': 'error', 'node_error': str(result.error)}
    else:
        status = {'node_status': str(result.status)}
    return status

def before_execute(self):
description = 'freshness of {0.source_name}.{0.name}'.format(self.node)
dbt.ui.printer.print_start_line(description, self.node_index,
11 changes: 7 additions & 4 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import functools
import time

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.logger import GLOBAL_LOGGER as logger, TextOnly
from dbt.node_types import NodeType, RunHookType
from dbt.node_runners import ModelRunner

@@ -92,7 +92,8 @@ def run_hooks(self, adapter, hook_type, extra_context):
num_hooks = len(ordered_hooks)

plural = 'hook' if num_hooks == 1 else 'hooks'
print_timestamped_line("")
with TextOnly():
print_timestamped_line("")
print_timestamped_line(
'Running {} {} {}'.format(num_hooks, hook_type, plural)
)
@@ -115,7 +116,8 @@ def run_hooks(self, adapter, hook_type, extra_context):
print_hook_end_line(hook_text, status, idx, num_hooks,
timer.elapsed)

print_timestamped_line("")
with TextOnly():
print_timestamped_line("")

def safe_run_hooks(self, adapter, hook_type, extra_context):
try:
@@ -134,7 +136,8 @@ def print_results_line(self, results, execution_time):
execution = " in {execution_time:0.2f}s".format(
execution_time=execution_time)

print_timestamped_line("")
with TextOnly():
print_timestamped_line("")
print_timestamped_line(
"Finished running {stat_line}{execution}."
.format(stat_line=stat_line, execution=execution))
38 changes: 31 additions & 7 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,16 @@

from dbt.task.base import ConfiguredTask
from dbt.adapters.factory import get_adapter
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.logger import (
GLOBAL_LOGGER as logger,
DbtProcessState,
TextOnly,
UniqueID,
TimestampNamed,
DbtModelState,
NodeMetadata,
NodeCount,
)
from dbt.compilation import compile_manifest
from dbt.contracts.results import ExecutionResult
from dbt.loader import GraphLoader
@@ -19,6 +28,7 @@

RESULT_FILE_NAME = 'run_results.json'
MANIFEST_FILE_NAME = 'manifest.json'
RUNNING_STATE = DbtProcessState('running')


def write_manifest(manifest, config):
@@ -120,9 +130,20 @@ def get_runner(self, node):
return cls(self.config, adapter, node, run_count, num_nodes)

def call_runner(self, runner):
# TODO: create+enforce an actual contracts for what `result` is instead
# of the current free-for-all
result = runner.run_with_hooks(self.manifest)
uid_context = UniqueID(runner.node.unique_id)
with RUNNING_STATE, uid_context:
startctx = TimestampNamed('node_started_at')
extended_metadata = NodeMetadata(runner.node, runner.node_index)
with startctx, extended_metadata:
logger.info('Began running model')
status = 'error' # we must have an error if we don't see this
try:
result = runner.run_with_hooks(self.manifest)
status = runner.get_result_status(result)
finally:
finishctx = TimestampNamed('node_finished_at')
with finishctx, DbtModelState(status):
logger.info('Finished running model')
if result.error is not None and self.raise_on_first_error():
# if we raise inside a thread, it'll just get silently swallowed.
# stash the error message we want here, and it will check the
@@ -203,8 +224,10 @@ def execute_nodes(self):

text = "Concurrency: {} threads (target='{}')"
concurrency_line = text.format(num_threads, target_name)
dbt.ui.printer.print_timestamped_line(concurrency_line)
dbt.ui.printer.print_timestamped_line("")
with NodeCount(self.num_nodes):
dbt.ui.printer.print_timestamped_line(concurrency_line)
with TextOnly():
dbt.ui.printer.print_timestamped_line("")

pool = ThreadPool(num_threads)
try:
@@ -292,7 +315,8 @@ def run(self):
elapsed_time=0.0,
)
else:
logger.info("")
with TextOnly():
logger.info("")

selected_uids = frozenset(n.unique_id for n in self._flattened_nodes)
result = self.execute_with_hooks(selected_uids)
8 changes: 5 additions & 3 deletions core/dbt/task/seed.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import random

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.logger import GLOBAL_LOGGER as logger, TextOnly
from dbt.node_runners import SeedRunner
from dbt.node_types import NodeType
from dbt.task.run import RunTask
@@ -35,11 +35,13 @@ def show_table(self, result):
alias = result.node.alias

header = "Random sample of table: {}.{}".format(schema, alias)
logger.info("")
with TextOnly():
logger.info("")
logger.info(header)
logger.info("-" * len(header))
rand_table.print_table(max_rows=10, max_columns=None)
logger.info("")
with TextOnly():
logger.info("")

def show_tables(self, results):
for result in results:
12 changes: 12 additions & 0 deletions core/dbt/tracking.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
from snowplow_tracker import SelfDescribingJson
from datetime import datetime

import logbook
import pytz
import platform
import uuid
@@ -336,3 +337,14 @@ def initialize_tracking(cookie_dir):
logger.debug('Got an exception trying to initialize tracking',
exc_info=True)
active_user = User(None)


class InvocationProcessor(logbook.Processor):
    """Tag log records with the current invocation id and start time.

    Reads the module-level ``active_user``; assumes tracking has been
    initialized (``initialize_tracking``) before any record is processed.
    """

    # NOTE: the original defined an __init__ that only called
    # super().__init__(); it is redundant and has been removed.

    def process(self, record):
        record.extra.update({
            "run_started_at": active_user.run_started_at.isoformat(),
            "invocation_id": active_user.invocation_id,
        })
27 changes: 16 additions & 11 deletions core/dbt/ui/printer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Dict, Optional, Tuple

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import get_materialization
from dbt.logger import GLOBAL_LOGGER as logger, DbtStatusMessage, TextOnly
from dbt.node_types import NodeType
from dbt.tracking import InvocationProcessor
from dbt.utils import get_materialization
import dbt.ui.colors

import time
@@ -295,7 +296,8 @@ def print_run_result_error(
result, newline: bool = True, is_warning: bool = False
) -> None:
if newline:
logger.info("")
with TextOnly():
logger.info("")

if result.fail or (is_warning and result.warn):
if is_warning:
@@ -319,7 +321,8 @@ def print_run_result_error(
logger.info(" Got {}, expected 0.".format(status))

if result.node.build_path is not None:
logger.info("")
with TextOnly():
logger.info("")
logger.info(" compiled SQL at {}".format(
result.node.build_path))

@@ -357,19 +360,21 @@ def print_end_of_run_summary(
else:
message = green('Completed successfully')

logger.info('')
with TextOnly():
logger.info('')
logger.info('{}'.format(message))


def print_run_end_messages(results, early_exit: bool = False) -> None:
errors = [r for r in results if r.error is not None or r.fail]
warnings = [r for r in results if r.warn]
print_end_of_run_summary(len(errors), len(warnings), early_exit)
with DbtStatusMessage(), InvocationProcessor():
print_end_of_run_summary(len(errors), len(warnings), early_exit)

for error in errors:
print_run_result_error(error, is_warning=False)
for error in errors:
print_run_result_error(error, is_warning=False)

for warning in warnings:
print_run_result_error(warning, is_warning=True)
for warning in warnings:
print_run_result_error(warning, is_warning=True)

print_run_status_line(results)
print_run_status_line(results)