Skip to content

Commit

Permalink
expose the cache to macros
Browse files Browse the repository at this point in the history
make new cache manipulation methods
mark the methods available to the sql context
move cache manipulation into appropriate macros and methods
update the changelog
fix some type checking
  • Loading branch information
Jacob Beck committed Sep 20, 2019
1 parent e31a9af commit ca1c84c
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 67 deletions.
15 changes: 13 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
## dbt 0.15.0 (TBD)

### Breaking changes
- Cache management changes:
- Materializations that perform creates via direct "create X" statements must call `adapter.cache_added`
- `create_table_as`/`create_view_as` already do this
- Materializations that perform drops via direct "drop" statements must call `adapter.cache_dropped`
- `adapter.drop_relation` already does this
- Materializations that perform renames via direct "alter table" statements must call `adapter.cache_renamed`
- `adapter.rename_relation` already does this

## dbt 0.14.2 (September 13, 2019)

### Overview
Expand Down Expand Up @@ -51,8 +62,8 @@ This is primarily a bugfix release which contains a few minor improvements too.
- Fix for non-atomic column expansion logic in Snowflake incremental models and snapshots ([#1687](https://github.com/fishtown-analytics/dbt/issues/1687), [#1690](https://github.com/fishtown-analytics/dbt/pull/1690))
- Fix for unprojected `count(*)` expression injected by custom data tests ([#1688](https://github.com/fishtown-analytics/dbt/pull/1688))
- Fix for `dbt run` and `dbt docs generate` commands when running against Panoply Redshift ([#1479](https://github.com/fishtown-analytics/dbt/issues/1479), [#1686](https://github.com/fishtown-analytics/dbt/pull/1686))


### Contributors:
Thanks for your contributions to dbt!

Expand Down
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
You must also set the 'TYPE' class attribute with a class-unique constant
string.
"""
TYPE = NotImplemented
TYPE: str = NotImplemented

def __init__(self, profile):
self.profile = profile
Expand Down
118 changes: 82 additions & 36 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
import abc
from contextlib import contextmanager
from datetime import datetime
from typing import (
Optional, Tuple, Callable, Container, FrozenSet, Type
)

import agate
import pytz

import dbt.exceptions
import dbt.flags
import dbt.clients.agate_helper

from dbt.clients.agate_helper import empty_table
from dbt.config import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.node_types import NodeType
from dbt.loader import GraphLoader
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import filter_null_values


from dbt.adapters.base.connections import BaseConnectionManager
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base import BaseRelation
from dbt.adapters.base import Column
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.cache import RelationsCache


GET_CATALOG_MACRO_NAME = 'get_catalog'
FRESHNESS_MACRO_NAME = 'collect_freshness'


def _expect_row_value(key, row):
def _expect_row_value(key: str, row: agate.Row):
if key not in row.keys():
raise dbt.exceptions.InternalException(
'Got a row without "{}" column, columns: {}'
Expand All @@ -34,7 +39,9 @@ def _expect_row_value(key, row):
return row[key]


def _relations_filter_schemas(schemas):
def _relations_filter_schemas(
schemas: Container[str]
) -> Callable[[agate.Row], bool]:
def test(row):
referenced_schema = _expect_row_value('referenced_schema', row)
dependent_schema = _expect_row_value('dependent_schema', row)
Expand All @@ -47,7 +54,7 @@ def test(row):
return test


def _catalog_filter_schemas(manifest):
def _catalog_filter_schemas(manifest: Manifest) -> Callable[[agate.Row], bool]:
"""Return a function that takes a row and decides if the row should be
included in the catalog output.
"""
Expand All @@ -65,7 +72,7 @@ def test(row):
return test


def _utc(dt, source, field_name):
def _utc(dt: datetime, source: str, field_name: str) -> datetime:
"""If dt has a timezone, return a new datetime that's in UTC. Otherwise,
assume the datetime is already for UTC and add the timezone.
"""
Expand All @@ -90,6 +97,13 @@ def _utc(dt, source, field_name):
return dt.replace(tzinfo=pytz.UTC)


def _relation_name(rel: Optional[BaseRelation]) -> str:
if rel is None:
return 'null relation'
else:
return str(rel)


class SchemaSearchMap(dict):
"""A utility class to keep track of what information_schema tables to
search for what schemas
Expand Down Expand Up @@ -171,22 +185,19 @@ class BaseAdapter(metaclass=AdapterMeta):
Macros:
- get_catalog
"""
requires = {}

Relation = BaseRelation
Column = Column
# This should be an implementation of BaseConnectionManager
ConnectionManager = None
Relation: Type[BaseRelation] = BaseRelation
Column: Type[BaseColumn] = BaseColumn
ConnectionManager: Type[BaseConnectionManager]

# A set of clobber config fields accepted by this adapter
# for use in materializations
AdapterSpecificConfigs = frozenset()
AdapterSpecificConfigs: FrozenSet[str] = frozenset()

def __init__(self, config):
self.config = config
def __init__(self, config: RuntimeConfig):
self.config: RuntimeConfig = config
self.cache = RelationsCache()
self.connections = self.ConnectionManager(config)
self._internal_manifest_lazy = None
self._internal_manifest_lazy: Optional[Manifest] = None

###
# Methods that pass through to the connection manager
Expand Down Expand Up @@ -219,8 +230,10 @@ def connection_named(self, name):
finally:
self.release_connection()

@available.parse(lambda *a, **k: ('', dbt.clients.agate_helper()))
def execute(self, sql, auto_begin=False, fetch=False):
@available.parse(lambda *a, **k: ('', empty_table()))
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[str, agate.Table]:
"""Execute the given SQL. This is a thin wrapper around
ConnectionManager.execute.
Expand All @@ -241,7 +254,7 @@ def execute(self, sql, auto_begin=False, fetch=False):
# Methods that should never be overridden
###
@classmethod
def type(cls):
def type(cls) -> str:
"""Get the type of this adapter. Types must be class-unique and
consistent.
Expand All @@ -251,12 +264,12 @@ def type(cls):
return cls.ConnectionManager.TYPE

@property
def _internal_manifest(self):
def _internal_manifest(self) -> Manifest:
if self._internal_manifest_lazy is None:
self.load_internal_manifest()
return self.load_internal_manifest()
return self._internal_manifest_lazy

def check_internal_manifest(self):
def check_internal_manifest(self) -> Optional[Manifest]:
"""Return the internal manifest (used for executing macros) if it's
been initialized, otherwise return None.
"""
Expand All @@ -271,7 +284,7 @@ def load_internal_manifest(self) -> Manifest:
###
# Caching methods
###
def _schema_is_cached(self, database, schema):
def _schema_is_cached(self, database: str, schema: str):
"""Check if the schema is cached, and by default logs if it is not."""

if dbt.flags.USE_CACHE is False:
Expand All @@ -285,15 +298,9 @@ def _schema_is_cached(self, database, schema):
else:
return True

@classmethod
def _relations_filter_table(cls, table, schemas):
    """Filter *table* down to the relation rows relevant to *schemas*.

    Subclasses can override this to change filtering rules on a
    per-adapter basis.
    """
    keep_row = _relations_filter_schemas(schemas)
    return table.where(keep_row)

def _get_cache_schemas(self, manifest, exec_only=False):
def _get_cache_schemas(
self, manifest: Manifest, exec_only: bool = False
) -> SchemaSearchMap:
"""Get a mapping of each node's "information_schema" relations to a
set of all schemas expected in that information_schema.
Expand All @@ -314,7 +321,7 @@ def _get_cache_schemas(self, manifest, exec_only=False):
# schemas
return info_schema_name_map

def _relations_cache_for_schemas(self, manifest):
def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
"""Populate the relations cache for the given schemas. Returns an
iterable of the schemas populated, as strings.
"""
Expand All @@ -332,7 +339,9 @@ def _relations_cache_for_schemas(self, manifest):
# so we can check it later
self.cache.update_schemas(info_schema_name_map.schemas_searched())

def set_relations_cache(self, manifest, clear=False):
def set_relations_cache(
self, manifest: Manifest, clear: bool = False
) -> None:
"""Run a query that gets a populated cache of the relations in the
database and set the cache on this adapter.
"""
Expand All @@ -344,7 +353,8 @@ def set_relations_cache(self, manifest, clear=False):
self.cache.clear()
self._relations_cache_for_schemas(manifest)

def cache_new_relation(self, relation):
@available
def cache_added(self, relation: Optional[BaseRelation]) -> str:
"""Cache a new relation in dbt. It will show up in `list relations`."""
if relation is None:
name = self.nice_connection_name()
Expand All @@ -356,6 +366,42 @@ def cache_new_relation(self, relation):
# so jinja doesn't render things
return ''

@available
def cache_dropped(self, relation: Optional[BaseRelation]) -> str:
    """Remove a relation from dbt's cache so it no longer shows up in
    `list relations`; any views bound to it are dropped from the cache too.

    Raises a compiler error if *relation* is None. Returns '' so jinja
    renders nothing.
    """
    if relation is None:
        dbt.exceptions.raise_compiler_error(
            'Attempted to drop a null relation for {}'.format(
                self.nice_connection_name()
            )
        )
    if dbt.flags.USE_CACHE:
        self.cache.drop(relation)
    return ''

@available
def cache_renamed(
    self,
    from_relation: Optional[BaseRelation],
    to_relation: Optional[BaseRelation],
) -> str:
    """Rename a relation in dbt's cache. It will show up with a new name
    in `list_relations`, but bound views will remain bound.

    Raises a compiler error if either relation is None. Returns '' so
    jinja renders nothing.
    """
    if from_relation is None or to_relation is None:
        conn_name = self.nice_connection_name()
        dbt.exceptions.raise_compiler_error(
            'Attempted to rename {} to {} for {}'.format(
                _relation_name(from_relation),
                _relation_name(to_relation),
                conn_name,
            )
        )

    if dbt.flags.USE_CACHE:
        self.cache.rename(from_relation, to_relation)
    return ''

###
# Abstract methods for database-specific values, attributes, and types
###
Expand Down
60 changes: 42 additions & 18 deletions core/dbt/adapters/base/meta.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
import abc
from functools import wraps
from dbt.deprecations import warn, renamed_method

from typing import Callable, Optional, Any, FrozenSet, Dict

def _always_none(*args, **kwargs):
return None
from dbt.deprecations import warn, renamed_method


def _always_list(*args, **kwargs):
return None
Decorator = Callable[[Any], Callable]


def available(func):
def available_function(func: Callable) -> Callable:
"""A decorator to indicate that a method on the adapter will be
exposed to the database wrapper, and will be available at parse and run
time.
"""
func._is_available_ = True
func._is_available_ = True # type: ignore
return func


def available_deprecated(supported_name, parse_replacement=None):
def available_deprecated(
supported_name: str,
parse_replacement: Optional[Callable] = None
) -> Decorator:
"""A decorator that marks a function as available, but also prints a
deprecation warning. Use like
Expand Down Expand Up @@ -50,12 +50,12 @@ def inner(*args, **kwargs):
return func(*args, **kwargs)

if parse_replacement:
available = available_parse(parse_replacement)
return available(inner)
available_function = available_parse(parse_replacement)
return available_function(inner)
return wrapper


def available_parse(parse_replacement):
def available_parse(parse_replacement: Callable) -> Decorator:
"""A decorator factory to indicate that a method on the adapter will be
exposed to the database wrapper, and will be stubbed out at parse time with
the given function.
Expand All @@ -79,10 +79,34 @@ def inner(func):
return inner


available.deprecated = available_deprecated
available.parse = available_parse
available.parse_none = available_parse(lambda *a, **k: None)
available.parse_list = available_parse(lambda *a, **k: [])
class available:
    """Decorator namespace for exposing adapter methods to the SQL/jinja
    context.

    Used bare (``@available``) it marks a method available at both parse
    and run time; the classmethods provide the parse-time stubbing and
    deprecation variants. No instance of this class is ever created.
    """

    def __new__(cls, func: Callable) -> Callable:
        # Plain ``@available`` delegates straight to available_function.
        return available_function(func)

    @classmethod
    def parse(cls, parse_replacement: Callable) -> Decorator:
        """Mark available, stubbed out at parse time with the given callable."""
        return available_parse(parse_replacement)

    @classmethod
    def deprecated(
        cls, supported_name: str, parse_replacement: Optional[Callable] = None
    ) -> Decorator:
        """Mark available but deprecated in favor of *supported_name*."""
        return available_deprecated(supported_name, parse_replacement)

    @classmethod
    def parse_none(cls, func: Callable) -> Callable:
        """Mark available, stubbed out at parse time to return None."""
        wrapper = available_parse(lambda *a, **k: None)
        return wrapper(func)

    @classmethod
    def parse_list(cls, func: Callable) -> Callable:
        """Mark available, stubbed out at parse time to return []."""
        wrapper = available_parse(lambda *a, **k: [])
        return wrapper(func)


class AdapterMeta(abc.ABCMeta):
Expand Down Expand Up @@ -110,7 +134,7 @@ def __new__(mcls, name, bases, namespace, **kwargs):
if parse_replacement is not None:
replacements[name] = parse_replacement

cls._available_ = frozenset(available)
cls._available_: FrozenSet[str] = frozenset(available)
# should this be a namedtuple so it will be immutable like _available_?
cls._parse_replacements_ = replacements
cls._parse_replacements_: Dict[str, Callable] = replacements
return cls
Loading

0 comments on commit ca1c84c

Please sign in to comment.