Fix run repeatability/caching issues (#1138, #1139, #1140) #1144

Merged
merged 7 commits on Nov 20, 2018

99 changes: 65 additions & 34 deletions dbt/adapters/cache.py
@@ -9,12 +9,26 @@
_ReferenceKey = namedtuple('_ReferenceKey', 'schema identifier')


def _lower(value):
"""Postgres schemas can be None so we can't just call lower()."""
if value is None:
return None
return value.lower()


def _make_key(schema, identifier):
"""Make _ReferenceKeys with lowercase values for the cache so we don't have
to keep track of quoting
"""
return _ReferenceKey(_lower(schema), _lower(identifier))


def dot_separated(key):
"""Return the key in dot-separated string form.

:param key _ReferenceKey: The key to stringify.
"""
return '.'.join(key)
return '.'.join(map(str, key))


class _CachedRelation(object):
@@ -37,11 +51,11 @@ def __str__(self):

@property
def schema(self):
return self.inner.schema
return _lower(self.inner.schema)

@property
def identifier(self):
return self.inner.identifier
return _lower(self.inner.identifier)

def __copy__(self):
new = self.__class__(self.inner)
@@ -61,7 +75,7 @@ def key(self):

:return _ReferenceKey: A key for this relation.
"""
return _ReferenceKey(self.schema, self.identifier)
return _make_key(self.schema, self.identifier)

def add_reference(self, referrer):
"""Add a reference from referrer to self, indicating that if this node
@@ -161,22 +175,22 @@ def add_schema(self, schema):

:param str schema: The schema name to add.
"""
self.schemas.add(schema.lower())
self.schemas.add(_lower(schema))

def update_schemas(self, schemas):
"""Add multiple schemas to the set of known schemas (case-insensitive)

:param Iterable[str] schemas: An iterable of the schema names to add.
"""
self.schemas.update(s.lower() for s in schemas)
self.schemas.update(_lower(s) for s in schemas)

def __contains__(self, schema):
"""A schema is 'in' the relations cache if it is in the set of cached
schemas.

:param str schema: The schema name to look up.
"""
return schema in self.schemas
return _lower(schema) in self.schemas

def dump_graph(self):
"""Dump a key-only representation of the schema to a dictionary. Every
@@ -199,7 +213,7 @@ def _setdefault(self, relation):
:return _CachedRelation: The relation stored under the given relation's
key
"""
self.schemas.add(relation.schema)
self.add_schema(relation.schema)
key = relation.key()
return self.relations.setdefault(key, relation)

@@ -243,7 +257,7 @@ def add_link(self, referenced, dependent):
:param DefaultRelation dependent: The dependent model.
:raises InternalError: If either entry does not exist.
"""
referenced = _ReferenceKey(
referenced = _make_key(
schema=referenced.schema,
identifier=referenced.name
)
@@ -257,7 +271,7 @@ def add_link(self, referenced, dependent):
.format(dep=dependent, ref=referenced)
)
return
dependent = _ReferenceKey(
dependent = _make_key(
schema=dependent.schema,
identifier=dependent.name
)
@@ -324,35 +338,19 @@ def drop(self, relation):
:param str schema: The schema of the relation to drop.
:param str identifier: The identifier of the relation to drop.
"""
dropped = _ReferenceKey(schema=relation.schema,
identifier=relation.identifier)
dropped = _make_key(schema=relation.schema,
identifier=relation.identifier)
logger.debug('Dropping relation: {!s}'.format(dropped))
with self.lock:
self._drop_cascade_relation(dropped)

def _rename_relation(self, old_key, new_key):
"""Rename a relation named old_key to new_key, updating references.
If the new key is already present, that is an error.
If the old key is absent, we only debug log and return, assuming it's a
temp table being renamed.
Return whether or not there was a key to rename.

:param _ReferenceKey old_key: The existing key, to rename from.
:param _ReferenceKey new_key: The new key, to rename to.
:raises InternalError: If the new key is already present.
"""
if old_key not in self.relations:
logger.debug(
'old key {} not found in self.relations, assuming temporary'
.format(old_key)
)
return

if new_key in self.relations:
dbt.exceptions.raise_cache_inconsistent(
'in rename, new key {} already in cache: {}'
.format(new_key, list(self.relations.keys()))
)

# On the database level, a rename updates all values that were
# previously referenced by old_name to be referenced by new_name.
# basically, the name changes but some underlying ID moves. Kind of
@@ -370,6 +368,34 @@ def _rename_relation(self, old_key, new_key):
cached.rename_key(old_key, new_key)

self.relations[new_key] = relation
return True

def _check_rename_constraints(self, old_key, new_key):
"""Check the rename constraints, and return whether or not the rename
can proceed.

If the new key is already present, that is an error.
If the old key is absent, we debug log and return False, assuming it's
a temp table being renamed.

:param _ReferenceKey old_key: The existing key, to rename from.
:param _ReferenceKey new_key: The new key, to rename to.
:return bool: If the old relation exists for renaming.
:raises InternalError: If the new key is already present.
"""
if new_key in self.relations:
dbt.exceptions.raise_cache_inconsistent(
'in rename, new key {} already in cache: {}'
.format(new_key, list(self.relations.keys()))
)

if old_key not in self.relations:
logger.debug(
'old key {} not found in self.relations, assuming temporary'
.format(old_key)
)
return False
return True

def rename(self, old, new):
"""Rename the old schema/identifier to the new schema/identifier and
@@ -383,11 +409,11 @@ def rename(self, old, new):
:param DefaultRelation new: The new relation name information.
:raises InternalError: If the new key is already present.
"""
old_key = _ReferenceKey(
old_key = _make_key(
schema=old.schema,
identifier=old.identifier
)
new_key = _ReferenceKey(
new_key = _make_key(
schema=new.schema,
identifier=new.identifier
)
@@ -397,8 +423,13 @@
logger.debug('before rename: {}'.format(
pprint.pformat(self.dump_graph()))
)

with self.lock:
self._rename_relation(old_key, new_key)
if self._check_rename_constraints(old_key, new_key):
self._rename_relation(old_key, new_key)
else:
self._setdefault(_CachedRelation(new))

logger.debug('after rename: {}'.format(
pprint.pformat(self.dump_graph()))
)
@@ -410,11 +441,11 @@ def get_relations(self, schema):
:return List[DefaultRelation]: The list of relations with the given
schema
"""
schema = schema.lower()
schema = _lower(schema)
with self.lock:
results = [
r.inner for r in self.relations.values()
if r.schema.lower() == schema
if _lower(r.schema) == schema
]

if None in results:
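
For readers skimming the diff, the case normalization that most of these cache.py changes lean on can be sketched in isolation. The definitions below mirror `_ReferenceKey`, `_lower`, and `_make_key` from the hunks above; the example schema and identifier values are purely illustrative.

```python
# Minimal, self-contained sketch of the cache key normalization above.
from collections import namedtuple

_ReferenceKey = namedtuple('_ReferenceKey', 'schema identifier')


def _lower(value):
    # Postgres schemas can be None, so guard before calling lower().
    if value is None:
        return None
    return value.lower()


def _make_key(schema, identifier):
    # Lowercase both parts so the cache never has to track quoting or case.
    return _ReferenceKey(_lower(schema), _lower(identifier))


# A Snowflake-style uppercase relation and a Postgres-style lowercase one
# now resolve to the same cache entry:
assert _make_key('FOO', 'BAR') == _make_key('foo', 'bar')

# A None schema passes through instead of raising AttributeError:
assert _make_key(None, 'tmp_table') == _ReferenceKey(None, 'tmp_table')
```

This is also why `dot_separated` now maps `str` over the key: with `None` schemas allowed, a plain `'.'.join(key)` would raise a TypeError.
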
4 changes: 1 addition & 3 deletions dbt/flags.py
@@ -1,15 +1,13 @@
STRICT_MODE = False
NON_DESTRUCTIVE = False
FULL_REFRESH = False
LOG_CACHE_EVENTS = False
USE_CACHE = True


def reset():
global STRICT_MODE, NON_DESTRUCTIVE, FULL_REFRESH, LOG_CACHE_EVENTS
global STRICT_MODE, NON_DESTRUCTIVE, FULL_REFRESH

STRICT_MODE = False
NON_DESTRUCTIVE = False
FULL_REFRESH = False
LOG_CACHE_EVENTS = False
USE_CACHE = True
9 changes: 7 additions & 2 deletions dbt/logger.py
@@ -107,12 +107,17 @@ def initialize_logger(debug_mode=False, path=None):
warning_logger.addHandler(logdir_handler)
warning_logger.setLevel(logging.DEBUG)

CACHE_LOGGER.propagate = dbt.flags.LOG_CACHE_EVENTS

initialized = True


def logger_initialized():
return initialized


def log_cache_events(flag):
"""Set the cache logger to propagate its messages based on the given flag.
"""
CACHE_LOGGER.propagate = flag


GLOBAL_LOGGER = logger
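
The practical effect of this hunk is that cache logging is toggled by an explicit call rather than read from `dbt.flags` at import time. A small usage sketch, assuming only the `log_cache_events` function shown above; the surrounding calls are illustrative:

```python
# Sketch: toggling cache event logging at runtime.
from dbt.logger import log_cache_events

# Let cache debug records propagate to the parent logger's handlers
# (main.py does this based on the parsed CLI arguments, as shown below).
log_cache_events(True)

# ... run dbt tasks; cache add/rename/drop events are now logged ...

# Silence cache events again without touching any module-level flags.
log_cache_events(False)
```
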
8 changes: 6 additions & 2 deletions dbt/main.py
@@ -1,5 +1,5 @@
from dbt.logger import initialize_logger, GLOBAL_LOGGER as logger, \
logger_initialized
logger_initialized, log_cache_events

import argparse
import os.path
@@ -19,6 +19,7 @@
import dbt.task.archive as archive_task
import dbt.task.generate as generate_task
import dbt.task.serve as serve_task
from dbt.adapters.factory import reset_adapters

import dbt.tracking
import dbt.ui.printer
@@ -122,6 +123,8 @@ def handle_and_check(args):
if colorize_output(profile_config):
dbt.ui.printer.use_colors()

reset_adapters()

try:
task, res = run_from_args(parsed)
finally:
@@ -209,6 +212,8 @@ def invoke_dbt(parsed):
task = None
cfg = None

log_cache_events(getattr(parsed, 'log_cache_events', False))

try:
if parsed.which in {'deps', 'clean'}:
# deps doesn't need a profile, so don't require one.
@@ -251,7 +256,6 @@ def invoke_dbt(parsed):
return None

flags.NON_DESTRUCTIVE = getattr(parsed, 'non_destructive', False)
flags.LOG_CACHE_EVENTS = getattr(parsed, 'log_cache_events', False)
flags.USE_CACHE = getattr(parsed, 'use_cache', True)

arg_drop_existing = getattr(parsed, 'drop_existing', False)
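
Calling `reset_adapters()` at the top of `handle_and_check` is what lets repeated in-process invocations start from a clean adapter (and therefore cache) state. A hedged sketch of what that enables, assuming `handle_and_check` accepts an argv-style list as the hunk above suggests:

```python
# Sketch: two back-to-back in-process runs, each starting with fresh adapters.
import dbt.main

# The first run registers adapters and populates their relation caches.
dbt.main.handle_and_check(['run'])

# The second run no longer inherits stale adapter/cache state from the first,
# because reset_adapters() is now called before the task is dispatched.
dbt.main.handle_and_check(['run'])
```

The new integration test, by contrast, passes `clear_adapters=False` to the test harness so it can inspect the adapter dbt actually used for the run (see test_caching.py below).
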
6 changes: 6 additions & 0 deletions test/integration/038_caching_test/models/model.sql
@@ -0,0 +1,6 @@
{{
config(
materialized='table'
)
}}
select 1 as id
6 changes: 6 additions & 0 deletions test/integration/038_caching_test/shouting_models/MODEL.sql
@@ -0,0 +1,6 @@
{{
config(
materialized='table'
)
}}
select 1 as id
60 changes: 60 additions & 0 deletions test/integration/038_caching_test/test_caching.py
@@ -0,0 +1,60 @@
from test.integration.base import DBTIntegrationTest, use_profile
from dbt.adapters import factory

class TestBaseCaching(DBTIntegrationTest):
@property
def schema(self):
return "caching_038"

@property
def project_config(self):
return {
'quoting': {
'identifier': False,
'schema': False,
}
}

def run_and_get_adapter(self):
# we want to inspect the adapter that dbt used for the run, which is
# not self.adapter. You can't do this until after you've run dbt once.
self.run_dbt(['run'], clear_adapters=False)
return factory._ADAPTERS[self.adapter_type]

def cache_run(self):
adapter = self.run_and_get_adapter()
self.assertEqual(len(adapter.cache.relations), 1)
relation = next(iter(adapter.cache.relations.values()))
self.assertEqual(relation.inner.schema, self.unique_schema())
self.assertEqual(relation.schema, self.unique_schema().lower())

self.run_dbt(['run'], clear_adapters=False)
self.assertEqual(len(adapter.cache.relations), 1)
second_relation = next(iter(adapter.cache.relations.values()))
self.assertEqual(relation, second_relation)

class TestCachingLowercaseModel(TestBaseCaching):
@property
def models(self):
return "test/integration/038_caching_test/models"

@use_profile('snowflake')
def test_snowflake_cache(self):
self.cache_run()

@use_profile('postgres')
def test_postgres_cache(self):
self.cache_run()

class TestCachingUppercaseModel(TestBaseCaching):
@property
def models(self):
return "test/integration/038_caching_test/shouting_models"

@use_profile('snowflake')
def test_snowflake_cache(self):
self.cache_run()

@use_profile('postgres')
def test_postgres_cache(self):
self.cache_run()
5 changes: 3 additions & 2 deletions test/integration/base.py
@@ -344,9 +344,10 @@ def project_config(self):
def profile_config(self):
return {}

def run_dbt(self, args=None, expect_pass=True, strict=True):
def run_dbt(self, args=None, expect_pass=True, strict=True, clear_adapters=True):
# clear the adapter cache
reset_adapters()
if clear_adapters:
reset_adapters()
if args is None:
args = ["run"]

4 changes: 2 additions & 2 deletions test/unit/test_cache.py
@@ -90,9 +90,9 @@ def test_add(self):
relations = self.cache.get_relations('foo')
self.assertEqual(len(relations), 2)

self.assertEqual(self.cache.schemas, {'foo', 'FOO'})
self.assertEqual(self.cache.schemas, {'foo'})
self.assertIsNot(self.cache.relations[('foo', 'bar')].inner, None)
self.assertIsNot(self.cache.relations[('FOO', 'baz')].inner, None)
self.assertIsNot(self.cache.relations[('foo', 'baz')].inner, None)

def test_rename(self):
self.cache.add(make_relation('foo', 'bar'))