Skip to content

Commit

Permalink
Addresses issues with generated models being overly eager to refresh (#2406)
Browse files Browse the repository at this point in the history

* Addresses issues with generated models being overly eager to refresh

* make things a list of tuples instead
  • Loading branch information
ravenac95 authored Oct 25, 2024
1 parent dc9423b commit 6d91f4a
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 6 deletions.
2 changes: 1 addition & 1 deletion warehouse/metrics_tools/dialect/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def names(self):
return list(self._map.keys())


def send_anonymous_to_callable(anon: exp.Anonymous, f: t.Callable):
def send_anonymous_to_callable[T](anon: exp.Anonymous, f: t.Callable[..., T]):
# much of this is taken from sqlmesh.core.macros
args = []
kwargs = {}
Expand Down
30 changes: 27 additions & 3 deletions warehouse/metrics_tools/lib/factories/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from sqlmesh.core.macros import MacroEvaluator
from sqlmesh.utils.date import TimeLike

from metrics_tools.dialect.translate import CustomFuncHandler, CustomFuncRegistry
from metrics_tools.dialect.translate import (
CustomFuncHandler,
CustomFuncRegistry,
)
from metrics_tools.evaluator import FunctionsTransformer

CURR_DIR = os.path.dirname(__file__)
Expand Down Expand Up @@ -377,6 +380,26 @@ def table_name(self, ref: PeerMetricDependencyRef):
name = self._source.name or self._name
return reference_to_str(ref, name)

def dependencies(
    self, ref: PeerMetricDependencyRef, peer_table_map: t.Dict[str, str]
):
    """Return the peer tables this query depends on.

    Walks every expression in the query looking for ``metrics_peer_ref(...)``
    calls; for each one, collects every key of *peer_table_map* whose name
    contains the referenced dependency name.

    Args:
        ref: The dependency ref being generated (currently unused here,
            kept for interface consistency with callers).
        peer_table_map: Mapping of peer table names to their targets; only
            the keys are inspected.

    Returns:
        A list (unordered) of matching peer-table names.
    """
    found: t.Set[str] = set()

    for expr in self._expressions:
        for anon in expr.find_all(exp.Anonymous):
            # Only `metrics_peer_ref(...)` calls declare a peer dependency.
            if anon.this != "metrics_peer_ref":
                continue
            dep_name = anon.expressions[0].sql()
            # A peer table matches when the dependency name appears
            # anywhere within its key (substring match, as in original).
            found.update(
                table for table in peer_table_map if dep_name in table
            )
    return list(found)

def generate_dependency_refs_for_name(self, name: str):
refs: t.List[PeerMetricDependencyRef] = []
for entity in self._source.entity_types or DEFAULT_ENTITY_TYPES:
Expand Down Expand Up @@ -808,7 +831,7 @@ class GeneratedArtifactConfig(t.TypedDict):
query_reference_name: str
query_def_as_input: MetricQueryInput
default_dialect: str
peer_table_map: t.Dict[str, str]
peer_table_tuples: t.List[t.Tuple[str, str]]
ref: PeerMetricDependencyRef
timeseries_sources: t.List[str]

Expand All @@ -818,7 +841,7 @@ def generated_entity(
query_reference_name: str,
query_def_as_input: MetricQueryInput,
default_dialect: str,
peer_table_map: t.Dict[str, str],
peer_table_tuples: t.List[t.Tuple[str, str]],
ref: PeerMetricDependencyRef,
timeseries_sources: t.List[str],
):
Expand All @@ -828,6 +851,7 @@ def generated_entity(
default_dialect=default_dialect,
source=query_def,
)
peer_table_map = dict(peer_table_tuples)
e = query.generate_query_ref(
ref,
evaluator,
Expand Down
20 changes: 18 additions & 2 deletions warehouse/metrics_tools/lib/factories/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,21 @@ def generate_models_from_query(
assert query._source.rolling
cron = query._source.rolling["cron"]

# Clean up the peer_table_map (this is a hack to prevent unnecessary
# runs when the metrics factory is updated)
query_dependencies = query.dependencies(ref, peer_table_map)
# So much of this needs to be refactored but for now this is to ensure
# that in some way that the dict doesn't randomly "change". I don't
# think this will be consistent between python machines but let's see
# for now.
reduced_peer_table_tuples = [(k, peer_table_map[k]) for k in query_dependencies]
reduced_peer_table_tuples.sort()

config = GeneratedArtifactConfig(
query_reference_name=query_reference_name,
query_def_as_input=query_def_as_input,
default_dialect=default_dialect,
peer_table_map=peer_table_map,
peer_table_tuples=reduced_peer_table_tuples,
ref=ref,
timeseries_sources=timeseries_sources,
)
Expand Down Expand Up @@ -273,5 +283,11 @@ def timeseries_metrics(
kind="VIEW",
dialect="clickhouse",
start=raw_options["start"],
columns=METRICS_COLUMNS_BY_ENTITY[entity_type],
columns={
k: METRICS_COLUMNS_BY_ENTITY[entity_type][k]
for k in filter(
lambda col: col not in ["event_source"],
METRICS_COLUMNS_BY_ENTITY[entity_type].keys(),
)
},
)
1 change: 1 addition & 0 deletions warehouse/metrics_tools/lib/local/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

project_id = os.getenv("GOOGLE_PROJECT_ID")


def bq_to_duckdb(table_mapping: t.Dict[str, str], duckdb_path: str):
"""Copies the tables in table_mapping to tables in duckdb
Expand Down

0 comments on commit 6d91f4a

Please sign in to comment.