Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Expose the cache in dbt's rendering contexts (#1683) #1770

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## dbt 0.15.0 (TBD)

### Breaking changes
- Cache management changes:
- Materialization macros should now return a dictionary of the form `{"relations": [...]}`, where the list contains every relation that has been added, so that those relations can be added to the cache. The default behavior is still to add the materialization's model to the cache.
- Materializations that perform drops via direct "drop" statements must call `adapter.cache_dropped`
- `adapter.drop_relation` already does this
- Materializations that perform renames via direct "alter table" statements must call `adapter.cache_renamed`
- `adapter.rename_relation` already does this

## dbt 0.14.2 (September 13, 2019)

### Overview
Expand Down Expand Up @@ -51,8 +61,8 @@ This is primarily a bugfix release which contains a few minor improvements too.
- Fix for non-atomic column expansion logic in Snowflake incremental models and snapshots ([#1687](https://github.com/fishtown-analytics/dbt/issues/1687), [#1690](https://github.com/fishtown-analytics/dbt/pull/1690))
- Fix for unprojected `count(*)` expression injected by custom data tests ([#1688](https://github.com/fishtown-analytics/dbt/pull/1688))
- Fix for `dbt run` and `dbt docs generate` commands when running against Panoply Redshift ([#1479](https://github.com/fishtown-analytics/dbt/issues/1479), [#1686](https://github.com/fishtown-analytics/dbt/pull/1686))


### Contributors:
Thanks for your contributions to dbt!

Expand Down
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
You must also set the 'TYPE' class attribute with a class-unique constant
string.
"""
TYPE = NotImplemented
TYPE: str = NotImplemented

def __init__(self, profile):
self.profile = profile
Expand Down
118 changes: 82 additions & 36 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
import abc
from contextlib import contextmanager
from datetime import datetime
from typing import (
Optional, Tuple, Callable, Container, FrozenSet, Type
)

import agate
import pytz

import dbt.exceptions
import dbt.flags
import dbt.clients.agate_helper

from dbt.clients.agate_helper import empty_table
from dbt.config import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.node_types import NodeType
from dbt.loader import GraphLoader
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import filter_null_values


from dbt.adapters.base.connections import BaseConnectionManager
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base import BaseRelation
from dbt.adapters.base import Column
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.cache import RelationsCache


GET_CATALOG_MACRO_NAME = 'get_catalog'
FRESHNESS_MACRO_NAME = 'collect_freshness'


def _expect_row_value(key, row):
def _expect_row_value(key: str, row: agate.Row):
if key not in row.keys():
raise dbt.exceptions.InternalException(
'Got a row without "{}" column, columns: {}'
Expand All @@ -34,7 +39,9 @@ def _expect_row_value(key, row):
return row[key]


def _relations_filter_schemas(schemas):
def _relations_filter_schemas(
schemas: Container[str]
) -> Callable[[agate.Row], bool]:
def test(row):
referenced_schema = _expect_row_value('referenced_schema', row)
dependent_schema = _expect_row_value('dependent_schema', row)
Expand All @@ -47,7 +54,7 @@ def test(row):
return test


def _catalog_filter_schemas(manifest):
def _catalog_filter_schemas(manifest: Manifest) -> Callable[[agate.Row], bool]:
"""Return a function that takes a row and decides if the row should be
included in the catalog output.
"""
Expand All @@ -65,7 +72,7 @@ def test(row):
return test


def _utc(dt, source, field_name):
def _utc(dt: datetime, source: str, field_name: str) -> datetime:
"""If dt has a timezone, return a new datetime that's in UTC. Otherwise,
assume the datetime is already for UTC and add the timezone.
"""
Expand All @@ -90,6 +97,13 @@ def _utc(dt, source, field_name):
return dt.replace(tzinfo=pytz.UTC)


def _relation_name(rel: Optional[BaseRelation]) -> str:
if rel is None:
return 'null relation'
else:
return str(rel)


class SchemaSearchMap(dict):
"""A utility class to keep track of what information_schema tables to
search for what schemas
Expand Down Expand Up @@ -171,22 +185,19 @@ class BaseAdapter(metaclass=AdapterMeta):
Macros:
- get_catalog
"""
requires = {}

Relation = BaseRelation
Column = Column
# This should be an implementation of BaseConnectionManager
ConnectionManager = None
Relation: Type[BaseRelation] = BaseRelation
Column: Type[BaseColumn] = BaseColumn
ConnectionManager: Type[BaseConnectionManager]

# A set of clobber config fields accepted by this adapter
# for use in materializations
AdapterSpecificConfigs = frozenset()
AdapterSpecificConfigs: FrozenSet[str] = frozenset()

def __init__(self, config):
self.config = config
def __init__(self, config: RuntimeConfig):
self.config: RuntimeConfig = config
self.cache = RelationsCache()
self.connections = self.ConnectionManager(config)
self._internal_manifest_lazy = None
self._internal_manifest_lazy: Optional[Manifest] = None

###
# Methods that pass through to the connection manager
Expand Down Expand Up @@ -219,8 +230,10 @@ def connection_named(self, name):
finally:
self.release_connection()

@available.parse(lambda *a, **k: ('', dbt.clients.agate_helper()))
def execute(self, sql, auto_begin=False, fetch=False):
@available.parse(lambda *a, **k: ('', empty_table()))
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[str, agate.Table]:
"""Execute the given SQL. This is a thin wrapper around
ConnectionManager.execute.

Expand All @@ -241,7 +254,7 @@ def execute(self, sql, auto_begin=False, fetch=False):
# Methods that should never be overridden
###
@classmethod
def type(cls):
def type(cls) -> str:
"""Get the type of this adapter. Types must be class-unique and
consistent.

Expand All @@ -251,12 +264,12 @@ def type(cls):
return cls.ConnectionManager.TYPE

@property
def _internal_manifest(self):
def _internal_manifest(self) -> Manifest:
if self._internal_manifest_lazy is None:
self.load_internal_manifest()
return self.load_internal_manifest()
return self._internal_manifest_lazy

def check_internal_manifest(self):
def check_internal_manifest(self) -> Optional[Manifest]:
"""Return the internal manifest (used for executing macros) if it's
been initialized, otherwise return None.
"""
Expand All @@ -271,7 +284,7 @@ def load_internal_manifest(self) -> Manifest:
###
# Caching methods
###
def _schema_is_cached(self, database, schema):
def _schema_is_cached(self, database: str, schema: str):
"""Check if the schema is cached, and by default logs if it is not."""

if dbt.flags.USE_CACHE is False:
Expand All @@ -285,15 +298,9 @@ def _schema_is_cached(self, database, schema):
else:
return True

@classmethod
def _relations_filter_table(cls, table, schemas):
    """Restrict ``table`` to the rows relevant to ``schemas``.

    Subclasses can override this to change per-adapter filtering
    rules for relations-table entries.
    """
    keep_row = _relations_filter_schemas(schemas)
    return table.where(keep_row)

def _get_cache_schemas(self, manifest, exec_only=False):
def _get_cache_schemas(
self, manifest: Manifest, exec_only: bool = False
) -> SchemaSearchMap:
"""Get a mapping of each node's "information_schema" relations to a
set of all schemas expected in that information_schema.

Expand All @@ -314,7 +321,7 @@ def _get_cache_schemas(self, manifest, exec_only=False):
# schemas
return info_schema_name_map

def _relations_cache_for_schemas(self, manifest):
def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
"""Populate the relations cache for the given schemas. Returns an
iterable of the schemas populated, as strings.
"""
Expand All @@ -332,7 +339,9 @@ def _relations_cache_for_schemas(self, manifest):
# so we can check it later
self.cache.update_schemas(info_schema_name_map.schemas_searched())

def set_relations_cache(self, manifest, clear=False):
def set_relations_cache(
self, manifest: Manifest, clear: bool = False
) -> None:
"""Run a query that gets a populated cache of the relations in the
database and set the cache on this adapter.
"""
Expand All @@ -344,7 +353,8 @@ def set_relations_cache(self, manifest, clear=False):
self.cache.clear()
self._relations_cache_for_schemas(manifest)

def cache_new_relation(self, relation):
@available
def cache_added(self, relation: Optional[BaseRelation]) -> str:
"""Cache a new relation in dbt. It will show up in `list relations`."""
if relation is None:
name = self.nice_connection_name()
Expand All @@ -356,6 +366,42 @@ def cache_new_relation(self, relation):
# so jinja doesn't render things
return ''

@available
def cache_dropped(self, relation: Optional[BaseRelation]) -> str:
    """Remove ``relation`` from dbt's relation cache.

    After this call the relation no longer appears in
    `list_relations`, and any views bound to it are dropped from the
    cache as well. Returns an empty string so jinja renders nothing.
    """
    if relation is None:
        conn_name = self.nice_connection_name()
        dbt.exceptions.raise_compiler_error(
            'Attempted to drop a null relation for {}'.format(conn_name)
        )
    if dbt.flags.USE_CACHE:
        self.cache.drop(relation)
    return ''

@available
def cache_renamed(
    self,
    from_relation: Optional[BaseRelation],
    to_relation: Optional[BaseRelation],
) -> str:
    """Record a rename of ``from_relation`` to ``to_relation`` in the cache.

    The relation shows up under its new name in `list_relations`, but
    bound views remain bound. Returns an empty string so jinja renders
    nothing.
    """
    if from_relation is None or to_relation is None:
        # Preserve helper-call order: connection name first, then names.
        conn_name = self.nice_connection_name()
        src_name = _relation_name(from_relation)
        dst_name = _relation_name(to_relation)
        dbt.exceptions.raise_compiler_error(
            'Attempted to rename {} to {} for {}'.format(
                src_name, dst_name, conn_name
            )
        )

    if dbt.flags.USE_CACHE:
        self.cache.rename(from_relation, to_relation)
    return ''

###
# Abstract methods for database-specific values, attributes, and types
###
Expand Down
60 changes: 42 additions & 18 deletions core/dbt/adapters/base/meta.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
import abc
from functools import wraps
from dbt.deprecations import warn, renamed_method

from typing import Callable, Optional, Any, FrozenSet, Dict

def _always_none(*args, **kwargs):
    """Parse-time stub: accept any arguments and return None."""
    return None
from dbt.deprecations import warn, renamed_method


def _always_list(*args, **kwargs):
return None
Decorator = Callable[[Any], Callable]


def available(func):
def available_function(func: Callable) -> Callable:
    """Flag ``func`` as exposed to the database wrapper.

    The marked method becomes available from the rendering context at
    both parse time and run time. The function object itself is
    returned unchanged apart from the marker attribute.
    """
    setattr(func, '_is_available_', True)
    return func


def available_deprecated(supported_name, parse_replacement=None):
def available_deprecated(
supported_name: str,
parse_replacement: Optional[Callable] = None
) -> Decorator:
"""A decorator that marks a function as available, but also prints a
deprecation warning. Use like

Expand Down Expand Up @@ -50,12 +50,12 @@ def inner(*args, **kwargs):
return func(*args, **kwargs)

if parse_replacement:
available = available_parse(parse_replacement)
return available(inner)
available_function = available_parse(parse_replacement)
return available_function(inner)
return wrapper


def available_parse(parse_replacement):
def available_parse(parse_replacement: Callable) -> Decorator:
"""A decorator factory to indicate that a method on the adapter will be
exposed to the database wrapper, and will be stubbed out at parse time with
the given function.
Expand All @@ -79,10 +79,34 @@ def inner(func):
return inner


available.deprecated = available_deprecated
available.parse = available_parse
available.parse_none = available_parse(lambda *a, **k: None)
available.parse_list = available_parse(lambda *a, **k: [])
class available:
    """Decorator namespace exposing adapter methods to the jinja context.

    ``@available`` marks a method as callable from the rendering context
    at both parse and run time; the classmethods provide the parse-time
    stubbing and deprecation variants.

    Fixed: removed the commented-out module-level assignments that
    duplicated this class's API (dead code left over from the
    function-attribute implementation).
    """

    def __new__(cls, func: Callable) -> Callable:
        return available_function(func)

    @classmethod
    def parse(cls, parse_replacement: Callable) -> Decorator:
        """Expose the method, stubbed at parse time with ``parse_replacement``."""
        return available_parse(parse_replacement)

    @classmethod
    def deprecated(
        cls, supported_name: str, parse_replacement: Optional[Callable] = None
    ) -> Decorator:
        """Expose the method and warn that ``supported_name`` replaces it."""
        return available_deprecated(supported_name, parse_replacement)

    @classmethod
    def parse_none(cls, func: Callable) -> Callable:
        """Expose the method; its parse-time stub returns None."""
        wrapper = available_parse(lambda *a, **k: None)
        return wrapper(func)

    @classmethod
    def parse_list(cls, func: Callable) -> Callable:
        """Expose the method; its parse-time stub returns an empty list."""
        wrapper = available_parse(lambda *a, **k: [])
        return wrapper(func)


class AdapterMeta(abc.ABCMeta):
Expand Down Expand Up @@ -110,7 +134,7 @@ def __new__(mcls, name, bases, namespace, **kwargs):
if parse_replacement is not None:
replacements[name] = parse_replacement

cls._available_ = frozenset(available)
cls._available_: FrozenSet[str] = frozenset(available)
# should this be a namedtuple so it will be immutable like _available_?
cls._parse_replacements_ = replacements
cls._parse_replacements_: Dict[str, Callable] = replacements
return cls
Loading