Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make databases optional #2489

Merged
merged 2 commits into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
- Substantial performance improvements for parsing on large projects, especially projects with many docs definition. ([#2480](https://github.com/fishtown-analytics/dbt/issues/2480), [#2481](https://github.com/fishtown-analytics/dbt/pull/2481))
- Expose Snowflake query id in case of an exception raised by connector ([#2201](https://github.com/fishtown-analytics/dbt/issues/2201), [#2358](https://github.com/fishtown-analytics/dbt/pull/2358))

### Under the hood
- Better support for optional database fields in adapters ([#2487](https://github.com/fishtown-analytics/dbt/issues/2487) [#2489](https://github.com/fishtown-analytics/dbt/pull/2489))

Contributors:
- [@dmateusp](https://github.com/dmateusp) ([#2475](https://github.com/fishtown-analytics/dbt/pull/2475))
- [@ChristianKohlberg](https://github.com/ChristianKohlberg) (#2358](https://github.com/fishtown-analytics/dbt/pull/2358))
Expand Down
12 changes: 5 additions & 7 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def load_internal_manifest(self) -> Manifest:
###
# Caching methods
###
def _schema_is_cached(self, database: str, schema: str) -> bool:
def _schema_is_cached(self, database: Optional[str], schema: str) -> bool:
"""Check if the schema is cached, and by default logs if it is not."""

if dbt.flags.USE_CACHE is False:
Expand Down Expand Up @@ -341,12 +341,8 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
# it's possible that there were no relations in some schemas. We want
# to insert the schemas we query into the cache's `.schemas` attribute
# so we can check it later
cache_update: Set[Tuple[str, Optional[str]]] = set()
cache_update: Set[Tuple[Optional[str], Optional[str]]] = set()
for relation in cache_schemas:
if relation.database is None:
raise InternalException(
'Got a None database in a cached schema!'
)
cache_update.add((relation.database, relation.schema))
self.cache.update_schemas(cache_update)

Expand Down Expand Up @@ -646,7 +642,9 @@ def expand_target_column_types(

self.expand_column_types(from_relation, to_relation)

def list_relations(self, database: str, schema: str) -> List[BaseRelation]:
def list_relations(
self, database: Optional[str], schema: str
) -> List[BaseRelation]:
if self._schema_is_cached(database, schema):
return self.cache.get_relations(database, schema)

Expand Down
17 changes: 6 additions & 11 deletions core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from collections.abc import Mapping, Hashable
from dataclasses import dataclass, fields
from typing import (
Optional, TypeVar, Generic, Any, Type, Dict, Union, List, Iterator, Tuple,
Optional, TypeVar, Generic, Any, Type, Dict, Union, Iterator, Tuple,
Set
)
from typing_extensions import Protocol
Expand Down Expand Up @@ -278,16 +278,11 @@ def _render_iterator(
yield key, path_part

def render(self) -> str:
parts: List[str] = [
part for _, part in self._render_iterator() if part is not None
]

if len(parts) == 0:
raise dbt.exceptions.RuntimeException(
"No path parts are included! Nothing to render."
)

return '.'.join(parts)
# if there is nothing set, this will return the empty string.
return '.'.join(
part for _, part in self._render_iterator()
if part is not None
)

def quoted(self, identifier):
return '{quote_char}{identifier}{quote_char}'.format(
Expand Down
39 changes: 17 additions & 22 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@
import threading

from dbt.logger import CACHE_LOGGER as logger
from dbt.utils import lowercase
import dbt.exceptions

_ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier')


def _lower(value: Optional[str]) -> Optional[str]:
"""Postgres schemas can be None so we can't just call lower()."""
if value is None:
return None
return value.lower()


def _make_key(relation) -> _ReferenceKey:
"""Make _ReferenceKeys with lowercase values for the cache so we don't have
to keep track of quoting
"""
return _ReferenceKey(_lower(relation.database),
_lower(relation.schema),
_lower(relation.identifier))
# databases and schemas can both be None
return _ReferenceKey(lowercase(relation.database),
lowercase(relation.schema),
lowercase(relation.identifier))


def dot_separated(key: _ReferenceKey) -> str:
Expand Down Expand Up @@ -53,15 +48,15 @@ def __str__(self) -> str:

@property
def database(self) -> Optional[str]:
return _lower(self.inner.database)
return lowercase(self.inner.database)

@property
def schema(self) -> Optional[str]:
return _lower(self.inner.schema)
return lowercase(self.inner.schema)

@property
def identifier(self) -> Optional[str]:
return _lower(self.inner.identifier)
return lowercase(self.inner.identifier)

def __copy__(self):
new = self.__class__(self.inner)
Expand Down Expand Up @@ -190,7 +185,7 @@ def add_schema(
:param database: The database name to add.
:param schema: The schema name to add.
"""
self.schemas.add((_lower(database), _lower(schema)))
self.schemas.add((lowercase(database), lowercase(schema)))

def drop_schema(
self, database: Optional[str], schema: Optional[str],
Expand All @@ -199,7 +194,7 @@ def drop_schema(

Then remove all its contents (and their dependents, etc) as well.
"""
key = (_lower(database), _lower(schema))
key = (lowercase(database), lowercase(schema))
if key not in self.schemas:
return

Expand All @@ -217,7 +212,7 @@ def update_schemas(self, schemas: Iterable[Tuple[Optional[str], str]]):

:param schemas: An iterable of the schema names to add.
"""
self.schemas.update((_lower(d), s.lower()) for (d, s) in schemas)
self.schemas.update((lowercase(d), s.lower()) for (d, s) in schemas)

def __contains__(self, schema_id: Tuple[Optional[str], str]):
"""A schema is 'in' the relations cache if it is in the set of cached
Expand All @@ -226,7 +221,7 @@ def __contains__(self, schema_id: Tuple[Optional[str], str]):
:param schema_id: The db name and schema name to look up.
"""
db, schema = schema_id
return (_lower(db), schema.lower()) in self.schemas
return (lowercase(db), schema.lower()) in self.schemas

def dump_graph(self):
"""Dump a key-only representation of the schema to a dictionary. Every
Expand Down Expand Up @@ -484,13 +479,13 @@ def get_relations(
:return List[BaseRelation]: The list of relations with the given
schema
"""
database = _lower(database)
schema = _lower(schema)
database = lowercase(database)
schema = lowercase(schema)
with self.lock:
results = [
r.inner for r in self.relations.values()
if (_lower(r.schema) == schema and
_lower(r.database) == database)
if (lowercase(r.schema) == schema and
lowercase(r.database) == database)
]

if None in results:
Expand All @@ -509,7 +504,7 @@ def _list_relations_in_schema(
self, database: Optional[str], schema: Optional[str]
) -> List[_CachedRelation]:
"""Get the relations in a schema. Callers should hold the lock."""
key = (_lower(database), _lower(schema))
key = (lowercase(database), lowercase(schema))

to_remove: List[_CachedRelation] = []
for cachekey, relation in self.relations.items():
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def add_node(self, value: str):

@dataclass
class HasRelationMetadata(JsonSchemaMixin, Replaceable):
database: str
database: Optional[str]
schema: str


Expand Down
7 changes: 4 additions & 3 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
JsonOnly,
GLOBAL_LOGGER as logger,
)
from dbt.utils import lowercase
from hologram.helpers import StrEnum
from hologram import JsonSchemaMixin

Expand Down Expand Up @@ -238,7 +239,7 @@ class FreshnessRunOutput(JsonSchemaMixin, Writable):

CatalogKey = NamedTuple(
'CatalogKey',
[('database', str), ('schema', str), ('name', str)]
[('database', Optional[str]), ('schema', str), ('name', str)]
)


Expand Down Expand Up @@ -268,7 +269,7 @@ class ColumnMetadata(JsonSchemaMixin):
@dataclass
class TableMetadata(JsonSchemaMixin):
type: str
database: str
database: Optional[str]
schema: str
name: str
comment: Optional[str]
Expand All @@ -285,7 +286,7 @@ class CatalogTable(JsonSchemaMixin, Replaceable):

def key(self) -> CatalogKey:
return CatalogKey(
self.metadata.database.lower(),
lowercase(self.metadata.database),
self.metadata.schema.lower(),
self.metadata.name.lower(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

#}
{% macro generate_database_name(custom_database_name=none, node=none) -%}
{% do return(adapter_macro('generate_database_name', custom_database_name, node)) %}
{%- endmacro %}

{% macro default__generate_database_name(custom_database_name=none, node=none) -%}
{%- set default_database = target.database -%}
{%- if custom_database_name is none -%}

Expand Down
4 changes: 2 additions & 2 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import dbt.tracking
import dbt.ui.printer
import dbt.flags
import dbt.utils


INTERNAL_ERROR_STRING = """This is an error in dbt. Please try again. If \
Expand Down Expand Up @@ -491,8 +492,7 @@ def _build_run_result(self, node, start_time, error, status, timing_info,
skip=False, failed=None):
execution_time = time.time() - start_time
thread_id = threading.current_thread().name
if status is not None:
status = status.lower()
status = dbt.utils.lowercase(status)
return PartialResult(
node=node,
status=status,
Expand Down
4 changes: 3 additions & 1 deletion core/dbt/parser/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ def __call__(
self, parsed_node: Any, config_dict: Dict[str, Any]
) -> None:
override = config_dict.get(self.component)
new_value = self.updater(override, parsed_node).strip()
new_value = self.updater(override, parsed_node)
if isinstance(new_value, str):
new_value = new_value.strip()
setattr(parsed_node, self.component, new_value)


Expand Down
11 changes: 9 additions & 2 deletions core/dbt/task/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,15 @@ def __init__(self, columns: List[PrimitiveDict]):
self.add_column(col)

def get_table(self, data: PrimitiveDict) -> CatalogTable:
database = data.get('table_database')
if database is None:
dkey: Optional[str] = None
else:
dkey = str(database)

try:
key = CatalogKey(
str(data['table_database']),
dkey,
str(data['table_schema']),
str(data['table_name']),
)
Expand Down Expand Up @@ -164,8 +170,9 @@ def format_stats(stats: PrimitiveDict) -> StatsDict:


def mapping_key(node: CompileResultNode) -> CatalogKey:
dkey = dbt.utils.lowercase(node.database)
return CatalogKey(
node.database.lower(), node.schema.lower(), node.identifier.lower()
dkey, node.schema.lower(), node.identifier.lower()
)


Expand Down
45 changes: 20 additions & 25 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@
RUNNING_STATE = DbtProcessState('running')


def _lower(value: Optional[str]) -> Optional[str]:
if value is None:
return value
return value.lower()


def write_manifest(config, manifest):
if dbt.flags.WRITE_JSON:
manifest.write(os.path.join(config.target_path, MANIFEST_FILE_NAME))
Expand Down Expand Up @@ -453,28 +447,32 @@ def create_schemas(self, adapter, selected_uids: Iterable[str]):
)
required_databases.add(db_only)

existing_schemas_lowered: Set[Tuple[str, Optional[str]]] = set()
existing_schemas_lowered: Set[Tuple[Optional[str], Optional[str]]]
existing_schemas_lowered = set()

def list_schemas(db_only: BaseRelation) -> List[Tuple[str, str]]:
# the database name should never be None here (or where are we
# listing schemas from?)
def list_schemas(
db_only: BaseRelation
) -> List[Tuple[Optional[str], str]]:
# the database can be None on some warehouses that don't support it
database_quoted: Optional[str]
db_lowercase = dbt.utils.lowercase(db_only.database)
if db_only.database is None:
raise InternalException(
f'Got an invalid database-only portion of {db_only} '
f'(database was None)'
)
database_name: str = db_only.database
database_quoted = str(db_only)
with adapter.connection_named(f'list_{database_name}'):
database_quoted = None
conn_name = 'list_schemas'
else:
database_quoted = str(db_only)
conn_name = f'list_{db_only.database}'

with adapter.connection_named(conn_name):
# we should never create a null schema, so just filter them out
return [
(database_name.lower(), s.lower())
(db_lowercase, s.lower())
for s in adapter.list_schemas(database_quoted)
if s is not None
]

def create_schema(relation: BaseRelation) -> None:
db = relation.database
db = relation.database or ''
schema = relation.schema
with adapter.connection_named(f'create_{db}_{schema}'):
adapter.create_schema(relation)
Expand All @@ -491,18 +489,15 @@ def create_schema(relation: BaseRelation) -> None:
existing_schemas_lowered.update(ls_future.result())

for info in required_schemas:
if info.database is None:
raise InternalException(
'Got an information schema with no database!'
)
if info.schema is None:
# we are not in the business of creating null schemas, so
# skip this
continue
db: str = info.database
db: Optional[str] = info.database
db_lower: Optional[str] = dbt.utils.lowercase(db)
schema: str = info.schema

db_schema = (db.lower(), schema.lower())
db_schema = (db_lower, schema.lower())
if db_schema not in existing_schemas_lowered:
existing_schemas_lowered.add(db_schema)
create_futures.append(
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,13 @@ def coerce_dict_str(value: Any) -> Optional[Dict[str, Any]]:
return None


def lowercase(value: Optional[str]) -> Optional[str]:
if value is None:
return None
else:
return value.lower()


# some types need to make constants available to the jinja context as
# attributes, and regular properties only work with objects. maybe this should
# be handled by the RelationProxy?
Expand Down