diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index e57d2120b52..93208be858b 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -2,7 +2,7 @@ from contextlib import contextmanager from datetime import datetime from typing import ( - Optional, Tuple, Callable, Container, FrozenSet, Type + Optional, Tuple, Callable, Container, FrozenSet, Type, Dict, Any, List ) import agate @@ -61,7 +61,7 @@ def _catalog_filter_schemas(manifest: Manifest) -> Callable[[agate.Row], bool]: schemas = frozenset((d.lower(), s.lower()) for d, s in manifest.get_used_schemas()) - def test(row): + def test(row: agate.Row) -> bool: table_database = _expect_row_value('table_database', row) table_schema = _expect_row_value('table_schema', row) # the schema may be present but None, which is not an error and should @@ -72,7 +72,9 @@ def test(row): return test -def _utc(dt: datetime, source: str, field_name: str) -> datetime: +def _utc( + dt: Optional[datetime], source: BaseRelation, field_name: str +) -> datetime: """If dt has a timezone, return a new datetime that's in UTC. Otherwise, assume the datetime is already for UTC and add the timezone. """ @@ -406,17 +408,13 @@ def cache_renamed( # Abstract methods for database-specific values, attributes, and types ### @abc.abstractclassmethod - def date_function(cls): - """Get the date function used by this adapter's database. - - :return: The date function - :rtype: str - """ + def date_function(cls) -> str: + """Get the date function used by this adapter's database.""" raise dbt.exceptions.NotImplementedException( '`date_function` is not implemented for this adapter!') @abc.abstractclassmethod - def is_cancelable(cls): + def is_cancelable(cls) -> bool: raise dbt.exceptions.NotImplementedException( '`is_cancelable` is not implemented for this adapter!' ) @@ -425,19 +423,14 @@ def is_cancelable(cls): # Abstract methods about schemas ### @abc.abstractmethod - def list_schemas(self, database): - """Get a list of existing schemas. - - :param str database: The name of the database to list under. - :return: All schemas that currently exist in the database - :rtype: List[str] - """ + def list_schemas(self, database: str) -> List[str]: + """Get a list of existing schemas in database""" raise dbt.exceptions.NotImplementedException( '`list_schemas` is not implemented for this adapter!' ) @available.parse(lambda *a, **k: False) - def check_schema_exists(self, database, schema): + def check_schema_exists(self, database: str, schema: str) -> bool: """Check if a schema exists. The default implementation of this is potentially unnecessarily slow, @@ -455,12 +448,10 @@ def check_schema_exists(self, database, schema): ### @abc.abstractmethod @available.parse_none - def drop_relation(self, relation): + def drop_relation(self, relation: BaseRelation) -> None: """Drop the given relation. *Implementors must call self.cache.drop() to preserve cache state!* - - :param self.Relation relation: The relation to drop """ raise dbt.exceptions.NotImplementedException( '`drop_relation` is not implemented for this adapter!' @@ -468,24 +459,20 @@ def drop_relation(self, relation): @abc.abstractmethod @available.parse_none - def truncate_relation(self, relation): - """Truncate the given relation. - - :param self.Relation relation: The relation to truncate - """ + def truncate_relation(self, relation: BaseRelation) -> None: + """Truncate the given relation.""" raise dbt.exceptions.NotImplementedException( '`truncate_relation` is not implemented for this adapter!' ) @abc.abstractmethod @available.parse_none - def rename_relation(self, from_relation, to_relation): + def rename_relation( + self, from_relation: BaseRelation, to_relation: BaseRelation + ) -> None: """Rename the relation from from_relation to to_relation. Implementors must call self.cache.rename() to preserve cache state. - - :param self.Relation from_relation: The original relation name - :param self.Relation to_relation: The new relation name """ raise dbt.exceptions.NotImplementedException( '`rename_relation` is not implemented for this adapter!' @@ -493,19 +480,18 @@ def rename_relation(self, from_relation, to_relation): @abc.abstractmethod @available.parse_list - def get_columns_in_relation(self, relation): - """Get a list of the columns in the given Relation. - - :param self.Relation relation: The relation to query for. - :return: Information about all columns in the given relation. - :rtype: List[self.Column] - """ + def get_columns_in_relation( + self, relation: BaseRelation + ) -> List[BaseColumn]: + """Get a list of the columns in the given Relation.""" raise dbt.exceptions.NotImplementedException( '`get_columns_in_relation` is not implemented for this adapter!' ) @available.deprecated('get_columns_in_relation', lambda *a, **k: []) - def get_columns_in_table(self, schema, identifier): + def get_columns_in_table( + self, schema: str, identifier: str + ) -> List[BaseColumn]: """DEPRECATED: Get a list of the columns in the given table.""" relation = self.Relation.create( database=self.config.credentials.database, @@ -516,7 +502,9 @@ def get_columns_in_table(self, schema, identifier): return self.get_columns_in_relation(relation) @abc.abstractmethod - def expand_column_types(self, goal, current): + def expand_column_types( + self, goal: BaseRelation, current: BaseRelation + ) -> None: """Expand the current table's types to match the goal table. (passable) :param self.Relation goal: A relation that currently exists in the @@ -529,7 +517,9 @@ def expand_column_types(self, goal, current): ) @abc.abstractmethod - def list_relations_without_caching(self, information_schema, schema): + def list_relations_without_caching( + self, information_schema: BaseRelation, schema: str + ) -> List[BaseRelation]: """List relations in the given schema, bypassing the cache. This is used as the underlying behavior to fill the cache. @@ -549,16 +539,11 @@ def list_relations_without_caching(self, information_schema, schema): # Provided methods about relations ### @available.parse_list - def get_missing_columns(self, from_relation, to_relation): + def get_missing_columns( + self, from_relation: BaseRelation, to_relation: BaseRelation + ) -> List[BaseColumn]: """Returns a list of Columns in from_relation that are missing from to_relation. - - :param Relation from_relation: The relation that might have extra - columns - :param Relation to_relation: The realtion that might have columns - missing - :return: The columns in from_relation that are missing from to_relation - :rtype: List[self.Relation] """ if not isinstance(from_relation, self.Relation): dbt.exceptions.invalid_type_error( @@ -592,7 +577,7 @@ def get_missing_columns(self, from_relation, to_relation): ] @available.parse_none - def valid_snapshot_target(self, relation): + def valid_snapshot_target(self, relation: BaseRelation) -> None: """Ensure that the target relation is valid, by making sure it has the expected columns. @@ -634,7 +619,9 @@ def valid_snapshot_target(self, relation): dbt.exceptions.raise_compiler_error(msg) @available.parse_none - def expand_target_column_types(self, from_relation, to_relation): + def expand_target_column_types( + self, from_relation: BaseRelation, to_relation: BaseRelation + ) -> None: if not isinstance(from_relation, self.Relation): dbt.exceptions.invalid_type_error( method_name='expand_target_column_types', @@ -651,7 +638,7 @@ def expand_target_column_types(self, from_relation, to_relation): self.expand_column_types(from_relation, to_relation) - def list_relations(self, database, schema): + def list_relations(self, database: str, schema: str) -> List[BaseRelation]: if self._schema_is_cached(database, schema): return self.cache.get_relations(database, schema) @@ -672,7 +659,9 @@ def list_relations(self, database, schema): .format(database, schema, relations)) return relations - def _make_match_kwargs(self, database, schema, identifier): + def _make_match_kwargs( + self, database: str, schema: str, identifier: str + ) -> Dict[str, str]: quoting = self.config.quoting if identifier is not None and quoting['identifier'] is False: identifier = identifier.lower() @@ -689,7 +678,13 @@ def _make_match_kwargs(self, database, schema, identifier): 'schema': schema, }) - def _make_match(self, relations_list, database, schema, identifier): + def _make_match( + self, + relations_list: List[BaseRelation], + database: str, + schema: str, + identifier: str, + ) -> List[BaseRelation]: matches = [] @@ -702,7 +697,9 @@ def _make_match(self, relations_list, database, schema, identifier): return matches @available.parse_none - def get_relation(self, database, schema, identifier): + def get_relation( + self, database: str, schema: str, identifier: str + ) -> Optional[BaseRelation]: relations_list = self.list_relations(database, schema) matches = self._make_match(relations_list, database, schema, @@ -724,7 +721,7 @@ def get_relation(self, database, schema, identifier): return None @available.deprecated('get_relation', lambda *a, **k: False) - def already_exists(self, schema, name): + def already_exists(self, schema: str, name: str) -> bool: """DEPRECATED: Return if a model already exists in the database""" database = self.config.credentials.database relation = self.get_relation(database, schema, name) @@ -736,47 +733,39 @@ def already_exists(self, schema, name): ### @abc.abstractmethod @available.parse_none - def create_schema(self, database, schema): - """Create the given schema if it does not exist. - - :param str schema: The schema name to create. - """ + def create_schema(self, database: str, schema: str): + """Create the given schema if it does not exist.""" raise dbt.exceptions.NotImplementedException( '`create_schema` is not implemented for this adapter!' ) @abc.abstractmethod - def drop_schema(self, database, schema): - """Drop the given schema (and everything in it) if it exists. - - :param str schema: The schema name to drop. - """ + def drop_schema(self, database: str, schema: str): + """Drop the given schema (and everything in it) if it exists.""" raise dbt.exceptions.NotImplementedException( '`drop_schema` is not implemented for this adapter!' ) @available @abc.abstractclassmethod - def quote(cls, identifier): - """Quote the given identifier, as appropriate for the database. - - :param str identifier: The identifier to quote - :return: The quoted identifier - :rtype: str - """ + def quote(cls, identifier: str) -> str: + """Quote the given identifier, as appropriate for the database.""" raise dbt.exceptions.NotImplementedException( '`quote` is not implemented for this adapter!' ) @available - def quote_as_configured(self, identifier, quote_key): + def quote_as_configured(self, identifier: str, quote_key: str) -> str: """Quote or do not quote the given identifer as configured in the project config for the quote key. The quote key should be one of 'database' (on bigquery, 'profile'), 'identifier', or 'schema', or it will be treated as if you set `True`. """ - default = self.Relation.DEFAULTS['quote_policy'].get(quote_key) + # TODO: Convert BaseRelation to a hologram.JsonSchemaMixin so mypy + # likes this + quotes = self.Relation.DEFAULTS['quote_policy'] + default = quotes.get(quote_key) # type: ignore if self.config.quoting.get(quote_key, default): return self.quote(identifier) else: @@ -787,79 +776,81 @@ def quote_as_configured(self, identifier, quote_key): # converting agate types into their sql equivalents. ### @abc.abstractclassmethod - def convert_text_type(cls, agate_table, col_idx): + def convert_text_type( + cls, agate_table: agate.Table, col_idx: int + ) -> str: """Return the type in the database that best maps to the agate.Text type for the given agate table and column index. - :param agate.Table agate_table: The table - :param int col_idx: The index into the agate table for the column. + :param agate_table: The table + :param col_idx: The index into the agate table for the column. :return: The name of the type in the database - :rtype: str """ raise dbt.exceptions.NotImplementedException( '`convert_text_type` is not implemented for this adapter!') @abc.abstractclassmethod - def convert_number_type(cls, agate_table, col_idx): + def convert_number_type( + cls, agate_table: agate.Table, col_idx: int + ) -> str: """Return the type in the database that best maps to the agate.Number type for the given agate table and column index. - :param agate.Table agate_table: The table - :param int col_idx: The index into the agate table for the column. + :param agate_table: The table + :param col_idx: The index into the agate table for the column. :return: The name of the type in the database - :rtype: str """ raise dbt.exceptions.NotImplementedException( '`convert_number_type` is not implemented for this adapter!') @abc.abstractclassmethod - def convert_boolean_type(cls, agate_table, col_idx): + def convert_boolean_type( + cls, agate_table: agate.Table, col_idx: int + ) -> str: """Return the type in the database that best maps to the agate.Boolean type for the given agate table and column index. - :param agate.Table agate_table: The table - :param int col_idx: The index into the agate table for the column. + :param agate_table: The table + :param col_idx: The index into the agate table for the column. :return: The name of the type in the database - :rtype: str """ raise dbt.exceptions.NotImplementedException( '`convert_boolean_type` is not implemented for this adapter!') @abc.abstractclassmethod - def convert_datetime_type(cls, agate_table, col_idx): + def convert_datetime_type( + cls, agate_table: agate.Table, col_idx: int + ) -> str: """Return the type in the database that best maps to the agate.DateTime type for the given agate table and column index. - :param agate.Table agate_table: The table - :param int col_idx: The index into the agate table for the column. + :param agate_table: The table + :param col_idx: The index into the agate table for the column. :return: The name of the type in the database - :rtype: str """ raise dbt.exceptions.NotImplementedException( '`convert_datetime_type` is not implemented for this adapter!') @abc.abstractclassmethod - def convert_date_type(cls, agate_table, col_idx): + def convert_date_type(cls, agate_table: agate.Table, col_idx: int) -> str: """Return the type in the database that best maps to the agate.Date type for the given agate table and column index. - :param agate.Table agate_table: The table - :param int col_idx: The index into the agate table for the column. + :param agate_table: The table + :param col_idx: The index into the agate table for the column. :return: The name of the type in the database - :rtype: str """ raise dbt.exceptions.NotImplementedException( '`convert_date_type` is not implemented for this adapter!') @abc.abstractclassmethod - def convert_time_type(cls, agate_table, col_idx): + def convert_time_type(cls, agate_table: agate.Table, col_idx: int) -> str: """Return the type in the database that best maps to the agate.TimeDelta type for the given agate table and column index. - :param agate.Table agate_table: The table - :param int col_idx: The index into the agate table for the column. + :param agate_table: The table + :param col_idx: The index into the agate table for the column. :return: The name of the type in the database - :rtype: str """ raise dbt.exceptions.NotImplementedException( '`convert_time_type` is not implemented for this adapter!') @@ -887,24 +878,27 @@ def convert_agate_type(cls, agate_table, col_idx): ### # Operations involving the manifest ### - def execute_macro(self, macro_name, manifest=None, project=None, - context_override=None, kwargs=None, release=False): + def execute_macro( + self, + macro_name: str, + manifest: Optional[Manifest] = None, + project: Optional[str] = None, + context_override: Optional[Dict[str, Any]] = None, + kwargs: Dict[str, Any] = None, + release: bool = False, + ) -> agate.Table: """Look macro_name up in the manifest and execute its results. - :param str macro_name: The name of the macro to execute. - :param Optional[Manifest] manifest: The manifest to use for generating - the base macro execution context. If none is provided, use the - internal manifest. - :param Optional[str] project: The name of the project to search in, or - None for the first match. - :param Optional[dict] context_override: An optional dict to update() - the macro execution context. - :param Optional[dict] kwargs: An optional dict of keyword args used to - pass to the macro. - :param bool release: If True, release the connection after executing. - - Return an an AttrDict with three attributes: 'table', 'data', and - 'status'. 'table' is an agate.Table. + :param macro_name: The name of the macro to execute. + :param manifest: The manifest to use for generating the base macro + execution context. If none is provided, use the internal manifest. + :param project: The name of the project to search in, or None for the + first match. + :param context_override: An optional dict to update() the macro + execution context. + :param kwargs: An optional dict of keyword args used to pass to the + macro. + :param release: If True, release the connection after executing. """ if kwargs is None: kwargs = {} @@ -947,13 +941,15 @@ def execute_macro(self, macro_name, manifest=None, project=None, return result @classmethod - def _catalog_filter_table(cls, table, manifest): + def _catalog_filter_table( + cls, table: agate.Table, manifest: Manifest + ) -> agate.Table: """Filter the table as appropriate for catalog entries. Subclasses can override this to change filtering rules on a per-adapter basis. """ return table.where(_catalog_filter_schemas(manifest)) - def get_catalog(self, manifest): + def get_catalog(self, manifest: Manifest) -> agate.Table: """Get the catalog for this manifest by running the get catalog macro. Returns an agate.Table of catalog information. """ @@ -971,12 +967,18 @@ def cancel_open_connections(self): """Cancel all open connections.""" return self.connections.cancel_open() - def calculate_freshness(self, source, loaded_at_field, manifest=None): + def calculate_freshness( + self, + source: BaseRelation, + loaded_at_field: str, + filter: Optional[str], + manifest: Optional[Manifest] = None + ) -> Dict[str, Any]: """Calculate the freshness of sources in dbt, and return it""" - # in the future `source` will be a Relation instead of a string - kwargs = { + kwargs: Dict[str, Any] = { 'source': source, - 'loaded_at_field': loaded_at_field + 'loaded_at_field': loaded_at_field, + 'filter': filter, } # run the macro @@ -994,10 +996,14 @@ def calculate_freshness(self, source, loaded_at_field, manifest=None): FRESHNESS_MACRO_NAME, [tuple(r) for r in table] ) ) + if table[0][0] is None: + # no records in the table, so really the max_loaded_at was + # infinitely long ago. Just call it 0:00 January 1 year UTC + max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) + else: + max_loaded_at = _utc(table[0][0], source, loaded_at_field) - max_loaded_at = _utc(table[0][0], source, loaded_at_field) snapshotted_at = _utc(table[0][1], source, loaded_at_field) - age = (snapshotted_at - max_loaded_at).total_seconds() return { 'max_loaded_at': max_loaded_at, diff --git a/core/dbt/adapters/base/meta.py b/core/dbt/adapters/base/meta.py index e917fdfc597..5858a919ab1 100644 --- a/core/dbt/adapters/base/meta.py +++ b/core/dbt/adapters/base/meta.py @@ -8,105 +8,81 @@ Decorator = Callable[[Any], Callable] -def available_function(func: Callable) -> Callable: - """A decorator to indicate that a method on the adapter will be - exposed to the database wrapper, and will be available at parse and run - time. - """ - func._is_available_ = True # type: ignore - return func - - -def available_deprecated( - supported_name: str, - parse_replacement: Optional[Callable] = None -) -> Decorator: - """A decorator that marks a function as available, but also prints a - deprecation warning. Use like - - @available_deprecated('my_new_method') - def my_old_method(self, arg): - args = compatability_shim(arg) - return self.my_new_method(*args) - - @available_deprecated('my_new_slow_method', lambda *a, **k: (0, '')) - def my_old_slow_method(self, arg): - args = compatibility_shim(arg) - return self.my_new_slow_method(*args) - - To make `adapter.my_old_method` available but also print out a warning on - use directing users to `my_new_method`. - - The optional parse_replacement, if provided, will provide a parse-time - replacement for the actual method (see `available_parse`). - """ - def wrapper(func): - func_name = func.__name__ - renamed_method(func_name, supported_name) - - @wraps(func) - def inner(*args, **kwargs): - warn('adapter:{}'.format(func_name)) - return func(*args, **kwargs) - - if parse_replacement: - available_function = available_parse(parse_replacement) - return available_function(inner) - return wrapper - - -def available_parse(parse_replacement: Callable) -> Decorator: - """A decorator factory to indicate that a method on the adapter will be - exposed to the database wrapper, and will be stubbed out at parse time with - the given function. - - @available_parse() - def my_method(self, a, b): - if something: - return None - return big_expensive_db_query() - - @available_parse(lambda *args, **args: {}) - def my_other_method(self, a, b): - x = {} - x.update(big_expensive_db_query()) - return x - """ - def inner(func): - func._parse_replacement_ = parse_replacement - available(func) +class _Available: + def __call__(self, func: Callable) -> Callable: + func._is_available_ = True # type: ignore return func - return inner + def parse(self, parse_replacement: Callable) -> Decorator: + """A decorator factory to indicate that a method on the adapter will be + exposed to the database wrapper, and will be stubbed out at parse time + with the given function. + + @available.parse() + def my_method(self, a, b): + if something: + return None + return big_expensive_db_query() + + @available.parse(lambda *args, **args: {}) + def my_other_method(self, a, b): + x = {} + x.update(big_expensive_db_query()) + return x + """ + def inner(func): + func._parse_replacement_ = parse_replacement + return self(func) + return inner -class available: - def __new__(cls, func: Callable) -> Callable: - return available_function(func) - - @classmethod - def parse(cls, parse_replacement: Callable) -> Decorator: - return available_parse(parse_replacement) - - @classmethod def deprecated( - cls, supported_name: str, parse_replacement: Optional[Callable] = None + self, supported_name: str, parse_replacement: Optional[Callable] = None ) -> Decorator: - return available_deprecated(supported_name, parse_replacement) - - @classmethod - def parse_none(cls, func: Callable) -> Callable: - wrapper = available_parse(lambda *a, **k: None) + """A decorator that marks a function as available, but also prints a + deprecation warning. Use like + + @available.deprecated('my_new_method') + def my_old_method(self, arg): + args = compatability_shim(arg) + return self.my_new_method(*args) + + @available.deprecated('my_new_slow_method', lambda *a, **k: (0, '')) + def my_old_slow_method(self, arg): + args = compatibility_shim(arg) + return self.my_new_slow_method(*args) + + To make `adapter.my_old_method` available but also print out a warning + on use directing users to `my_new_method`. + + The optional parse_replacement, if provided, will provide a parse-time + replacement for the actual method (see `available.parse`). + """ + def wrapper(func): + func_name = func.__name__ + renamed_method(func_name, supported_name) + + @wraps(func) + def inner(*args, **kwargs): + warn('adapter:{}'.format(func_name)) + return func(*args, **kwargs) + + if parse_replacement: + available_function = self.parse(parse_replacement) + else: + available_function = self + return available_function(inner) + return wrapper + + def parse_none(self, func: Callable) -> Callable: + wrapper = self.parse(lambda *a, **k: None) return wrapper(func) - @classmethod - def parse_list(cls, func: Callable) -> Callable: - wrapper = available_parse(lambda *a, **k: []) + def parse_list(self, func: Callable) -> Callable: + wrapper = self.parse(lambda *a, **k: []) return wrapper(func) -# available.deprecated = available_deprecated -# available.parse = available_parse -# available.parse_none = available_parse(lambda *a, **k: None) -# available.parse_list = available_parse(lambda *a, **k: []) + +available = _Available() class AdapterMeta(abc.ABCMeta): diff --git a/core/dbt/contracts/graph/unparsed.py b/core/dbt/contracts/graph/unparsed.py index 1df73a70185..2cc58683b0d 100644 --- a/core/dbt/contracts/graph/unparsed.py +++ b/core/dbt/contracts/graph/unparsed.py @@ -105,6 +105,7 @@ class FreshnessStatus(StrEnum): class FreshnessThreshold(JsonSchemaMixin, Mergeable): warn_after: Optional[Time] = None error_after: Optional[Time] = None + filter: Optional[str] = None def status(self, age: float) -> FreshnessStatus: if self.error_after and self.error_after.exceeded(age): diff --git a/core/dbt/include/global_project/macros/adapters/common.sql b/core/dbt/include/global_project/macros/adapters/common.sql index 1a6c3ecd59b..a6be4d8cbc1 100644 --- a/core/dbt/include/global_project/macros/adapters/common.sql +++ b/core/dbt/include/global_project/macros/adapters/common.sql @@ -240,19 +240,22 @@ {%- endmacro %} -{% macro collect_freshness(source, loaded_at_field) %} - {{ return(adapter_macro('collect_freshness', source, loaded_at_field))}} +{% macro collect_freshness(source, loaded_at_field, filter) %} + {{ return(adapter_macro('collect_freshness', source, loaded_at_field, filter))}} {% endmacro %} -{% macro default__collect_freshness(source, loaded_at_field) %} - {% call statement('check_schema_exists', fetch_result=True, auto_begin=False) -%} +{% macro default__collect_freshness(source, loaded_at_field, filter) %} + {% call statement('collect_freshness', fetch_result=True, auto_begin=False) -%} select max({{ loaded_at_field }}) as max_loaded_at, {{ current_timestamp() }} as snapshotted_at from {{ source }} + {% if filter %} + where {{ filter }} + {% endif %} {% endcall %} - {{ return(load_result('check_schema_exists').table) }} + {{ return(load_result('collect_freshness').table) }} {% endmacro %} {% macro make_temp_relation(base_relation, suffix='__dbt_tmp') %} diff --git a/core/dbt/node_runners.py b/core/dbt/node_runners.py index 1ed2bf47448..13a1bcb60af 100644 --- a/core/dbt/node_runners.py +++ b/core/dbt/node_runners.py @@ -456,6 +456,15 @@ def from_run_result(self, result, start_time, timing_info): return result def execute(self, compiled_node, manifest): + # we should only be here if we compiled_node.has_freshness, and + # therefore loaded_at_field should be a str. If this invariant is + # broken, raise! + if compiled_node.loaded_at_field is None: + raise InternalException( + 'Got to execute for source freshness of a source that has no ' + 'loaded_at_field!' + ) + relation = self.adapter.Relation.create_from_source(compiled_node) # given a Source, calculate its fresnhess. with self.adapter.connection_named(compiled_node.unique_id): @@ -463,6 +472,7 @@ def execute(self, compiled_node, manifest): freshness = self.adapter.calculate_freshness( relation, compiled_node.loaded_at_field, + compiled_node.freshness.filter, manifest=manifest ) diff --git a/core/dbt/utils.py b/core/dbt/utils.py index c3a4ffb5e6d..10016c05999 100644 --- a/core/dbt/utils.py +++ b/core/dbt/utils.py @@ -8,7 +8,7 @@ import json import os from enum import Enum -from typing import Tuple, Type, Any, Optional +from typing import Tuple, Type, Any, Optional, TypeVar, Dict import dbt.exceptions @@ -433,16 +433,20 @@ def parse_cli_vars(var_string): raise -def filter_null_values(input): +K_T = TypeVar('K_T') +V_T = TypeVar('V_T') + + +def filter_null_values(input: Dict[K_T, V_T]) -> Dict[K_T, V_T]: return dict((k, v) for (k, v) in input.items() if v is not None) -def add_ephemeral_model_prefix(s): +def add_ephemeral_model_prefix(s: str) -> str: return '__dbt__CTE__{}'.format(s) -def timestring(): +def timestring() -> str: """Get the current datetime as an RFC 3339-compliant string""" # isoformat doesn't include the mandatory trailing 'Z' for UTC. return datetime.datetime.utcnow().isoformat() + 'Z' diff --git a/test/integration/029_docs_generate_tests/test_docs_generate.py b/test/integration/029_docs_generate_tests/test_docs_generate.py index a6c59ec40cb..69107fcf221 100644 --- a/test/integration/029_docs_generate_tests/test_docs_generate.py +++ b/test/integration/029_docs_generate_tests/test_docs_generate.py @@ -1351,8 +1351,8 @@ def expected_postgres_references_manifest(self, model_database=None): 'documentation_name': 'source_info', 'documentation_package': '', }, - ], - 'freshness': {'error_after': None, 'warn_after': None}, + ], + 'freshness': {'error_after': None, 'warn_after': None, 'filter': None}, 'identifier': 'seed', 'loaded_at_field': None, 'loader': 'a_loader', diff --git a/test/integration/042_sources_test/filtered_models/schema.yml b/test/integration/042_sources_test/filtered_models/schema.yml new file mode 100644 index 00000000000..edad7f6ecfb --- /dev/null +++ b/test/integration/042_sources_test/filtered_models/schema.yml @@ -0,0 +1,18 @@ +version: 2 +sources: + - name: test_source + loader: custom + freshness: + warn_after: {count: 10, period: hour} + error_after: {count: 1, period: day} + filter: id > 1 + schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}" + quoting: + identifier: True + tables: + - name: test_table + identifier: source + loaded_at_field: updated_at + freshness: + error_after: {count: 18, period: hour} + filter: id > 101 diff --git a/test/integration/042_sources_test/test_sources.py b/test/integration/042_sources_test/test_sources.py index 6a0ed27602f..a66390b38ff 100644 --- a/test/integration/042_sources_test/test_sources.py +++ b/test/integration/042_sources_test/test_sources.py @@ -51,6 +51,32 @@ class SuccessfulSourcesTest(BaseSourcesTest): def setUp(self): super().setUp() self.run_dbt_with_vars(['seed'], strict=False) + self.maxDiff = None + self._id = 101 + # this is the db initial value + self.last_inserted_time = "2016-09-19T14:45:51+00:00" + + def _set_updated_at_to(self, delta): + insert_time = datetime.utcnow() + delta + timestr = insert_time.strftime("%Y-%m-%d %H:%M:%S") + # favorite_color,id,first_name,email,ip_address,updated_at + insert_id = self._id + self._id += 1 + raw_sql = """INSERT INTO {schema}.{source} + (favorite_color,id,first_name,email,ip_address,updated_at) + VALUES ( + 'blue',{id},'Jake','abc@example.com','192.168.1.1','{time}' + )""" + self.run_sql( + raw_sql, + kwargs={ + 'schema': self.unique_schema(), + 'time': timestr, + 'id': insert_id, + 'source': self.adapter.quote('source'), + } + ) + self.last_inserted_time = insert_time.strftime("%Y-%m-%dT%H:%M:%S+00:00") class TestSources(SuccessfulSourcesTest): @@ -171,37 +197,6 @@ def test_postgres_run_operation_source(self): class TestSourceFreshness(SuccessfulSourcesTest): - def setUp(self): - super().setUp() - self.maxDiff = None - self._id = 100 - # this is the db initial value - self.last_inserted_time = "2016-09-19T14:45:51+00:00" - - # test_source.test_table should have a loaded_at field of `updated_at` - # and a freshness of warn_after: 10 hours, error_after: 18 hours - # by default, our data set is way out of date! - def _set_updated_at_to(self, delta): - insert_time = datetime.utcnow() + delta - timestr = insert_time.strftime("%Y-%m-%d %H:%M:%S") - # favorite_color,id,first_name,email,ip_address,updated_at - insert_id = self._id - self._id += 1 - raw_sql = """INSERT INTO {schema}.{source} - (favorite_color,id,first_name,email,ip_address,updated_at) - VALUES ( - 'blue',{id},'Jake','abc@example.com','192.168.1.1','{time}' - )""" - self.run_sql( - raw_sql, - kwargs={ - 'schema': self.unique_schema(), - 'time': timestr, - 'id': insert_id, - 'source': self.adapter.quote('source'), - } - ) - self.last_inserted_time = insert_time.strftime("%Y-%m-%dT%H:%M:%S+00:00") def _assert_freshness_results(self, path, state): self.assertTrue(os.path.exists(path)) @@ -233,6 +228,9 @@ def _assert_freshness_results(self, path, state): }) def _run_source_freshness(self): + # test_source.test_table should have a loaded_at field of `updated_at` + # and a freshness of warn_after: 10 hours, error_after: 18 hours + # by default, our data set is way out of date! self.freshness_start_time = datetime.utcnow() results = self.run_dbt_with_vars( ['source', 'snapshot-freshness', '-o', 'target/error_source.json'], @@ -300,6 +298,38 @@ def test_postgres_error(self): self.assertIsNotNone(results[0].error) +class TestSourceFreshnessFilter(SuccessfulSourcesTest): + @property + def models(self): + return 'filtered_models' + + def assert_source_freshness_passed(self, results): + self.assertEqual(len(results), 1) + self.assertEqual(results[0].status, 'pass') + self.assertFalse(results[0].fail) + self.assertIsNone(results[0].error) + + def assert_source_freshness_failed(self, results): + self.assertEqual(len(results), 1) + self.assertEqual(results[0].status, 'error') + self.assertTrue(results[0].fail) + self.assertIsNone(results[0].error) + + @use_profile('postgres') + def test_postgres_all_records(self): + # all records are filtered out + self.run_dbt_with_vars(['source', 'snapshot-freshness'], expect_pass=False) + # we should insert a record with #101 that's fresh, but will still fail + # because the filter excludes it + self._set_updated_at_to(timedelta(hours=-2)) + self.run_dbt_with_vars(['source', 'snapshot-freshness'], expect_pass=False) + + # we should now insert a record with #102 that's fresh, and the filter + # includes it + self._set_updated_at_to(timedelta(hours=-2)) + results = self.run_dbt_with_vars(['source', 'snapshot-freshness'], expect_pass=True) + + class TestMalformedSources(BaseSourcesTest): # even seeds should fail, because parsing is what's raising @property diff --git a/third-party-stubs/agate/__init__.pyi b/third-party-stubs/agate/__init__.pyi index 19dbff85e4c..b1948fbb394 100644 --- a/third-party-stubs/agate/__init__.pyi +++ b/third-party-stubs/agate/__init__.pyi @@ -1,8 +1,28 @@ from collections import Sequence -from typing import Any, Optional +from typing import Any, Optional, Callable -from . import data_types +from . import data_types as data_types + + +class MappedSequence(Sequence): + def __init__(self, values: Any, keys: Optional[Any] = ...) -> None: ... + def __unicode__(self): ... + def __getitem__(self, key: Any): ... + def __setitem__(self, key: Any, value: Any) -> None: ... + def __iter__(self): ... + def __len__(self): ... + def __eq__(self, other: Any): ... + def __ne__(self, other: Any): ... + def __contains__(self, value: Any): ... + def keys(self): ... + def values(self): ... + def items(self): ... + def get(self, key: Any, default: Optional[Any] = ...): ... + def dict(self): ... + + +class Row(MappedSequence): ... class Table: @@ -22,28 +42,9 @@ class Table: def rows(self): ... def print_csv(self, **kwargs: Any) -> None: ... def print_json(self, **kwargs: Any) -> None: ... + def where(self, test: Callable[[Row], bool]) -> 'Table': ... class TypeTester: def __init__(self, force: Any = ..., limit: Optional[Any] = ..., types: Optional[Any] = ...) -> None: ... def run(self, rows: Any, column_names: Any): ... - - -class MappedSequence(Sequence): - def __init__(self, values: Any, keys: Optional[Any] = ...) -> None: ... - def __unicode__(self): ... - def __getitem__(self, key: Any): ... - def __setitem__(self, key: Any, value: Any) -> None: ... - def __iter__(self): ... - def __len__(self): ... - def __eq__(self, other: Any): ... - def __ne__(self, other: Any): ... - def __contains__(self, value: Any): ... - def keys(self): ... - def values(self): ... - def items(self): ... - def get(self, key: Any, default: Optional[Any] = ...): ... - def dict(self): ... - - -class Row(MappedSequence): ...