Skip to content

Commit

Permalink
Make databases optional
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Beck committed May 26, 2020
1 parent 6509067 commit c21af17
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 74 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
- Substantial performance improvements for parsing on large projects, especially projects with many docs definitions. ([#2480](https://github.com/fishtown-analytics/dbt/issues/2480), [#2481](https://github.com/fishtown-analytics/dbt/pull/2481))
- Expose Snowflake query id in case of an exception raised by connector ([#2201](https://github.com/fishtown-analytics/dbt/issues/2201), [#2358](https://github.com/fishtown-analytics/dbt/pull/2358))

### Under the hood
- Better support for optional database fields in adapters ([#2487](https://github.com/fishtown-analytics/dbt/issues/2487), [#2489](https://github.com/fishtown-analytics/dbt/pull/2489))

Contributors:
- [@dmateusp](https://github.com/dmateusp) ([#2475](https://github.com/fishtown-analytics/dbt/pull/2475))
- [@ChristianKohlberg](https://github.com/ChristianKohlberg) ([#2358](https://github.com/fishtown-analytics/dbt/pull/2358))
Expand Down
12 changes: 5 additions & 7 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def load_internal_manifest(self) -> Manifest:
###
# Caching methods
###
def _schema_is_cached(self, database: str, schema: str) -> bool:
def _schema_is_cached(self, database: Optional[str], schema: str) -> bool:
"""Check if the schema is cached, and by default logs if it is not."""

if dbt.flags.USE_CACHE is False:
Expand Down Expand Up @@ -341,12 +341,8 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
# it's possible that there were no relations in some schemas. We want
# to insert the schemas we query into the cache's `.schemas` attribute
# so we can check it later
cache_update: Set[Tuple[str, Optional[str]]] = set()
cache_update: Set[Tuple[Optional[str], Optional[str]]] = set()
for relation in cache_schemas:
if relation.database is None:
raise InternalException(
'Got a None database in a cached schema!'
)
cache_update.add((relation.database, relation.schema))
self.cache.update_schemas(cache_update)

Expand Down Expand Up @@ -646,7 +642,9 @@ def expand_target_column_types(

self.expand_column_types(from_relation, to_relation)

def list_relations(self, database: str, schema: str) -> List[BaseRelation]:
def list_relations(
self, database: Optional[str], schema: str
) -> List[BaseRelation]:
if self._schema_is_cached(database, schema):
return self.cache.get_relations(database, schema)

Expand Down
17 changes: 6 additions & 11 deletions core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from collections.abc import Mapping, Hashable
from dataclasses import dataclass, fields
from typing import (
Optional, TypeVar, Generic, Any, Type, Dict, Union, List, Iterator, Tuple,
Optional, TypeVar, Generic, Any, Type, Dict, Union, Iterator, Tuple,
Set
)
from typing_extensions import Protocol
Expand Down Expand Up @@ -278,16 +278,11 @@ def _render_iterator(
yield key, path_part

def render(self) -> str:
parts: List[str] = [
part for _, part in self._render_iterator() if part is not None
]

if len(parts) == 0:
raise dbt.exceptions.RuntimeException(
"No path parts are included! Nothing to render."
)

return '.'.join(parts)
# if there is nothing set, this will return the empty string.
return '.'.join(
part for _, part in self._render_iterator()
if part is not None
)

def quoted(self, identifier):
return '{quote_char}{identifier}{quote_char}'.format(
Expand Down
39 changes: 17 additions & 22 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@
import threading

from dbt.logger import CACHE_LOGGER as logger
from dbt.utils import lowercase
import dbt.exceptions

_ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier')


def _lower(value: Optional[str]) -> Optional[str]:
"""Postgres schemas can be None so we can't just call lower()."""
if value is None:
return None
return value.lower()


def _make_key(relation) -> _ReferenceKey:
"""Make _ReferenceKeys with lowercase values for the cache so we don't have
to keep track of quoting
"""
return _ReferenceKey(_lower(relation.database),
_lower(relation.schema),
_lower(relation.identifier))
# databases and schemas can both be None
return _ReferenceKey(lowercase(relation.database),
lowercase(relation.schema),
lowercase(relation.identifier))


def dot_separated(key: _ReferenceKey) -> str:
Expand Down Expand Up @@ -53,15 +48,15 @@ def __str__(self) -> str:

@property
def database(self) -> Optional[str]:
return _lower(self.inner.database)
return lowercase(self.inner.database)

@property
def schema(self) -> Optional[str]:
return _lower(self.inner.schema)
return lowercase(self.inner.schema)

@property
def identifier(self) -> Optional[str]:
return _lower(self.inner.identifier)
return lowercase(self.inner.identifier)

def __copy__(self):
new = self.__class__(self.inner)
Expand Down Expand Up @@ -190,7 +185,7 @@ def add_schema(
:param database: The database name to add.
:param schema: The schema name to add.
"""
self.schemas.add((_lower(database), _lower(schema)))
self.schemas.add((lowercase(database), lowercase(schema)))

def drop_schema(
self, database: Optional[str], schema: Optional[str],
Expand All @@ -199,7 +194,7 @@ def drop_schema(
Then remove all its contents (and their dependents, etc) as well.
"""
key = (_lower(database), _lower(schema))
key = (lowercase(database), lowercase(schema))
if key not in self.schemas:
return

Expand All @@ -217,7 +212,7 @@ def update_schemas(self, schemas: Iterable[Tuple[Optional[str], str]]):
:param schemas: An iterable of the schema names to add.
"""
self.schemas.update((_lower(d), s.lower()) for (d, s) in schemas)
self.schemas.update((lowercase(d), s.lower()) for (d, s) in schemas)

def __contains__(self, schema_id: Tuple[Optional[str], str]):
"""A schema is 'in' the relations cache if it is in the set of cached
Expand All @@ -226,7 +221,7 @@ def __contains__(self, schema_id: Tuple[Optional[str], str]):
:param schema_id: The db name and schema name to look up.
"""
db, schema = schema_id
return (_lower(db), schema.lower()) in self.schemas
return (lowercase(db), schema.lower()) in self.schemas

def dump_graph(self):
"""Dump a key-only representation of the schema to a dictionary. Every
Expand Down Expand Up @@ -484,13 +479,13 @@ def get_relations(
:return List[BaseRelation]: The list of relations with the given
schema
"""
database = _lower(database)
schema = _lower(schema)
database = lowercase(database)
schema = lowercase(schema)
with self.lock:
results = [
r.inner for r in self.relations.values()
if (_lower(r.schema) == schema and
_lower(r.database) == database)
if (lowercase(r.schema) == schema and
lowercase(r.database) == database)
]

if None in results:
Expand All @@ -509,7 +504,7 @@ def _list_relations_in_schema(
self, database: Optional[str], schema: Optional[str]
) -> List[_CachedRelation]:
"""Get the relations in a schema. Callers should hold the lock."""
key = (_lower(database), _lower(schema))
key = (lowercase(database), lowercase(schema))

to_remove: List[_CachedRelation] = []
for cachekey, relation in self.relations.items():
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def add_node(self, value: str):

@dataclass
class HasRelationMetadata(JsonSchemaMixin, Replaceable):
database: str
database: Optional[str]
schema: str


Expand Down
7 changes: 4 additions & 3 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
JsonOnly,
GLOBAL_LOGGER as logger,
)
from dbt.utils import lowercase
from hologram.helpers import StrEnum
from hologram import JsonSchemaMixin

Expand Down Expand Up @@ -238,7 +239,7 @@ class FreshnessRunOutput(JsonSchemaMixin, Writable):

CatalogKey = NamedTuple(
'CatalogKey',
[('database', str), ('schema', str), ('name', str)]
[('database', Optional[str]), ('schema', str), ('name', str)]
)


Expand Down Expand Up @@ -268,7 +269,7 @@ class ColumnMetadata(JsonSchemaMixin):
@dataclass
class TableMetadata(JsonSchemaMixin):
type: str
database: str
database: Optional[str]
schema: str
name: str
comment: Optional[str]
Expand All @@ -285,7 +286,7 @@ class CatalogTable(JsonSchemaMixin, Replaceable):

def key(self) -> CatalogKey:
return CatalogKey(
self.metadata.database.lower(),
lowercase(self.metadata.database),
self.metadata.schema.lower(),
self.metadata.name.lower(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

#}
{% macro generate_database_name(custom_database_name=none, node=none) -%}
{% do return(adapter_macro('generate_database_name', custom_database_name, node)) %}
{%- endmacro %}

{% macro default__generate_database_name(custom_database_name=none, node=none) -%}
{%- set default_database = target.database -%}
{%- if custom_database_name is none -%}

Expand Down
4 changes: 2 additions & 2 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import dbt.tracking
import dbt.ui.printer
import dbt.flags
import dbt.utils


INTERNAL_ERROR_STRING = """This is an error in dbt. Please try again. If \
Expand Down Expand Up @@ -491,8 +492,7 @@ def _build_run_result(self, node, start_time, error, status, timing_info,
skip=False, failed=None):
execution_time = time.time() - start_time
thread_id = threading.current_thread().name
if status is not None:
status = status.lower()
status = dbt.utils.lowercase(status)
return PartialResult(
node=node,
status=status,
Expand Down
4 changes: 3 additions & 1 deletion core/dbt/parser/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ def __call__(
self, parsed_node: Any, config_dict: Dict[str, Any]
) -> None:
override = config_dict.get(self.component)
new_value = self.updater(override, parsed_node).strip()
new_value = self.updater(override, parsed_node)
if isinstance(new_value, str):
new_value = new_value.strip()
setattr(parsed_node, self.component, new_value)


Expand Down
11 changes: 9 additions & 2 deletions core/dbt/task/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,15 @@ def __init__(self, columns: List[PrimitiveDict]):
self.add_column(col)

def get_table(self, data: PrimitiveDict) -> CatalogTable:
database = data.get('table_database')
if database is None:
dkey: Optional[str] = None
else:
dkey = str(database)

try:
key = CatalogKey(
str(data['table_database']),
dkey,
str(data['table_schema']),
str(data['table_name']),
)
Expand Down Expand Up @@ -164,8 +170,9 @@ def format_stats(stats: PrimitiveDict) -> StatsDict:


def mapping_key(node: CompileResultNode) -> CatalogKey:
dkey = dbt.utils.lowercase(node.database)
return CatalogKey(
node.database.lower(), node.schema.lower(), node.identifier.lower()
dkey, node.schema.lower(), node.identifier.lower()
)


Expand Down
45 changes: 20 additions & 25 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@
RUNNING_STATE = DbtProcessState('running')


def _lower(value: Optional[str]) -> Optional[str]:
if value is None:
return value
return value.lower()


def write_manifest(config, manifest):
if dbt.flags.WRITE_JSON:
manifest.write(os.path.join(config.target_path, MANIFEST_FILE_NAME))
Expand Down Expand Up @@ -453,28 +447,32 @@ def create_schemas(self, adapter, selected_uids: Iterable[str]):
)
required_databases.add(db_only)

existing_schemas_lowered: Set[Tuple[str, Optional[str]]] = set()
existing_schemas_lowered: Set[Tuple[Optional[str], Optional[str]]]
existing_schemas_lowered = set()

def list_schemas(db_only: BaseRelation) -> List[Tuple[str, str]]:
# the database name should never be None here (or where are we
# listing schemas from?)
def list_schemas(
db_only: BaseRelation
) -> List[Tuple[Optional[str], str]]:
# the database can be None on some warehouses that don't support it
database_quoted: Optional[str]
db_lowercase = dbt.utils.lowercase(db_only.database)
if db_only.database is None:
raise InternalException(
f'Got an invalid database-only portion of {db_only} '
f'(database was None)'
)
database_name: str = db_only.database
database_quoted = str(db_only)
with adapter.connection_named(f'list_{database_name}'):
database_quoted = None
conn_name = 'list_schemas'
else:
database_quoted = str(db_only)
conn_name = f'list_{db_only.database}'

with adapter.connection_named(conn_name):
# we should never create a null schema, so just filter them out
return [
(database_name.lower(), s.lower())
(db_lowercase, s.lower())
for s in adapter.list_schemas(database_quoted)
if s is not None
]

def create_schema(relation: BaseRelation) -> None:
db = relation.database
db = relation.database or ''
schema = relation.schema
with adapter.connection_named(f'create_{db}_{schema}'):
adapter.create_schema(relation)
Expand All @@ -491,18 +489,15 @@ def create_schema(relation: BaseRelation) -> None:
existing_schemas_lowered.update(ls_future.result())

for info in required_schemas:
if info.database is None:
raise InternalException(
'Got an information schema with no database!'
)
if info.schema is None:
# we are not in the business of creating null schemas, so
# skip this
continue
db: str = info.database
db: Optional[str] = info.database
db_lower: Optional[str] = dbt.utils.lowercase(db)
schema: str = info.schema

db_schema = (db.lower(), schema.lower())
db_schema = (db_lower, schema.lower())
if db_schema not in existing_schemas_lowered:
existing_schemas_lowered.add(db_schema)
create_futures.append(
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,13 @@ def coerce_dict_str(value: Any) -> Optional[Dict[str, Any]]:
return None


def lowercase(value: Optional[str]) -> Optional[str]:
    """Return ``value`` lowercased, passing None through unchanged.

    A None-safe stand-in for ``str.lower()`` for values that are allowed to
    be absent.

    :param value: The string to lowercase, or None.
    :return: None when ``value`` is None, otherwise ``value.lower()``.
    """
    if value is None:
        return None
    return value.lower()


# some types need to make constants available to the jinja context as
# attributes, and regular properties only work with objects. maybe this should
# be handled by the RelationProxy?
Expand Down

0 comments on commit c21af17

Please sign in to comment.