diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py
index f0722e40d7b..3f59636bb65 100644
--- a/dbt/adapters/bigquery/impl.py
+++ b/dbt/adapters/bigquery/impl.py
@@ -41,12 +41,12 @@ class BigQueryAdapter(PostgresAdapter):
         "expand_target_column_types",
         "load_dataframe",
         "get_missing_columns",
+        "cache_new_relation",
         "create_schema",
         "alter_table_add_columns",
 
         # versions of adapter functions that take / return Relations
-        "list_relations",
         "get_relation",
         "drop_relation",
         "rename_relation",
@@ -180,7 +180,10 @@ def close(cls, connection):
 
         return connection
 
-    def list_relations(self, schema, model_name=None):
+    def _link_cached_relations(self, manifest, schemas):
+        pass
+
+    def _list_relations(self, schema, model_name=None):
         connection = self.get_connection(model_name)
         client = connection.handle
 
@@ -201,27 +204,27 @@ def list_relations(self, schema, model_name=None):
         # This will 404 if the dataset does not exist. This behavior mirrors
         # the implementation of list_relations for other adapters
         try:
-            return [self.bq_table_to_relation(table) for table in all_tables]
+            return [self._bq_table_to_relation(table) for table in all_tables]
         except google.api_core.exceptions.NotFound as e:
             return []
 
-    def get_relation(self, schema=None, identifier=None,
-                     relations_list=None, model_name=None):
-        if schema is None and relations_list is None:
-            raise dbt.exceptions.RuntimeException(
-                'get_relation needs either a schema to query, or a list '
-                'of relations to use')
-
-        if relations_list is None and identifier is not None:
-            table = self.get_bq_table(schema, identifier)
+    def get_relation(self, schema, identifier, model_name=None):
+        if self._schema_is_cached(schema, model_name):
+            # if it's in the cache, use the parent's model of going through
+            # the relations cache and picking out the relation
+            return super(BigQueryAdapter, self).get_relation(
+                schema=schema,
+                identifier=identifier,
+                model_name=model_name
+            )
 
-            return self.bq_table_to_relation(table)
-
-        return super(BigQueryAdapter, self).get_relation(
-            schema, identifier, relations_list,
-            model_name)
+        table = self._get_bq_table(schema, identifier)
+        return self._bq_table_to_relation(table)
 
     def drop_relation(self, relation, model_name=None):
+        if self._schema_is_cached(relation.schema, model_name):
+            self.cache.drop(relation)
+
         conn = self.get_connection(model_name)
         client = conn.handle
 
@@ -515,7 +518,7 @@ def get_dataset(self, dataset_name, model_name=None):
         dataset_ref = conn.handle.dataset(dataset_name)
         return google.cloud.bigquery.Dataset(dataset_ref)
 
-    def bq_table_to_relation(self, bq_table):
+    def _bq_table_to_relation(self, bq_table):
         if bq_table is None:
             return None
 
@@ -529,7 +532,7 @@ def bq_table_to_relation(self, bq_table):
             },
             type=self.RELATION_TYPES.get(bq_table.table_type))
 
-    def get_bq_table(self, dataset_name, identifier, model_name=None):
+    def _get_bq_table(self, dataset_name, identifier, model_name=None):
         conn = self.get_connection(model_name)
         dataset = self.get_dataset(dataset_name, model_name)
 
diff --git a/dbt/adapters/cache.py b/dbt/adapters/cache.py
new file mode 100644
index 00000000000..4c562180ab7
--- /dev/null
+++ b/dbt/adapters/cache.py
@@ -0,0 +1,430 @@
+from collections import namedtuple
+import threading
+from copy import deepcopy
+import pprint
+from dbt.logger import CACHE_LOGGER as logger
+import dbt.exceptions
+
+
+_ReferenceKey = namedtuple('_ReferenceKey', 'schema identifier')
+
+
+def dot_separated(key):
+    """Return the key in dot-separated string form.
+
+    :param key _ReferenceKey: The key to stringify.
+    """
+    return '.'.join(key)
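
Since `_ReferenceKey` is a plain two-field namedtuple, `dot_separated` can join it
directly; a quick standalone illustration (no dbt imports required):

    from collections import namedtuple

    _ReferenceKey = namedtuple('_ReferenceKey', 'schema identifier')

    key = _ReferenceKey(schema='analytics', identifier='users')
    print('.'.join(key))  # -> analytics.users
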
+
+
+class _CachedRelation(object):
+    """Nothing about _CachedRelation is guaranteed to be thread-safe!
+
+    :attr str schema: The schema of this relation.
+    :attr str identifier: The identifier of this relation.
+    :attr Dict[_ReferenceKey, _CachedRelation] referenced_by: The relations
+        that refer to this relation.
+    :attr DefaultRelation inner: The underlying dbt relation.
+    """
+    def __init__(self, inner):
+        self.referenced_by = {}
+        self.inner = inner
+
+    def __str__(self):
+        return (
+            '_CachedRelation(schema={}, identifier={}, inner={})'
+        ).format(self.schema, self.identifier, self.inner)
+
+    @property
+    def schema(self):
+        return self.inner.schema
+
+    @property
+    def identifier(self):
+        return self.inner.identifier
+
+    def __copy__(self):
+        new = self.__class__(self.inner)
+        new.__dict__.update(self.__dict__)
+        return new
+
+    def __deepcopy__(self, memo):
+        new = self.__class__(self.inner.incorporate())
+        new.__dict__.update(self.__dict__)
+        new.referenced_by = deepcopy(self.referenced_by, memo)
+        return new
+
+    def is_referenced_by(self, key):
+        return key in self.referenced_by
+
+    def key(self):
+        """Get the _ReferenceKey that represents this relation
+
+        :return _ReferenceKey: A key for this relation.
+        """
+        return _ReferenceKey(self.schema, self.identifier)
+
+    def add_reference(self, referrer):
+        """Add a reference from referrer to self, indicating that if this node
+        were drop...cascaded, the referrer would be dropped as well.
+
+        :param _CachedRelation referrer: The node that refers to this node.
+        """
+        self.referenced_by[referrer.key()] = referrer
+
+    def collect_consequences(self):
+        """Recursively collect a set of _ReferenceKeys that would
+        consequentially get dropped if this were dropped via
+        "drop ... cascade".
+
+        :return Set[_ReferenceKey]: All the relations that would be dropped
+        """
+        consequences = {self.key()}
+        for relation in self.referenced_by.values():
+            consequences.update(relation.collect_consequences())
+        return consequences
+
+    def release_references(self, keys):
+        """Non-recursively indicate that the given _ReferenceKeys no longer
+        exist. Unknown keys are ignored.
+
+        :param Iterable[_ReferenceKey] keys: The keys to drop.
+        """
+        keys = set(self.referenced_by) & set(keys)
+        for key in keys:
+            self.referenced_by.pop(key)
+
+    def rename(self, new_relation):
+        """Rename this cached relation to new_relation.
+        Note that this will change the output of key(), so all refs must be
+        updated!
+
+        :param _ReferenceKey new_relation: The new name to apply to the
+            relation
+        """
+        # Relations store this stuff inside their `path` dict. But they
+        # also store a table_name, and usually use it in their .render(),
+        # so we need to update that as well. It doesn't appear that
+        # table_name is ever anything but the identifier (via .create())
+        self.inner = self.inner.incorporate(
+            path={
+                'schema': new_relation.schema,
+                'identifier': new_relation.identifier
+            },
+            table_name=new_relation.identifier
+        )
+
+    def rename_key(self, old_key, new_key):
+        """Rename a reference that may or may not exist. Only handles the
+        reference itself, so this is the other half of what `rename` does.
+
+        If old_key is not in referenced_by, this is a no-op.
+
+        :param _ReferenceKey old_key: The old key to be renamed.
+        :param _ReferenceKey new_key: The new key to rename to.
+        :raises InternalError: If the new key already exists.
+        """
+        if new_key in self.referenced_by:
+            dbt.exceptions.raise_cache_inconsistent(
+                'in rename of "{}" -> "{}", new name is in the cache already'
+                .format(old_key, new_key)
+            )
+
+        if old_key not in self.referenced_by:
+            return
+        value = self.referenced_by.pop(old_key)
+        self.referenced_by[new_key] = value
+
+    def dump_graph_entry(self):
+        """Return the value half of this relation's graph dump entry.
+
+        :return List[str]: The dot-separated form of all referent keys.
+        """
+        return [dot_separated(r) for r in self.referenced_by]
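
To make the cascade bookkeeping concrete, here is a minimal standalone sketch of
the same recursion — `Node` is a stand-in for `_CachedRelation` and is not part
of this patch:

    from collections import namedtuple

    _ReferenceKey = namedtuple('_ReferenceKey', 'schema identifier')

    class Node(object):
        """Stand-in for _CachedRelation; mirrors collect_consequences."""
        def __init__(self, schema, identifier):
            self.schema = schema
            self.identifier = identifier
            self.referenced_by = {}

        def key(self):
            return _ReferenceKey(self.schema, self.identifier)

        def collect_consequences(self):
            consequences = {self.key()}
            for relation in self.referenced_by.values():
                consequences.update(relation.collect_consequences())
            return consequences

    bar = Node('jake_test', 'bar')      # a table
    foo = Node('jake_test', 'foo')      # a view selecting from bar
    bar.referenced_by[foo.key()] = foo  # foo would die with bar

    # "drop bar cascade" would take out both:
    print(sorted(bar.collect_consequences()))
    # [_ReferenceKey(schema='jake_test', identifier='bar'),
    #  _ReferenceKey(schema='jake_test', identifier='foo')]
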
+ """ + if new_key in self.referenced_by: + dbt.exceptions.raise_cache_inconsistent( + 'in rename of "{}" -> "{}", new name is in the cache already' + .format(old_key, new_key) + ) + + if old_key not in self.referenced_by: + return + value = self.referenced_by.pop(old_key) + self.referenced_by[new_key] = value + + def dump_graph_entry(self): + """Return a key/value pair representing this key and its referents. + + return List[str]: The dot-separated form of all referent keys. + """ + return [dot_separated(r) for r in self.referenced_by] + + +class RelationsCache(object): + """A cache of the relations known to dbt. Keeps track of relationships + declared between tables and handles renames/drops as a real database would. + + :attr Dict[_ReferenceKey, _CachedRelation] relations: The known relations. + :attr threading.RLock lock: The lock around relations, held during updates. + The adapters also hold this lock while filling the cache. + :attr Set[str] schemas: The set of known/cached schemas, all lowercased. + """ + def __init__(self): + self.relations = {} + self.lock = threading.RLock() + self.schemas = set() + + def add_schema(self, schema): + """Add a schema to the set of known schemas (case-insensitive) + + :param str schema: The schema name to add. + """ + self.schemas.add(schema.lower()) + + def update_schemas(self, schemas): + """Add multiple schemas to the set of known schemas (case-insensitive) + + :param Iterable[str] schemas: An iterable of the schema names to add. + """ + self.schemas.update(s.lower() for s in schemas) + + def __contains__(self, schema): + """A schema is 'in' the relations cache if it is in the set of cached + schemas. + + :param str schema: The schema name to look up. + """ + return schema in self.schemas + + def dump_graph(self): + """Dump a key-only representation of the schema to a dictionary. Every + known relation is a key with a value of a list of keys it is referenced + by. + """ + # we have to hold the lock for the entire dump, if other threads modify + # self.relations or any cache entry's referenced_by during iteration + # it's a runtime error! + with self.lock: + return { + dot_separated(k): v.dump_graph_entry() + for k, v in self.relations.items() + } + + def _setdefault(self, relation): + """Add a relation to the cache, or return it if it already exists. + + :param _CachedRelation relation: The relation to set or get. + :return _CachedRelation: The relation stored under the given relation's + key + """ + self.schemas.add(relation.schema) + key = relation.key() + return self.relations.setdefault(key, relation) + + def _add_link(self, referenced_key, dependent_key): + """Add a link between two relations to the database. Both the old and + new entries must alraedy exist in the database. + + :param _ReferenceKey referenced_key: The key identifying the referenced + model (the one that if dropped will drop the dependent model). + :param _ReferenceKey dependent_key: The key identifying the dependent + model. + :raises InternalError: If either entry does not exist. + """ + referenced = self.relations.get(referenced_key) + if referenced is None: + dbt.exceptions.raise_cache_inconsistent( + 'in add_link, referenced link key {} not in cache!' + .format(referenced_key) + ) + + dependent = self.relations.get(dependent_key) + if dependent is None: + dbt.exceptions.raise_cache_inconsistent( + 'in add_link, dependent link key {} not in cache!' 
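
For example, with views `foo` and `baz` built on top of a table `bar` (names
hypothetical), `dump_graph()` would return something like:

    {
        'jake_test.bar': ['jake_test.foo', 'jake_test.baz'],
        'jake_test.foo': [],
        'jake_test.baz': [],
    }
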
+
+    def _setdefault(self, relation):
+        """Add a relation to the cache, or return it if it already exists.
+
+        :param _CachedRelation relation: The relation to set or get.
+        :return _CachedRelation: The relation stored under the given
+            relation's key
+        """
+        self.schemas.add(relation.schema)
+        key = relation.key()
+        return self.relations.setdefault(key, relation)
+
+    def _add_link(self, referenced_key, dependent_key):
+        """Add a link between two relations to the cache. Both entries must
+        already exist in the cache.
+
+        :param _ReferenceKey referenced_key: The key identifying the
+            referenced model (the one that if dropped will drop the dependent
+            model).
+        :param _ReferenceKey dependent_key: The key identifying the dependent
+            model.
+        :raises InternalError: If either entry does not exist.
+        """
+        referenced = self.relations.get(referenced_key)
+        if referenced is None:
+            dbt.exceptions.raise_cache_inconsistent(
+                'in add_link, referenced link key {} not in cache!'
+                .format(referenced_key)
+            )
+
+        dependent = self.relations.get(dependent_key)
+        if dependent is None:
+            dbt.exceptions.raise_cache_inconsistent(
+                'in add_link, dependent link key {} not in cache!'
+                .format(dependent_key)
+            )
+
+        referenced.add_reference(dependent)
+
+    def add_link(self, referenced, dependent):
+        """Add a link between two relations to the cache. Both entries must
+        already exist in the cache.
+
+        The dependent model refers _to_ the referenced model. So, given
+        arguments of (jake_test, bar, jake_test, foo):
+        both values are in the schema jake_test and foo is a view that refers
+        to bar, so "drop bar cascade" will drop foo and all of foo's
+        dependents.
+
+        :param DefaultRelation referenced: The referenced model.
+        :param DefaultRelation dependent: The dependent model.
+        :raises InternalError: If either entry does not exist.
+        """
+        referenced = _ReferenceKey(
+            schema=referenced.schema,
+            identifier=referenced.name
+        )
+        if referenced.schema not in self:
+            # if we have not cached the referenced schema at all, we must be
+            # referring to a table outside our control. There's no need to
+            # make a link - we will never drop the referenced relation during
+            # a run.
+            logger.debug(
+                '{dep!s} references {ref!s} but {ref.schema} is not in the '
+                'cache, skipping assumed external relation'
+                .format(dep=dependent, ref=referenced)
+            )
+            return
+        dependent = _ReferenceKey(
+            schema=dependent.schema,
+            identifier=dependent.name
+        )
+        logger.debug(
+            'adding link, {!s} references {!s}'.format(dependent, referenced)
+        )
+        with self.lock:
+            self._add_link(referenced, dependent)
+
+    def add(self, relation):
+        """Add the relation inner to the cache, under the schema schema and
+        identifier identifier
+
+        :param DefaultRelation relation: The underlying relation.
+        """
+        cached = _CachedRelation(relation)
+        logger.debug('Adding relation: {!s}'.format(cached))
+        logger.debug('before adding: {}'.format(
+            pprint.pformat(self.dump_graph()))
+        )
+        with self.lock:
+            self._setdefault(cached)
+        logger.debug('after adding: {}'.format(
+            pprint.pformat(self.dump_graph()))
+        )
+
+    def _remove_refs(self, keys):
+        """Removes all references to all entries in keys. This does not
+        cascade!
+
+        :param Iterable[_ReferenceKey] keys: The keys to remove.
+        """
+        # remove the entries themselves
+        for key in keys:
+            del self.relations[key]
+        # then remove the remaining relations' references to them
+        for cached in self.relations.values():
+            cached.release_references(keys)
+
+    def _drop_cascade_relation(self, dropped):
+        """Drop the given relation and cascade it appropriately to all
+        dependent relations.
+
+        :param _ReferenceKey dropped: The key of the relation to drop.
+        """
+        if dropped not in self.relations:
+            logger.debug('dropped a nonexistent relation: {!s}'
+                         .format(dropped))
+            return
+        consequences = self.relations[dropped].collect_consequences()
+        logger.debug(
+            'drop {} is cascading to {}'.format(dropped, consequences)
+        )
+        self._remove_refs(consequences)
+
+    def drop(self, relation):
+        """Drop the named relation and cascade it appropriately to all
+        dependent relations.
+
+        Because dbt proactively issues many `drop relation if exists ...
+        cascade` statements that are noops, nonexistent relation drops cause
+        a debug log and no other actions.
+
+        :param DefaultRelation relation: The relation to drop.
+        """
+        dropped = _ReferenceKey(schema=relation.schema,
+                                identifier=relation.identifier)
+        logger.debug('Dropping relation: {!s}'.format(dropped))
+        with self.lock:
+            self._drop_cascade_relation(dropped)
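
A small end-to-end exercise of the API above, runnable against this branch;
`FakeRelation` is a stand-in supplying only the attributes the cache reads
here (`.schema`, `.identifier`, and `.name`):

    from collections import namedtuple

    from dbt.adapters.cache import RelationsCache  # the module added here

    FakeRelation = namedtuple('FakeRelation', 'schema identifier name')

    cache = RelationsCache()
    bar = FakeRelation('jake_test', 'bar', 'bar')
    foo = FakeRelation('jake_test', 'foo', 'foo')  # a view over bar
    cache.add(bar)
    cache.add(foo)
    cache.add_link(referenced=bar, dependent=foo)

    cache.drop(bar)            # cascades: foo is evicted too
    print(cache.dump_graph())  # -> {}
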
+
+    def _rename_relation(self, old_key, new_key):
+        """Rename a relation named old_key to new_key, updating references.
+        If the new key is already present, that is an error.
+        If the old key is absent, we only debug log and return, assuming it's
+        a temp table being renamed.
+
+        :param _ReferenceKey old_key: The existing key, to rename from.
+        :param _ReferenceKey new_key: The new key, to rename to.
+        :raises InternalError: If the new key is already present.
+        """
+        if old_key not in self.relations:
+            logger.debug(
+                'old key {} not found in self.relations, assuming temporary'
+                .format(old_key)
+            )
+            return
+
+        if new_key in self.relations:
+            dbt.exceptions.raise_cache_inconsistent(
+                'in rename, new key {} already in cache: {}'
+                .format(new_key, list(self.relations.keys()))
+            )
+
+        # On the database level, a rename updates all values that were
+        # previously referenced by old_name to be referenced by new_name.
+        # basically, the name changes but some underlying ID moves. Kind of
+        # like an object reference!
+        relation = self.relations.pop(old_key)
+
+        relation.rename(new_key)
+        # update all the relations that refer to it
+        for cached in self.relations.values():
+            if cached.is_referenced_by(old_key):
+                logger.debug(
+                    'updated reference from {0} -> {2} to {1} -> {2}'
+                    .format(old_key, new_key, cached.key())
+                )
+                cached.rename_key(old_key, new_key)
+
+        self.relations[new_key] = relation
+
+    def rename(self, old, new):
+        """Rename the old schema/identifier to the new schema/identifier and
+        update references.
+
+        If the new schema/identifier is already present, that is an error.
+        If the schema/identifier key is absent, we only debug log and return,
+        assuming it's a temp table being renamed.
+
+        :param DefaultRelation old: The existing relation name information.
+        :param DefaultRelation new: The new relation name information.
+        :raises InternalError: If the new key is already present.
+        """
+        old_key = _ReferenceKey(
+            schema=old.schema,
+            identifier=old.identifier
+        )
+        new_key = _ReferenceKey(
+            schema=new.schema,
+            identifier=new.identifier
+        )
+        logger.debug('Renaming relation {!s} to {!s}'.format(
+            old_key, new_key)
+        )
+        logger.debug('before rename: {}'.format(
+            pprint.pformat(self.dump_graph()))
+        )
+        with self.lock:
+            self._rename_relation(old_key, new_key)
+        logger.debug('after rename: {}'.format(
+            pprint.pformat(self.dump_graph()))
+        )
+
+    def get_relations(self, schema):
+        """Case-insensitively return all relations matching the given schema.
+
+        :param str schema: The case-insensitive schema name to list from.
+        :return List[DefaultRelation]: The list of relations with the given
+            schema
+        """
+        schema = schema.lower()
+        with self.lock:
+            results = [
+                r.inner for r in self.relations.values()
+                if r.schema.lower() == schema
+            ]
+
+        if None in results:
+            dbt.exceptions.raise_cache_inconsistent(
+                'in get_relations, a None relation was found in the cache!'
+            )
+        return results
+
+    def clear(self):
+        """Clear the cache"""
+        with self.lock:
+            self.relations.clear()
+            self.schemas.clear()
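
`rename` is the one path that mutates the wrapped relation, via `incorporate`.
A runnable sketch with a stand-in relation implementing just that method (the
stand-in is hypothetical; real runs use `DefaultRelation`):

    from dbt.adapters.cache import RelationsCache

    class FakeRelation(object):
        """Stand-in: only what rename() touches."""
        def __init__(self, schema, identifier):
            self.schema = schema
            self.identifier = identifier

        def incorporate(self, path=None, table_name=None):
            if path is None:
                return FakeRelation(self.schema, self.identifier)
            return FakeRelation(path['schema'], path['identifier'])

    cache = RelationsCache()
    cache.add(FakeRelation('jake_test', 'foo__dbt_tmp'))
    cache.rename(FakeRelation('jake_test', 'foo__dbt_tmp'),
                 FakeRelation('jake_test', 'foo'))
    print(list(cache.dump_graph()))  # -> ['jake_test.foo']
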
diff --git a/dbt/adapters/default/impl.py b/dbt/adapters/default/impl.py
index 8fe29e4a684..004b68f51db 100644
--- a/dbt/adapters/default/impl.py
+++ b/dbt/adapters/default/impl.py
@@ -16,6 +16,7 @@
 from dbt.utils import filter_null_values
 
 from dbt.adapters.default.relation import DefaultRelation
+from dbt.adapters.cache import RelationsCache
 
 GET_CATALOG_OPERATION_NAME = 'get_catalog_data'
 
@@ -24,25 +25,38 @@
 connections_available = []
 
 
-def _filter_schemas(manifest):
+def _expect_row_value(key, row):
+    if key not in row.keys():
+        raise dbt.exceptions.InternalException(
+            'Got a row without "{}" column, columns: {}'
+            .format(key, row.keys())
+        )
+    return row[key]
+
+
+def _relations_filter_schemas(schemas):
+    def test(row):
+        referenced_schema = _expect_row_value('referenced_schema', row)
+        dependent_schema = _expect_row_value('dependent_schema', row)
+        # handle the null schema
+        if referenced_schema is not None:
+            referenced_schema = referenced_schema.lower()
+        if dependent_schema is not None:
+            dependent_schema = dependent_schema.lower()
+        return referenced_schema in schemas or dependent_schema in schemas
+    return test
+
+
+def _catalog_filter_schemas(manifest):
     """Return a function that takes a row and decides if the row should be
     included in the catalog output.
     """
-    schemas = frozenset({
-        node.schema.lower()
-        for node in manifest.nodes.values()
-    })
+    schemas = frozenset(s.lower() for s in manifest.get_used_schemas())
 
     def test(row):
-        if 'table_schema' not in row.keys():
-            # this means the get catalog operation is somehow not well formed!
-            raise dbt.exceptions.InternalException(
-                'Got a row without "table_schema" column, columns: {}'
-                .format(row.keys())
-            )
+        table_schema = _expect_row_value('table_schema', row)
         # the schema may be present but None, which is not an error and should
         # be filtered out
-        table_schema = row['table_schema']
         if table_schema is None:
             return False
         return table_schema.lower() in schemas
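
The new relations predicate tolerates NULL schemas and compares
case-insensitively; note it assumes the `schemas` argument is already
lowercased. A quick check, with a plain dict standing in for an agate row
(only `.keys()` and item access are used):

    from dbt.adapters.default.impl import _relations_filter_schemas

    test = _relations_filter_schemas({'analytics'})
    print(test({'referenced_schema': 'Analytics', 'dependent_schema': None}))
    # -> True
    print(test({'referenced_schema': None, 'dependent_schema': None}))
    # -> False
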
@@ -58,6 +72,7 @@ class DefaultAdapter(object):
         "expand_target_column_types",
         "create_schema",
         "quote_as_configured",
+        "cache_new_relation",
 
         # deprecated -- use versions that take relations instead
         "already_exists",
@@ -70,7 +85,6 @@ class DefaultAdapter(object):
         "quote_schema_and_table",
 
         # versions of adapter functions that take / return Relations
-        "list_relations",
         "get_relation",
         "drop_relation",
         "rename_relation",
@@ -85,13 +99,14 @@ class DefaultAdapter(object):
         "get_status",
         "get_result_from_cursor",
         "quote",
-        "convert_type"
+        "convert_type",
     ]
 
     Relation = DefaultRelation
     Column = Column
 
     def __init__(self, config):
         self.config = config
+        self.cache = RelationsCache()
 
     ###
     # ADAPTER-SPECIFIC FUNCTIONS -- each of these must be overridden in
@@ -130,8 +145,7 @@ def query_for_existing(self, schemas, model_name=None):
 
         all_relations = []
         for schema in schemas:
-            all_relations.extend(
-                self.list_relations(schema, model_name))
+            all_relations.extend(self.list_relations(schema, model_name))
 
         return {relation.identifier: relation.type
                 for relation in all_relations}
@@ -151,6 +165,17 @@ def cancel_connection(self, connection):
     ###
     # FUNCTIONS THAT SHOULD BE ABSTRACT
     ###
+    def cache_new_relation(self, relation, model_name=None):
+        """Cache a new relation in dbt. It will show up in `list_relations`."""
+        if relation is None:
+            dbt.exceptions.raise_compiler_error(
+                'Attempted to cache a null relation for {}'.format(model_name)
+            )
+        if dbt.flags.USE_CACHE:
+            self.cache.add(relation)
+        # so jinja doesn't render things
+        return ''
+
     @classmethod
     def get_result_from_cursor(cls, cursor):
         data = []
@@ -175,6 +200,8 @@ def drop(self, schema, relation, relation_type, model_name=None):
         return self.drop_relation(relation, model_name)
 
     def drop_relation(self, relation, model_name=None):
+        if dbt.flags.USE_CACHE:
+            self.cache.drop(relation)
         if relation.type is None:
             dbt.exceptions.raise_compiler_error(
                 'Tried to drop relation {}, but its type is null.'
@@ -216,6 +243,8 @@ def rename(self, schema, from_name, to_name, model_name=None):
 
     def rename_relation(self, from_relation, to_relation,
                         model_name=None):
+        if dbt.flags.USE_CACHE:
+            self.cache.rename(from_relation, to_relation)
         sql = 'alter table {} rename to {}'.format(
             from_relation, to_relation.include(schema=False))
 
@@ -324,10 +353,37 @@ def expand_target_column_types(self,
     ###
     # RELATIONS
    ###
-    def list_relations(self, schema, model_name=None):
+    def _schema_is_cached(self, schema, model_name=None,
+                          debug_on_missing=True):
+        """Check if the schema is cached; by default, log if it is not."""
+        if dbt.flags.USE_CACHE is False:
+            return False
+        elif schema not in self.cache:
+            if debug_on_missing:
+                logger.debug(
+                    'On "{}": cache miss for schema "{}", this is inefficient'
+                    .format(model_name or '', schema)
+                )
+            return False
+        else:
+            return True
+
+    def _list_relations(self, schema, model_name=None):
         raise dbt.exceptions.NotImplementedException(
             '`list_relations` is not implemented for this adapter!')
 
+    def list_relations(self, schema, model_name=None):
+        if self._schema_is_cached(schema, model_name):
+            return self.cache.get_relations(schema)
+
+        # we can't build the relations cache because we don't have a
+        # manifest so we can't run any operations.
+        relations = self._list_relations(schema, model_name=model_name)
+
+        logger.debug('with schema={}, model_name={}, relations={}'
                      .format(schema, model_name, relations))
+        return relations
+
     def _make_match_kwargs(self, schema, identifier):
         quoting = self.config.quoting
         if identifier is not None and quoting['identifier'] is False:
@@ -339,15 +395,7 @@ def _make_match_kwargs(self, schema, identifier):
         return filter_null_values({'identifier': identifier,
                                    'schema': schema})
 
-    def get_relation(self, schema=None, identifier=None,
-                     relations_list=None, model_name=None):
-        if schema is None and relations_list is None:
-            raise dbt.exceptions.RuntimeException(
-                'get_relation needs either a schema to query, or a list '
-                'of relations to use')
-
-        if relations_list is None:
-            relations_list = self.list_relations(schema, model_name)
+    def _make_match(self, relations_list, schema, identifier):
 
         matches = []
 
@@ -357,6 +405,13 @@ def get_relation(self, schema=None, identifier=None,
             if relation.matches(**search):
                 matches.append(relation)
 
+        return matches
+
+    def get_relation(self, schema, identifier, model_name=None):
+        relations_list = self.list_relations(schema, model_name)
+
+        matches = self._make_match(relations_list, schema, identifier)
+
         if len(matches) > 1:
             dbt.exceptions.get_relation_returned_multiple_results(
                 {'identifier': identifier, 'schema': schema}, matches)
@@ -778,15 +833,56 @@ def run_operation(self, manifest, operation_name):
     # Abstract methods involving the manifest
     ###
     @classmethod
-    def _filter_table(cls, table, manifest):
-        return table.where(_filter_schemas(manifest))
+    def _catalog_filter_table(cls, table, manifest):
+        return table.where(_catalog_filter_schemas(manifest))
 
     def get_catalog(self, manifest):
         try:
-            table = self.run_operation(manifest,
-                                       GET_CATALOG_OPERATION_NAME)
+            table = self.run_operation(manifest, GET_CATALOG_OPERATION_NAME)
         finally:
             self.release_connection(GET_CATALOG_OPERATION_NAME)
 
-        results = self._filter_table(table, manifest)
+        results = self._catalog_filter_table(table, manifest)
         return results
+
+    @classmethod
+    def _relations_filter_table(cls, table, schemas):
+        return table.where(_relations_filter_schemas(schemas))
+
+    def _link_cached_relations(self, manifest, schemas):
+        """This method has to exist because BigQueryAdapter and
+        SnowflakeAdapter inherit from the PostgresAdapter, so they need
+        something to override in order to disable linking.
+        """
+        pass
+
+    def _relations_cache_for_schemas(self, manifest, schemas=None):
+        if not dbt.flags.USE_CACHE:
+            return
+
+        if schemas is None:
+            schemas = manifest.get_used_schemas()
+
+        relations = []
+        # add all relations
+        for schema in schemas:
+            # bypass the cache, of course!
+            for relation in self._list_relations(schema):
+                self.cache.add(relation)
+        self._link_cached_relations(manifest, schemas)
+        # it's possible that there were no relations in some schemas. We want
+        # to insert the schemas we query into the cache's `.schemas` attribute
+        # so we can check it later
+        self.cache.update_schemas(schemas)
+
+    def set_relations_cache(self, manifest, clear=False):
+        """Run a query that gets a populated cache of the relations in the
+        database and set the cache on this adapter.
+        """
+        if not dbt.flags.USE_CACHE:
+            return
+
+        with self.cache.lock:
+            if clear:
+                self.cache.clear()
+            self._relations_cache_for_schemas(manifest)
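
Presumably the runner calls this once per invocation, before model execution
starts (the call site is outside this diff); after that, relation lookups
during the run are served from memory:

    # hypothetical call site, not part of this patch:
    adapter.set_relations_cache(manifest)
    adapter.list_relations('analytics')  # now a cache hit, no query issued
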
+ """ + if not dbt.flags.USE_CACHE: + return + + with self.cache.lock: + if clear: + self.cache.clear() + self._relations_cache_for_schemas(manifest) diff --git a/dbt/adapters/postgres/impl.py b/dbt/adapters/postgres/impl.py index 29355df5a76..a0a03967931 100644 --- a/dbt/adapters/postgres/impl.py +++ b/dbt/adapters/postgres/impl.py @@ -10,6 +10,9 @@ from dbt.logger import GLOBAL_LOGGER as logger +GET_RELATIONS_OPERATION_NAME = 'get_relations_data' + + class PostgresAdapter(dbt.adapters.default.DefaultAdapter): DEFAULT_TCP_KEEPALIVE = 0 # 0 means to use the default value @@ -143,7 +146,21 @@ def alter_column_type(self, schema, table, column_name, return connection, cursor - def list_relations(self, schema, model_name=None): + def _link_cached_relations(self, manifest, schemas): + try: + table = self.run_operation(manifest, GET_RELATIONS_OPERATION_NAME) + finally: + self.release_connection(GET_RELATIONS_OPERATION_NAME) + table = self._relations_filter_table(table, schemas) + + for (refed_schema, refed_name, dep_schema, dep_name) in table: + referenced = self.Relation.create(schema=refed_schema, + identifier=refed_name) + dependent = self.Relation.create(schema=dep_schema, + identifier=dep_name) + self.cache.add_link(dependent, referenced) + + def _list_relations(self, schema, model_name=None): sql = """ select tablename as name, schemaname as schema, 'table' as type from pg_tables where schemaname ilike '{schema}' @@ -152,8 +169,7 @@ def list_relations(self, schema, model_name=None): where schemaname ilike '{schema}' """.format(schema=schema).strip() # noqa - connection, cursor = self.add_query(sql, model_name, - auto_begin=False) + connection, cursor = self.add_query(sql, model_name, auto_begin=False) results = cursor.fetchall() diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 20d088e59c4..99c874aed77 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -103,7 +103,10 @@ def open_connection(cls, connection): return connection - def list_relations(self, schema, model_name=None): + def _link_cached_relations(self, manifest, schemas): + pass + + def _list_relations(self, schema, model_name=None): sql = """ select table_name as name, table_schema as schema, table_type as type @@ -133,6 +136,7 @@ def list_relations(self, schema, model_name=None): def rename_relation(self, from_relation, to_relation, model_name=None): + self.cache.rename(from_relation, to_relation) sql = 'alter table {} rename to {}'.format( from_relation, to_relation) @@ -209,13 +213,14 @@ def add_query(self, sql, model_name=None, auto_begin=True, return connection, cursor @classmethod - def _filter_table(cls, table, manifest): + def _catalog_filter_table(cls, table, manifest): # On snowflake, users can set QUOTED_IDENTIFIERS_IGNORE_CASE, so force # the column names to their lowercased forms. 
diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py
index 20d088e59c4..99c874aed77 100644
--- a/dbt/adapters/snowflake/impl.py
+++ b/dbt/adapters/snowflake/impl.py
@@ -103,7 +103,10 @@ def open_connection(cls, connection):
 
         return connection
 
-    def list_relations(self, schema, model_name=None):
+    def _link_cached_relations(self, manifest, schemas):
+        pass
+
+    def _list_relations(self, schema, model_name=None):
         sql = """
         select
           table_name as name, table_schema as schema, table_type as type
@@ -133,6 +136,7 @@ def list_relations(self, schema, model_name=None):
 
     def rename_relation(self, from_relation, to_relation,
                         model_name=None):
+        self.cache.rename(from_relation, to_relation)
         sql = 'alter table {} rename to {}'.format(
             from_relation, to_relation)
 
@@ -209,13 +213,14 @@ def add_query(self, sql, model_name=None, auto_begin=True,
         return connection, cursor
 
     @classmethod
-    def _filter_table(cls, table, manifest):
+    def _catalog_filter_table(cls, table, manifest):
         # On snowflake, users can set QUOTED_IDENTIFIERS_IGNORE_CASE, so force
         # the column names to their lowercased forms.
         lowered = table.rename(
             column_names=[c.lower() for c in table.column_names]
         )
-        return super(SnowflakeAdapter, cls)._filter_table(lowered, manifest)
+        return super(SnowflakeAdapter, cls)._catalog_filter_table(lowered,
+                                                                  manifest)
 
     def _make_match_kwargs(self, schema, identifier):
         quoting = self.config.quoting
diff --git a/dbt/clients/jinja.py b/dbt/clients/jinja.py
index fa2fea4ecd7..fac6580db5d 100644
--- a/dbt/clients/jinja.py
+++ b/dbt/clients/jinja.py
@@ -1,3 +1,7 @@
+import codecs
+import linecache
+import os
+
 import jinja2
 import jinja2._compat
 import jinja2.ext
@@ -37,13 +41,28 @@ def _parse(self, source, name, filename):
             jinja2._compat.encode_filename(filename)
         ).parse()
 
+    def _compile(self, source, filename):
+        """Override jinja's compilation to stash the rendered source inside
+        the python linecache for debugging.
+        """
+        if filename == '