Skip to content

Commit

Permalink
Change list_relations_without_caching macro to take a single argument
Browse files Browse the repository at this point in the history
The argument is a Relation object with no identifier field, configured with the appropriate quoting information

Unique quoted/unquoted representations will be treated as distinct
The logic for generating what schemas to search for relations is now distinct from the catalog search logic.
Schema creation/dropping takes a similar relation argument
Add tests
  • Loading branch information
Jacob Beck committed May 7, 2020
1 parent 9cd7cbc commit 7c99b76
Show file tree
Hide file tree
Showing 22 changed files with 218 additions and 169 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
## dbt 0.17.0 (Release TBD)

### Breaking changes
- The `list_relations_without_caching`, `drop_schema`, and `create_schema` macros and methods now accept a single argument of a Relation object with no identifier field. ([#2411](https://github.com/fishtown-analytics/dbt/pull/2411))

### Fixes
- When tracking is disabled due to errors, do not reset the invocation ID ([#2398](https://github.com/fishtown-analytics/dbt/issues/2398), [#2400](https://github.com/fishtown-analytics/dbt/pull/2400))
- Fix for logic error in compilation errors for duplicate data test names ([#2406](https://github.com/fishtown-analytics/dbt/issues/2406), [#2407](https://github.com/fishtown-analytics/dbt/pull/2407))
- Fix a bug where quoted uppercase schemas on snowflake were not processed properly during cache building. ([#2403](https://github.com/fishtown-analytics/dbt/issues/2403), [#2411](https://github.com/fishtown-analytics/dbt/pull/2411))

## dbt 0.17.0b1 (May 5, 2020)

Expand Down
60 changes: 37 additions & 23 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
from dbt.contracts.graph.parsed import ParsedSeedNode
from dbt.contracts.graph.model_config import BaseConfig
from dbt.exceptions import warn_or_error
from dbt.node_types import NodeType
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.node_type import NodeType
from dbt.utils import filter_null_values, executor

from dbt.adapters.base.connections import BaseConnectionManager, Connection
Expand Down Expand Up @@ -278,9 +278,18 @@ def _schema_is_cached(self, database: str, schema: str) -> bool:
else:
return True

def _get_cache_schemas(
self, manifest: Manifest, exec_only: bool = False
) -> SchemaSearchMap:
def _get_cache_schemas(self, manifest: Manifest) -> Set[BaseRelation]:
"""Get the set of schema relations that the cache logic needs to
populate. This means only executable nodes are included.
"""
# the cache only cares about executable nodes
return {
self.Relation.create_from(self.config, node).without_identifier()
for node in manifest.nodes.values()
if node.node_type in NodeType.executable()
}

def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
"""Get a mapping of each node's "information_schema" relations to a
set of all schemas expected in that information_schema.
Expand All @@ -295,8 +304,6 @@ def _get_cache_schemas(
manifest.sources.values(),
)
for node in nodes:
if exec_only and node.resource_type not in NodeType.executable():
continue
relation = self.Relation.create_from(self.config, node)
info_schema_name_map.add(relation)
# result is a map whose keys are information_schema Relations without
Expand All @@ -306,10 +313,11 @@ def _get_cache_schemas(
return info_schema_name_map

def _list_relations_get_connection(
self, db: BaseRelation, schema: str
self, schema_relation: BaseRelation
) -> List[BaseRelation]:
with self.connection_named(f'list_{db.database}_{schema}'):
return self.list_relations_without_caching(db, schema)
name = f'list_{schema_relation.database}_{schema_relation.schema}'
with self.connection_named(name):
return self.list_relations_without_caching(schema_relation)

def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
"""Populate the relations cache for the given schemas. Returns an
Expand All @@ -318,11 +326,11 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
if not dbt.flags.USE_CACHE:
return

schema_map = self._get_cache_schemas(manifest, exec_only=True)
cache_schemas = self._get_cache_schemas(manifest)
with executor(self.config) as tpe:
futures: List[Future[List[BaseRelation]]] = [
tpe.submit(self._list_relations_get_connection, db, schema)
for db, schema in schema_map.search()
tpe.submit(self._list_relations_get_connection, cache_schema)
for cache_schema in cache_schemas
]
for future in as_completed(futures):
# if we can't read the relations we need to just raise anyway,
Expand All @@ -333,7 +341,14 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
# it's possible that there were no relations in some schemas. We want
# to insert the schemas we query into the cache's `.schemas` attribute
# so we can check it later
self.cache.update_schemas(schema_map.schemas_searched())
cache_update: Set[Tuple[str, Optional[str]]] = set()
for relation in cache_schemas:
if relation.database is None:
raise InternalException(
'Got a None database in a cached schema!'
)
cache_update.add((relation.database, relation.schema))
self.cache.update_schemas(cache_update)

def set_relations_cache(
self, manifest: Manifest, clear: bool = False
Expand Down Expand Up @@ -512,15 +527,14 @@ def expand_column_types(

@abc.abstractmethod
def list_relations_without_caching(
self, information_schema: BaseRelation, schema: str
self, schema_relation: BaseRelation
) -> List[BaseRelation]:
"""List relations in the given schema, bypassing the cache.
This is used as the underlying behavior to fill the cache.
:param Relation information_schema: The information schema to list
relations from.
:param str schema: The name of the schema to list relations from.
:param schema_relation: A relation containing the database and schema
as appropraite for the underlying data warehouse
:return: The relations in schema
:rtype: List[self.Relation]
"""
Expand Down Expand Up @@ -636,17 +650,17 @@ def list_relations(self, database: str, schema: str) -> List[BaseRelation]:
if self._schema_is_cached(database, schema):
return self.cache.get_relations(database, schema)

information_schema = self.Relation.create(
schema_relation = self.Relation.create(
database=database,
schema=schema,
identifier='',
quote_policy=self.config.quoting
).information_schema()
).without_identifier()

# we can't build the relations cache because we don't have a
# manifest so we can't run any operations.
relations = self.list_relations_without_caching(
information_schema, schema
schema_relation
)

logger.debug('with database={}, schema={}, relations={}'
Expand Down Expand Up @@ -727,15 +741,15 @@ def already_exists(self, schema: str, name: str) -> bool:
###
@abc.abstractmethod
@available.parse_none
def create_schema(self, database: str, schema: str):
def create_schema(self, relation: BaseRelation):
"""Create the given schema if it does not exist."""
raise NotImplementedException(
'`create_schema` is not implemented for this adapter!'
)

@abc.abstractmethod
@available.parse_none
def drop_schema(self, database: str, schema: str):
def drop_schema(self, relation: BaseRelation):
"""Drop the given schema (and everything in it) if it exists."""
raise NotImplementedException(
'`drop_schema` is not implemented for this adapter!'
Expand Down Expand Up @@ -1014,7 +1028,7 @@ def _get_one_catalog(
def get_catalog(
self, manifest: Manifest
) -> Tuple[agate.Table, List[Exception]]:
schema_map = self._get_cache_schemas(manifest)
schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = [
Expand Down
37 changes: 17 additions & 20 deletions core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,16 @@ def information_schema(self, view_name=None) -> 'InformationSchema':
def information_schema_only(self) -> 'InformationSchema':
return self.information_schema()

def without_identifier(self) -> 'BaseRelation':
"""Return a form of this relation that only has the database and schema
set to included. To get the appropriately-quoted form the schema out of
the result (for use as part of a query), use `.render()`. To get the
raw database or schema name, use `.database` or `.schema`.
The hash of the returned object is the result of render().
"""
return self.include(identifier=False).replace_path(identifier=None)

def _render_iterator(
self
) -> Iterator[Tuple[Optional[ComponentName], Optional[str]]]:
Expand Down Expand Up @@ -501,38 +511,25 @@ def _render_iterator(self):

class SchemaSearchMap(Dict[InformationSchema, Set[Optional[str]]]):
"""A utility class to keep track of what information_schema tables to
search for what schemas
search for what schemas. The schema values are all lowercased to avoid
duplication.
"""
def add(self, relation: BaseRelation, preserve_case=False):
def add(self, relation: BaseRelation):
key = relation.information_schema_only()
if key not in self:
self[key] = set()
schema: Optional[str] = None
if relation.schema is not None:
if preserve_case:
schema = relation.schema
else:
schema = relation.schema.lower()
schema = relation.schema.lower()
self[key].add(schema)

def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
def search(
self
) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
for information_schema_name, schemas in self.items():
for schema in schemas:
yield information_schema_name, schema

def schemas_searched(self) -> Set[Tuple[str, Optional[str]]]:
result: Set[Tuple[str, Optional[str]]] = set()
for information_schema_name, schemas in self.items():
if information_schema_name.database is None:
raise InternalException(
'Got a None database in an information schema!'
)
result.update(
(information_schema_name.database, schema)
for schema in schemas
)
return result

def flatten(self):
new = self.__class__()

Expand Down
22 changes: 11 additions & 11 deletions core/dbt/adapters/sql/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,31 +174,31 @@ def get_columns_in_relation(self, relation):
kwargs={'relation': relation}
)

def create_schema(self, database: str, schema: str) -> None:
logger.debug('Creating schema "{}"."{}".', database, schema)
def create_schema(self, relation: BaseRelation) -> None:
relation = relation.without_identifier()
logger.debug('Creating schema "{}"', relation)
kwargs = {
'database_name': self.quote_as_configured(database, 'database'),
'schema_name': self.quote_as_configured(schema, 'schema'),
'relation': relation,
}
self.execute_macro(CREATE_SCHEMA_MACRO_NAME, kwargs=kwargs)
self.commit_if_has_connection()
# we can't update the cache here, as if the schema already existed we
# don't want to (incorrectly) say that it's empty

def drop_schema(self, database: str, schema: str) -> None:
logger.debug('Dropping schema "{}"."{}".', database, schema)
def drop_schema(self, relation: BaseRelation) -> None:
relation = relation.without_identifier()
logger.debug('Dropping schema "{}".', relation)
kwargs = {
'database_name': self.quote_as_configured(database, 'database'),
'schema_name': self.quote_as_configured(schema, 'schema'),
'relation': relation,
}
self.execute_macro(DROP_SCHEMA_MACRO_NAME, kwargs=kwargs)
# we can update the cache here
self.cache.drop_schema(database, schema)
self.cache.drop_schema(relation.database, relation.schema)

def list_relations_without_caching(
self, information_schema, schema
self, schema_relation: BaseRelation,
) -> List[BaseRelation]:
kwargs = {'information_schema': information_schema, 'schema': schema}
kwargs = {'schema_relation': schema_relation}
results = self.execute_macro(
LIST_RELATIONS_MACRO_NAME,
kwargs=kwargs
Expand Down
22 changes: 11 additions & 11 deletions core/dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,23 @@
{{ return(load_result('get_columns_in_query').table.columns | map(attribute='name') | list) }}
{% endmacro %}

{% macro create_schema(database_name, schema_name) -%}
{{ adapter_macro('create_schema', database_name, schema_name) }}
{% macro create_schema(relation) -%}
{{ adapter_macro('create_schema', relation) }}
{% endmacro %}

{% macro default__create_schema(database_name, schema_name) -%}
{% macro default__create_schema(relation) -%}
{%- call statement('create_schema') -%}
create schema if not exists {{database_name}}.{{schema_name}}
create schema if not exists {{ relation.without_identifier() }}
{% endcall %}
{% endmacro %}

{% macro drop_schema(database_name, schema_name) -%}
{{ adapter_macro('drop_schema', database_name, schema_name) }}
{% macro drop_schema(relation) -%}
{{ adapter_macro('drop_schema', relation) }}
{% endmacro %}

{% macro default__drop_schema(database_name, schema_name) -%}
{% macro default__drop_schema(relation) -%}
{%- call statement('drop_schema') -%}
drop schema if exists {{database_name}}.{{schema_name}} cascade
drop schema if exists {{ relation.without_identifier() }} cascade
{% endcall %}
{% endmacro %}

Expand Down Expand Up @@ -249,12 +249,12 @@
{% endmacro %}


{% macro list_relations_without_caching(information_schema, schema) %}
{{ return(adapter_macro('list_relations_without_caching', information_schema, schema)) }}
{% macro list_relations_without_caching(schema_relation) %}
{{ return(adapter_macro('list_relations_without_caching', schema_relation)) }}
{% endmacro %}


{% macro default__list_relations_without_caching(information_schema, schema) %}
{% macro default__list_relations_without_caching(schema_relation) %}
{{ exceptions.raise_not_implemented(
'list_relations_without_caching macro not implemented for adapter '+adapter.type()) }}
{% endmacro %}
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def make_unique_id_map(
sources: Dict[str, CatalogTable] = {}

node_map, source_map = get_unique_id_mapping(manifest)
table: CatalogTable
for table in self.values():
key = table.key()
if key in node_map:
Expand Down
Loading

0 comments on commit 7c99b76

Please sign in to comment.