Skip to content

Commit

Permalink
Merge pull request #1776 from fishtown-analytics/feature/partition-fi…
Browse files Browse the repository at this point in the history
…lters-sources

Add filter field to source table definitions (#1495)
  • Loading branch information
beckjake authored Sep 26, 2019
2 parents 509e0e8 + 0522535 commit 23484b1
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 276 deletions.
250 changes: 128 additions & 122 deletions core/dbt/adapters/base/impl.py

Large diffs are not rendered by default.

156 changes: 66 additions & 90 deletions core/dbt/adapters/base/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,105 +8,81 @@
Decorator = Callable[[Any], Callable]


def available_function(func: Callable) -> Callable:
"""A decorator to indicate that a method on the adapter will be
exposed to the database wrapper, and will be available at parse and run
time.
"""
func._is_available_ = True # type: ignore
return func


def available_deprecated(
supported_name: str,
parse_replacement: Optional[Callable] = None
) -> Decorator:
"""A decorator that marks a function as available, but also prints a
deprecation warning. Use like
@available_deprecated('my_new_method')
def my_old_method(self, arg):
args = compatability_shim(arg)
return self.my_new_method(*args)
@available_deprecated('my_new_slow_method', lambda *a, **k: (0, ''))
def my_old_slow_method(self, arg):
args = compatibility_shim(arg)
return self.my_new_slow_method(*args)
To make `adapter.my_old_method` available but also print out a warning on
use directing users to `my_new_method`.
The optional parse_replacement, if provided, will provide a parse-time
replacement for the actual method (see `available_parse`).
"""
def wrapper(func):
func_name = func.__name__
renamed_method(func_name, supported_name)

@wraps(func)
def inner(*args, **kwargs):
warn('adapter:{}'.format(func_name))
return func(*args, **kwargs)

if parse_replacement:
available_function = available_parse(parse_replacement)
return available_function(inner)
return wrapper


def available_parse(parse_replacement: Callable) -> Decorator:
"""A decorator factory to indicate that a method on the adapter will be
exposed to the database wrapper, and will be stubbed out at parse time with
the given function.
@available_parse()
def my_method(self, a, b):
if something:
return None
return big_expensive_db_query()
@available_parse(lambda *args, **args: {})
def my_other_method(self, a, b):
x = {}
x.update(big_expensive_db_query())
return x
"""
def inner(func):
func._parse_replacement_ = parse_replacement
available(func)
class _Available:
def __call__(self, func: Callable) -> Callable:
func._is_available_ = True # type: ignore
return func
return inner

def parse(self, parse_replacement: Callable) -> Decorator:
"""A decorator factory to indicate that a method on the adapter will be
exposed to the database wrapper, and will be stubbed out at parse time
with the given function.
@available.parse()
def my_method(self, a, b):
if something:
return None
return big_expensive_db_query()
@available.parse(lambda *args, **args: {})
def my_other_method(self, a, b):
x = {}
x.update(big_expensive_db_query())
return x
"""
def inner(func):
func._parse_replacement_ = parse_replacement
return self(func)
return inner

class available:
def __new__(cls, func: Callable) -> Callable:
return available_function(func)

@classmethod
def parse(cls, parse_replacement: Callable) -> Decorator:
return available_parse(parse_replacement)

@classmethod
def deprecated(
cls, supported_name: str, parse_replacement: Optional[Callable] = None
self, supported_name: str, parse_replacement: Optional[Callable] = None
) -> Decorator:
return available_deprecated(supported_name, parse_replacement)

@classmethod
def parse_none(cls, func: Callable) -> Callable:
wrapper = available_parse(lambda *a, **k: None)
"""A decorator that marks a function as available, but also prints a
deprecation warning. Use like
@available.deprecated('my_new_method')
def my_old_method(self, arg):
args = compatability_shim(arg)
return self.my_new_method(*args)
@available.deprecated('my_new_slow_method', lambda *a, **k: (0, ''))
def my_old_slow_method(self, arg):
args = compatibility_shim(arg)
return self.my_new_slow_method(*args)
To make `adapter.my_old_method` available but also print out a warning
on use directing users to `my_new_method`.
The optional parse_replacement, if provided, will provide a parse-time
replacement for the actual method (see `available.parse`).
"""
def wrapper(func):
func_name = func.__name__
renamed_method(func_name, supported_name)

@wraps(func)
def inner(*args, **kwargs):
warn('adapter:{}'.format(func_name))
return func(*args, **kwargs)

if parse_replacement:
available_function = self.parse(parse_replacement)
else:
available_function = self
return available_function(inner)
return wrapper

def parse_none(self, func: Callable) -> Callable:
wrapper = self.parse(lambda *a, **k: None)
return wrapper(func)

@classmethod
def parse_list(cls, func: Callable) -> Callable:
wrapper = available_parse(lambda *a, **k: [])
def parse_list(self, func: Callable) -> Callable:
wrapper = self.parse(lambda *a, **k: [])
return wrapper(func)

# available.deprecated = available_deprecated
# available.parse = available_parse
# available.parse_none = available_parse(lambda *a, **k: None)
# available.parse_list = available_parse(lambda *a, **k: [])

available = _Available()


class AdapterMeta(abc.ABCMeta):
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class FreshnessStatus(StrEnum):
class FreshnessThreshold(JsonSchemaMixin, Mergeable):
warn_after: Optional[Time] = None
error_after: Optional[Time] = None
filter: Optional[str] = None

def status(self, age: float) -> FreshnessStatus:
if self.error_after and self.error_after.exceeded(age):
Expand Down
13 changes: 8 additions & 5 deletions core/dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -240,19 +240,22 @@
{%- endmacro %}


{% macro collect_freshness(source, loaded_at_field) %}
{{ return(adapter_macro('collect_freshness', source, loaded_at_field))}}
{% macro collect_freshness(source, loaded_at_field, filter) %}
{{ return(adapter_macro('collect_freshness', source, loaded_at_field, filter))}}
{% endmacro %}


{% macro default__collect_freshness(source, loaded_at_field) %}
{% call statement('check_schema_exists', fetch_result=True, auto_begin=False) -%}
{% macro default__collect_freshness(source, loaded_at_field, filter) %}
{% call statement('collect_freshness', fetch_result=True, auto_begin=False) -%}
select
max({{ loaded_at_field }}) as max_loaded_at,
{{ current_timestamp() }} as snapshotted_at
from {{ source }}
{% if filter %}
where {{ filter }}
{% endif %}
{% endcall %}
{{ return(load_result('check_schema_exists').table) }}
{{ return(load_result('collect_freshness').table) }}
{% endmacro %}

{% macro make_temp_relation(base_relation, suffix='__dbt_tmp') %}
Expand Down
10 changes: 10 additions & 0 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,13 +456,23 @@ def from_run_result(self, result, start_time, timing_info):
return result

def execute(self, compiled_node, manifest):
# we should only be here if we compiled_node.has_freshness, and
# therefore loaded_at_field should be a str. If this invariant is
# broken, raise!
if compiled_node.loaded_at_field is None:
raise InternalException(
'Got to execute for source freshness of a source that has no '
'loaded_at_field!'
)

relation = self.adapter.Relation.create_from_source(compiled_node)
# given a Source, calculate its fresnhess.
with self.adapter.connection_named(compiled_node.unique_id):
self.adapter.clear_transaction()
freshness = self.adapter.calculate_freshness(
relation,
compiled_node.loaded_at_field,
compiled_node.freshness.filter,
manifest=manifest
)

Expand Down
12 changes: 8 additions & 4 deletions core/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import json
import os
from enum import Enum
from typing import Tuple, Type, Any, Optional
from typing import Tuple, Type, Any, Optional, TypeVar, Dict

import dbt.exceptions

Expand Down Expand Up @@ -433,16 +433,20 @@ def parse_cli_vars(var_string):
raise


def filter_null_values(input):
K_T = TypeVar('K_T')
V_T = TypeVar('V_T')


def filter_null_values(input: Dict[K_T, V_T]) -> Dict[K_T, V_T]:
return dict((k, v) for (k, v) in input.items()
if v is not None)


def add_ephemeral_model_prefix(s):
def add_ephemeral_model_prefix(s: str) -> str:
return '__dbt__CTE__{}'.format(s)


def timestring():
def timestring() -> str:
"""Get the current datetime as an RFC 3339-compliant string"""
# isoformat doesn't include the mandatory trailing 'Z' for UTC.
return datetime.datetime.utcnow().isoformat() + 'Z'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1351,8 +1351,8 @@ def expected_postgres_references_manifest(self, model_database=None):
'documentation_name': 'source_info',
'documentation_package': '',
},
],
'freshness': {'error_after': None, 'warn_after': None},
],
'freshness': {'error_after': None, 'warn_after': None, 'filter': None},
'identifier': 'seed',
'loaded_at_field': None,
'loader': 'a_loader',
Expand Down
18 changes: 18 additions & 0 deletions test/integration/042_sources_test/filtered_models/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: 2
sources:
- name: test_source
loader: custom
freshness:
warn_after: {count: 10, period: hour}
error_after: {count: 1, period: day}
filter: id > 1
schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
quoting:
identifier: True
tables:
- name: test_table
identifier: source
loaded_at_field: updated_at
freshness:
error_after: {count: 18, period: hour}
filter: id > 101
Loading

0 comments on commit 23484b1

Please sign in to comment.