From c21af17a3034645ba0dc801a6c46fe6c5d5d3815 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Tue, 26 May 2020 11:56:17 -0600 Subject: [PATCH] Make databases optional --- CHANGELOG.md | 3 ++ core/dbt/adapters/base/impl.py | 12 +++-- core/dbt/adapters/base/relation.py | 17 +++---- core/dbt/adapters/cache.py | 39 +++++++--------- core/dbt/contracts/graph/parsed.py | 2 +- core/dbt/contracts/results.py | 7 +-- .../macros/etc/get_custom_database.sql | 4 ++ core/dbt/node_runners.py | 4 +- core/dbt/parser/base.py | 4 +- core/dbt/task/generate.py | 11 ++++- core/dbt/task/runnable.py | 45 +++++++++---------- core/dbt/utils.py | 7 +++ 12 files changed, 81 insertions(+), 74 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b2acc853d4..5f1f0b4a181 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,9 @@ - Substantial performance improvements for parsing on large projects, especially projects with many docs definition. ([#2480](https://github.com/fishtown-analytics/dbt/issues/2480), [#2481](https://github.com/fishtown-analytics/dbt/pull/2481)) - Expose Snowflake query id in case of an exception raised by connector ([#2201](https://github.com/fishtown-analytics/dbt/issues/2201), [#2358](https://github.com/fishtown-analytics/dbt/pull/2358)) +### Under the hood +- Better support for optional database fields in adapters ([#2487](https://github.com/fishtown-analytics/dbt/issues/2487) [#2489](https://github.com/fishtown-analytics/dbt/pull/2489)) + Contributors: - [@dmateusp](https://github.com/dmateusp) ([#2475](https://github.com/fishtown-analytics/dbt/pull/2475)) - [@ChristianKohlberg](https://github.com/ChristianKohlberg) (#2358](https://github.com/fishtown-analytics/dbt/pull/2358)) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 479d1dac673..6af38b5264b 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -264,7 +264,7 @@ def load_internal_manifest(self) -> Manifest: ### # Caching methods ### - def _schema_is_cached(self, database: str, schema: str) -> bool: + def _schema_is_cached(self, database: Optional[str], schema: str) -> bool: """Check if the schema is cached, and by default logs if it is not.""" if dbt.flags.USE_CACHE is False: @@ -341,12 +341,8 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None: # it's possible that there were no relations in some schemas. We want # to insert the schemas we query into the cache's `.schemas` attribute # so we can check it later - cache_update: Set[Tuple[str, Optional[str]]] = set() + cache_update: Set[Tuple[Optional[str], Optional[str]]] = set() for relation in cache_schemas: - if relation.database is None: - raise InternalException( - 'Got a None database in a cached schema!' - ) cache_update.add((relation.database, relation.schema)) self.cache.update_schemas(cache_update) @@ -646,7 +642,9 @@ def expand_target_column_types( self.expand_column_types(from_relation, to_relation) - def list_relations(self, database: str, schema: str) -> List[BaseRelation]: + def list_relations( + self, database: Optional[str], schema: str + ) -> List[BaseRelation]: if self._schema_is_cached(database, schema): return self.cache.get_relations(database, schema) diff --git a/core/dbt/adapters/base/relation.py b/core/dbt/adapters/base/relation.py index 6591c48980b..44654bfb342 100644 --- a/core/dbt/adapters/base/relation.py +++ b/core/dbt/adapters/base/relation.py @@ -6,7 +6,7 @@ from collections.abc import Mapping, Hashable from dataclasses import dataclass, fields from typing import ( - Optional, TypeVar, Generic, Any, Type, Dict, Union, List, Iterator, Tuple, + Optional, TypeVar, Generic, Any, Type, Dict, Union, Iterator, Tuple, Set ) from typing_extensions import Protocol @@ -278,16 +278,11 @@ def _render_iterator( yield key, path_part def render(self) -> str: - parts: List[str] = [ - part for _, part in self._render_iterator() if part is not None - ] - - if len(parts) == 0: - raise dbt.exceptions.RuntimeException( - "No path parts are included! Nothing to render." - ) - - return '.'.join(parts) + # if there is nothing set, this will return the empty string. + return '.'.join( + part for _, part in self._render_iterator() + if part is not None + ) def quoted(self, identifier): return '{quote_char}{identifier}{quote_char}'.format( diff --git a/core/dbt/adapters/cache.py b/core/dbt/adapters/cache.py index 157684fe9c2..8bbac5cc95f 100644 --- a/core/dbt/adapters/cache.py +++ b/core/dbt/adapters/cache.py @@ -4,25 +4,20 @@ import threading from dbt.logger import CACHE_LOGGER as logger +from dbt.utils import lowercase import dbt.exceptions _ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier') -def _lower(value: Optional[str]) -> Optional[str]: - """Postgres schemas can be None so we can't just call lower().""" - if value is None: - return None - return value.lower() - - def _make_key(relation) -> _ReferenceKey: """Make _ReferenceKeys with lowercase values for the cache so we don't have to keep track of quoting """ - return _ReferenceKey(_lower(relation.database), - _lower(relation.schema), - _lower(relation.identifier)) + # databases and schemas can both be None + return _ReferenceKey(lowercase(relation.database), + lowercase(relation.schema), + lowercase(relation.identifier)) def dot_separated(key: _ReferenceKey) -> str: @@ -53,15 +48,15 @@ def __str__(self) -> str: @property def database(self) -> Optional[str]: - return _lower(self.inner.database) + return lowercase(self.inner.database) @property def schema(self) -> Optional[str]: - return _lower(self.inner.schema) + return lowercase(self.inner.schema) @property def identifier(self) -> Optional[str]: - return _lower(self.inner.identifier) + return lowercase(self.inner.identifier) def __copy__(self): new = self.__class__(self.inner) @@ -190,7 +185,7 @@ def add_schema( :param database: The database name to add. :param schema: The schema name to add. """ - self.schemas.add((_lower(database), _lower(schema))) + self.schemas.add((lowercase(database), lowercase(schema))) def drop_schema( self, database: Optional[str], schema: Optional[str], @@ -199,7 +194,7 @@ def drop_schema( Then remove all its contents (and their dependents, etc) as well. """ - key = (_lower(database), _lower(schema)) + key = (lowercase(database), lowercase(schema)) if key not in self.schemas: return @@ -217,7 +212,7 @@ def update_schemas(self, schemas: Iterable[Tuple[Optional[str], str]]): :param schemas: An iterable of the schema names to add. """ - self.schemas.update((_lower(d), s.lower()) for (d, s) in schemas) + self.schemas.update((lowercase(d), s.lower()) for (d, s) in schemas) def __contains__(self, schema_id: Tuple[Optional[str], str]): """A schema is 'in' the relations cache if it is in the set of cached @@ -226,7 +221,7 @@ def __contains__(self, schema_id: Tuple[Optional[str], str]): :param schema_id: The db name and schema name to look up. """ db, schema = schema_id - return (_lower(db), schema.lower()) in self.schemas + return (lowercase(db), schema.lower()) in self.schemas def dump_graph(self): """Dump a key-only representation of the schema to a dictionary. Every @@ -484,13 +479,13 @@ def get_relations( :return List[BaseRelation]: The list of relations with the given schema """ - database = _lower(database) - schema = _lower(schema) + database = lowercase(database) + schema = lowercase(schema) with self.lock: results = [ r.inner for r in self.relations.values() - if (_lower(r.schema) == schema and - _lower(r.database) == database) + if (lowercase(r.schema) == schema and + lowercase(r.database) == database) ] if None in results: @@ -509,7 +504,7 @@ def _list_relations_in_schema( self, database: Optional[str], schema: Optional[str] ) -> List[_CachedRelation]: """Get the relations in a schema. Callers should hold the lock.""" - key = (_lower(database), _lower(schema)) + key = (lowercase(database), lowercase(schema)) to_remove: List[_CachedRelation] = [] for cachekey, relation in self.relations.items(): diff --git a/core/dbt/contracts/graph/parsed.py b/core/dbt/contracts/graph/parsed.py index 70f091ecc80..9c9955f12a2 100644 --- a/core/dbt/contracts/graph/parsed.py +++ b/core/dbt/contracts/graph/parsed.py @@ -83,7 +83,7 @@ def add_node(self, value: str): @dataclass class HasRelationMetadata(JsonSchemaMixin, Replaceable): - database: str + database: Optional[str] schema: str diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index c9f9b06c97b..e77154f73f4 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -10,6 +10,7 @@ JsonOnly, GLOBAL_LOGGER as logger, ) +from dbt.utils import lowercase from hologram.helpers import StrEnum from hologram import JsonSchemaMixin @@ -238,7 +239,7 @@ class FreshnessRunOutput(JsonSchemaMixin, Writable): CatalogKey = NamedTuple( 'CatalogKey', - [('database', str), ('schema', str), ('name', str)] + [('database', Optional[str]), ('schema', str), ('name', str)] ) @@ -268,7 +269,7 @@ class ColumnMetadata(JsonSchemaMixin): @dataclass class TableMetadata(JsonSchemaMixin): type: str - database: str + database: Optional[str] schema: str name: str comment: Optional[str] @@ -285,7 +286,7 @@ class CatalogTable(JsonSchemaMixin, Replaceable): def key(self) -> CatalogKey: return CatalogKey( - self.metadata.database.lower(), + lowercase(self.metadata.database), self.metadata.schema.lower(), self.metadata.name.lower(), ) diff --git a/core/dbt/include/global_project/macros/etc/get_custom_database.sql b/core/dbt/include/global_project/macros/etc/get_custom_database.sql index adbe152cb9b..bf12e366f5c 100644 --- a/core/dbt/include/global_project/macros/etc/get_custom_database.sql +++ b/core/dbt/include/global_project/macros/etc/get_custom_database.sql @@ -14,6 +14,10 @@ #} {% macro generate_database_name(custom_database_name=none, node=none) -%} + {% do return(adapter_macro('generate_database_name', custom_database_name, node)) %} +{%- endmacro %} + +{% macro default__generate_database_name(custom_database_name=none, node=none) -%} {%- set default_database = target.database -%} {%- if custom_database_name is none -%} diff --git a/core/dbt/node_runners.py b/core/dbt/node_runners.py index ffea17c0606..e1f84836bd3 100644 --- a/core/dbt/node_runners.py +++ b/core/dbt/node_runners.py @@ -27,6 +27,7 @@ import dbt.tracking import dbt.ui.printer import dbt.flags +import dbt.utils INTERNAL_ERROR_STRING = """This is an error in dbt. Please try again. If \ @@ -491,8 +492,7 @@ def _build_run_result(self, node, start_time, error, status, timing_info, skip=False, failed=None): execution_time = time.time() - start_time thread_id = threading.current_thread().name - if status is not None: - status = status.lower() + status = dbt.utils.lowercase(status) return PartialResult( node=node, status=status, diff --git a/core/dbt/parser/base.py b/core/dbt/parser/base.py index 748c85172bb..ff3fa0a0e9a 100644 --- a/core/dbt/parser/base.py +++ b/core/dbt/parser/base.py @@ -120,7 +120,9 @@ def __call__( self, parsed_node: Any, config_dict: Dict[str, Any] ) -> None: override = config_dict.get(self.component) - new_value = self.updater(override, parsed_node).strip() + new_value = self.updater(override, parsed_node) + if isinstance(new_value, str): + new_value = new_value.strip() setattr(parsed_node, self.component, new_value) diff --git a/core/dbt/task/generate.py b/core/dbt/task/generate.py index d3167d93429..e78fe84649a 100644 --- a/core/dbt/task/generate.py +++ b/core/dbt/task/generate.py @@ -62,9 +62,15 @@ def __init__(self, columns: List[PrimitiveDict]): self.add_column(col) def get_table(self, data: PrimitiveDict) -> CatalogTable: + database = data.get('table_database') + if database is None: + dkey: Optional[str] = None + else: + dkey = str(database) + try: key = CatalogKey( - str(data['table_database']), + dkey, str(data['table_schema']), str(data['table_name']), ) @@ -164,8 +170,9 @@ def format_stats(stats: PrimitiveDict) -> StatsDict: def mapping_key(node: CompileResultNode) -> CatalogKey: + dkey = dbt.utils.lowercase(node.database) return CatalogKey( - node.database.lower(), node.schema.lower(), node.identifier.lower() + dkey, node.schema.lower(), node.identifier.lower() ) diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index d75040f644c..713d0d55cd9 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -45,12 +45,6 @@ RUNNING_STATE = DbtProcessState('running') -def _lower(value: Optional[str]) -> Optional[str]: - if value is None: - return value - return value.lower() - - def write_manifest(config, manifest): if dbt.flags.WRITE_JSON: manifest.write(os.path.join(config.target_path, MANIFEST_FILE_NAME)) @@ -453,28 +447,32 @@ def create_schemas(self, adapter, selected_uids: Iterable[str]): ) required_databases.add(db_only) - existing_schemas_lowered: Set[Tuple[str, Optional[str]]] = set() + existing_schemas_lowered: Set[Tuple[Optional[str], Optional[str]]] + existing_schemas_lowered = set() - def list_schemas(db_only: BaseRelation) -> List[Tuple[str, str]]: - # the database name should never be None here (or where are we - # listing schemas from?) + def list_schemas( + db_only: BaseRelation + ) -> List[Tuple[Optional[str], str]]: + # the database can be None on some warehouses that don't support it + database_quoted: Optional[str] + db_lowercase = dbt.utils.lowercase(db_only.database) if db_only.database is None: - raise InternalException( - f'Got an invalid database-only portion of {db_only} ' - f'(database was None)' - ) - database_name: str = db_only.database - database_quoted = str(db_only) - with adapter.connection_named(f'list_{database_name}'): + database_quoted = None + conn_name = 'list_schemas' + else: + database_quoted = str(db_only) + conn_name = f'list_{db_only.database}' + + with adapter.connection_named(conn_name): # we should never create a null schema, so just filter them out return [ - (database_name.lower(), s.lower()) + (db_lowercase, s.lower()) for s in adapter.list_schemas(database_quoted) if s is not None ] def create_schema(relation: BaseRelation) -> None: - db = relation.database + db = relation.database or '' schema = relation.schema with adapter.connection_named(f'create_{db}_{schema}'): adapter.create_schema(relation) @@ -491,18 +489,15 @@ def create_schema(relation: BaseRelation) -> None: existing_schemas_lowered.update(ls_future.result()) for info in required_schemas: - if info.database is None: - raise InternalException( - 'Got an information schema with no database!' - ) if info.schema is None: # we are not in the business of creating null schemas, so # skip this continue - db: str = info.database + db: Optional[str] = info.database + db_lower: Optional[str] = dbt.utils.lowercase(db) schema: str = info.schema - db_schema = (db.lower(), schema.lower()) + db_schema = (db_lower, schema.lower()) if db_schema not in existing_schemas_lowered: existing_schemas_lowered.add(db_schema) create_futures.append( diff --git a/core/dbt/utils.py b/core/dbt/utils.py index e9cf218dfe0..989b93308d5 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -490,6 +490,13 @@ def coerce_dict_str(value: Any) -> Optional[Dict[str, Any]]: return None +def lowercase(value: Optional[str]) -> Optional[str]: + if value is None: + return None + else: + return value.lower() + + # some types need to make constants available to the jinja context as # attributes, and regular properties only work with objects. maybe this should # be handled by the RelationProxy?