Skip to content

Commit

Permalink
Include adapter_response for freshness queries
Browse files Browse the repository at this point in the history
  • Loading branch information
jtcohen6 committed Nov 15, 2022
1 parent 73116fb commit a066c1e
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
6 changes: 4 additions & 2 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,8 @@ def calculate_freshness(
}

# run the macro
table = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, manifest=manifest)
result = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, manifest=manifest)
adapter_response, table = result.response, result.table
# now we have a 1-row table of the maximum `loaded_at_field` value and
# the current time according to the db.
if len(table) != 1 or len(table[0]) != 2:
Expand All @@ -1114,11 +1115,12 @@ def calculate_freshness(

snapshotted_at = _utc(table[0][1], source, loaded_at_field)
age = (snapshotted_at - max_loaded_at).total_seconds()
return {
freshness = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}
return adapter_response, freshness

def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
"""A hook for running some operation before the model materialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
where {{ filter }}
{% endif %}
{% endcall %}
{{ return(load_result('collect_freshness').table) }}
{{ return(load_result('collect_freshness')) }}
{% endmacro %}
6 changes: 3 additions & 3 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ def execute(self, compiled_node, manifest):
)

relation = self.adapter.Relation.create_from_source(compiled_node)
# given a Source, calculate its fresnhess.
# given a Source, calculate its freshness.
with self.adapter.connection_for(compiled_node):
self.adapter.clear_transaction()
freshness = self.adapter.calculate_freshness(
adapter_response, freshness = self.adapter.calculate_freshness(
relation,
compiled_node.loaded_at_field,
compiled_node.freshness.filter,
Expand All @@ -124,7 +124,7 @@ def execute(self, compiled_node, manifest):
timing=[],
execution_time=0,
message=None,
adapter_response={},
adapter_response=adapter_response.to_dict(omit_none=True),
failures=None,
**freshness,
)
Expand Down

0 comments on commit a066c1e

Please sign in to comment.