Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filter field to source table definitions (#1495) #1776

Merged
merged 3 commits into from
Sep 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 128 additions & 122 deletions core/dbt/adapters/base/impl.py

Large diffs are not rendered by default.

156 changes: 66 additions & 90 deletions core/dbt/adapters/base/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,105 +8,81 @@
Decorator = Callable[[Any], Callable]


def available_function(func: Callable) -> Callable:
"""A decorator to indicate that a method on the adapter will be
exposed to the database wrapper, and will be available at parse and run
time.
"""
func._is_available_ = True # type: ignore
return func


def available_deprecated(
supported_name: str,
parse_replacement: Optional[Callable] = None
) -> Decorator:
"""A decorator that marks a function as available, but also prints a
deprecation warning. Use like

@available_deprecated('my_new_method')
def my_old_method(self, arg):
args = compatability_shim(arg)
return self.my_new_method(*args)

@available_deprecated('my_new_slow_method', lambda *a, **k: (0, ''))
def my_old_slow_method(self, arg):
args = compatibility_shim(arg)
return self.my_new_slow_method(*args)

To make `adapter.my_old_method` available but also print out a warning on
use directing users to `my_new_method`.

The optional parse_replacement, if provided, will provide a parse-time
replacement for the actual method (see `available_parse`).
"""
def wrapper(func):
func_name = func.__name__
renamed_method(func_name, supported_name)

@wraps(func)
def inner(*args, **kwargs):
warn('adapter:{}'.format(func_name))
return func(*args, **kwargs)

if parse_replacement:
available_function = available_parse(parse_replacement)
return available_function(inner)
return wrapper


def available_parse(parse_replacement: Callable) -> Decorator:
"""A decorator factory to indicate that a method on the adapter will be
exposed to the database wrapper, and will be stubbed out at parse time with
the given function.

@available_parse()
def my_method(self, a, b):
if something:
return None
return big_expensive_db_query()

@available_parse(lambda *args, **args: {})
def my_other_method(self, a, b):
x = {}
x.update(big_expensive_db_query())
return x
"""
def inner(func):
func._parse_replacement_ = parse_replacement
available(func)
class _Available:
def __call__(self, func: Callable) -> Callable:
func._is_available_ = True # type: ignore
return func
return inner

def parse(self, parse_replacement: Callable) -> Decorator:
"""A decorator factory to indicate that a method on the adapter will be
exposed to the database wrapper, and will be stubbed out at parse time
with the given function.

@available.parse()
def my_method(self, a, b):
if something:
return None
return big_expensive_db_query()

@available.parse(lambda *args, **args: {})
def my_other_method(self, a, b):
x = {}
x.update(big_expensive_db_query())
return x
"""
def inner(func):
func._parse_replacement_ = parse_replacement
return self(func)
return inner

class available:
def __new__(cls, func: Callable) -> Callable:
return available_function(func)

@classmethod
def parse(cls, parse_replacement: Callable) -> Decorator:
return available_parse(parse_replacement)

@classmethod
def deprecated(
cls, supported_name: str, parse_replacement: Optional[Callable] = None
self, supported_name: str, parse_replacement: Optional[Callable] = None
) -> Decorator:
return available_deprecated(supported_name, parse_replacement)

@classmethod
def parse_none(cls, func: Callable) -> Callable:
wrapper = available_parse(lambda *a, **k: None)
"""A decorator that marks a function as available, but also prints a
deprecation warning. Use like

@available.deprecated('my_new_method')
def my_old_method(self, arg):
args = compatability_shim(arg)
return self.my_new_method(*args)

@available.deprecated('my_new_slow_method', lambda *a, **k: (0, ''))
def my_old_slow_method(self, arg):
args = compatibility_shim(arg)
return self.my_new_slow_method(*args)

To make `adapter.my_old_method` available but also print out a warning
on use directing users to `my_new_method`.

The optional parse_replacement, if provided, will provide a parse-time
replacement for the actual method (see `available.parse`).
"""
def wrapper(func):
func_name = func.__name__
renamed_method(func_name, supported_name)

@wraps(func)
def inner(*args, **kwargs):
warn('adapter:{}'.format(func_name))
return func(*args, **kwargs)

if parse_replacement:
available_function = self.parse(parse_replacement)
else:
available_function = self
return available_function(inner)
return wrapper

def parse_none(self, func: Callable) -> Callable:
wrapper = self.parse(lambda *a, **k: None)
return wrapper(func)

@classmethod
def parse_list(cls, func: Callable) -> Callable:
wrapper = available_parse(lambda *a, **k: [])
def parse_list(self, func: Callable) -> Callable:
wrapper = self.parse(lambda *a, **k: [])
return wrapper(func)

# available.deprecated = available_deprecated
# available.parse = available_parse
# available.parse_none = available_parse(lambda *a, **k: None)
# available.parse_list = available_parse(lambda *a, **k: [])

available = _Available()


class AdapterMeta(abc.ABCMeta):
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class FreshnessStatus(StrEnum):
class FreshnessThreshold(JsonSchemaMixin, Mergeable):
warn_after: Optional[Time] = None
error_after: Optional[Time] = None
filter: Optional[str] = None

def status(self, age: float) -> FreshnessStatus:
if self.error_after and self.error_after.exceeded(age):
Expand Down
13 changes: 8 additions & 5 deletions core/dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -240,19 +240,22 @@
{%- endmacro %}


{% macro collect_freshness(source, loaded_at_field) %}
{{ return(adapter_macro('collect_freshness', source, loaded_at_field))}}
{% macro collect_freshness(source, loaded_at_field, filter) %}
{{ return(adapter_macro('collect_freshness', source, loaded_at_field, filter))}}
{% endmacro %}


{% macro default__collect_freshness(source, loaded_at_field) %}
{% call statement('check_schema_exists', fetch_result=True, auto_begin=False) -%}
{% macro default__collect_freshness(source, loaded_at_field, filter) %}
{% call statement('collect_freshness', fetch_result=True, auto_begin=False) -%}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

select
max({{ loaded_at_field }}) as max_loaded_at,
{{ current_timestamp() }} as snapshotted_at
from {{ source }}
{% if filter %}
where {{ filter }}
{% endif %}
{% endcall %}
{{ return(load_result('check_schema_exists').table) }}
{{ return(load_result('collect_freshness').table) }}
{% endmacro %}

{% macro make_temp_relation(base_relation, suffix='__dbt_tmp') %}
Expand Down
10 changes: 10 additions & 0 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,13 +456,23 @@ def from_run_result(self, result, start_time, timing_info):
return result

def execute(self, compiled_node, manifest):
# we should only be here if we compiled_node.has_freshness, and
# therefore loaded_at_field should be a str. If this invariant is
# broken, raise!
if compiled_node.loaded_at_field is None:
raise InternalException(
'Got to execute for source freshness of a source that has no '
'loaded_at_field!'
)

relation = self.adapter.Relation.create_from_source(compiled_node)
# given a Source, calculate its fresnhess.
with self.adapter.connection_named(compiled_node.unique_id):
self.adapter.clear_transaction()
freshness = self.adapter.calculate_freshness(
relation,
compiled_node.loaded_at_field,
compiled_node.freshness.filter,
manifest=manifest
)

Expand Down
12 changes: 8 additions & 4 deletions core/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import json
import os
from enum import Enum
from typing import Tuple, Type, Any, Optional
from typing import Tuple, Type, Any, Optional, TypeVar, Dict

import dbt.exceptions

Expand Down Expand Up @@ -433,16 +433,20 @@ def parse_cli_vars(var_string):
raise


def filter_null_values(input):
K_T = TypeVar('K_T')
V_T = TypeVar('V_T')


def filter_null_values(input: Dict[K_T, V_T]) -> Dict[K_T, V_T]:
return dict((k, v) for (k, v) in input.items()
if v is not None)


def add_ephemeral_model_prefix(s):
def add_ephemeral_model_prefix(s: str) -> str:
return '__dbt__CTE__{}'.format(s)


def timestring():
def timestring() -> str:
"""Get the current datetime as an RFC 3339-compliant string"""
# isoformat doesn't include the mandatory trailing 'Z' for UTC.
return datetime.datetime.utcnow().isoformat() + 'Z'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1351,8 +1351,8 @@ def expected_postgres_references_manifest(self, model_database=None):
'documentation_name': 'source_info',
'documentation_package': '',
},
],
'freshness': {'error_after': None, 'warn_after': None},
],
'freshness': {'error_after': None, 'warn_after': None, 'filter': None},
'identifier': 'seed',
'loaded_at_field': None,
'loader': 'a_loader',
Expand Down
18 changes: 18 additions & 0 deletions test/integration/042_sources_test/filtered_models/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: 2
sources:
- name: test_source
loader: custom
freshness:
warn_after: {count: 10, period: hour}
error_after: {count: 1, period: day}
filter: id > 1
schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
quoting:
identifier: True
tables:
- name: test_table
identifier: source
loaded_at_field: updated_at
freshness:
error_after: {count: 18, period: hour}
filter: id > 101
Loading