Skip to content

Commit

Permalink
Fix!: Handle lookback correctly on incremental models with an end dat…
Browse files Browse the repository at this point in the history
…e defined
  • Loading branch information
erindru committed Jul 30, 2024
1 parent 4c64455 commit 18d8f1b
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 10 deletions.
33 changes: 23 additions & 10 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,10 +902,23 @@ def missing_intervals(
upper_bound_ts = to_timestamp(execution_time)
end_ts = min(end_ts, upper_bound_ts)

lookback = self.model.lookback if self.is_model else 0
lookback = 0
model_end_ts: t.Optional[int] = None

if self.is_model:
lookback = self.model.lookback
model_end_ts = (
to_timestamp(make_inclusive_end(self.model.end)) if self.model.end else None
)

return compute_missing_intervals(
interval_unit, tuple(intervals), start_ts, end_ts, upper_bound_ts, lookback
interval_unit,
tuple(intervals),
start_ts,
end_ts,
upper_bound_ts,
lookback,
model_end_ts,
)

def categorize_as(self, category: SnapshotChangeCategory) -> None:
Expand Down Expand Up @@ -1641,6 +1654,7 @@ def compute_missing_intervals(
end_ts: int,
upper_bound_ts: int,
lookback: int,
model_end_ts: t.Optional[int],
) -> Intervals:
"""Computes all missing intervals between start and end given intervals.
Expand All @@ -1651,6 +1665,7 @@ def compute_missing_intervals(
end_ts: Exclusive timestamp end.
upper_bound_ts: The exclusive upper bound timestamp for lookback.
lookback: A lookback window.
model_end_ts: The inclusive end timestamp set on the model (if one is set)
Returns:
A list of all missing intervals in this range.
Expand All @@ -1660,20 +1675,17 @@ def compute_missing_intervals(

# get all individual timestamps with the addition of extra lookback timestamps up to the execution date
# when a model has lookback, we need to check all the intervals between itself and its lookback exist.
intervals_beyond_end_ts = 0
while True:
ts = to_timestamp(croniter.get_next(estimate=True))

if ts < end_ts:
timestamps.append(ts)
else:
croniter.get_prev(estimate=True)
break

for _ in range(lookback):
ts = to_timestamp(croniter.get_next(estimate=True))
if ts < upper_bound_ts:
elif lookback and ts < upper_bound_ts:
timestamps.append(ts)
intervals_beyond_end_ts += 1
else:
croniter.get_prev(estimate=True)
break

missing = []
Expand All @@ -1697,7 +1709,8 @@ def compute_missing_intervals(
elif current_ts >= low and compare_ts < high:
break
else:
missing.append((current_ts, next_ts))
if model_end_ts is None or compare_ts < model_end_ts or i > intervals_beyond_end_ts + 1:
missing.append((current_ts, next_ts))

return missing

Expand Down
79 changes: 79 additions & 0 deletions tests/core/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,85 @@ def test_missing_intervals_end_bounded_with_ignore_cron(make_snapshot):
]


def test_missing_intervals_past_end_date_with_lookback(make_snapshot):
    """Check missing intervals for a lookback=2 daily model once its end date has passed.

    The model covers 2023-01-01 through 2023-01-05 (inclusive end). The test
    asserts what ``missing_intervals`` returns when the run/execution time
    sits on, and progressively further past, the model's end date.
    """
    model = SqlModel(
        name="test_model",
        kind=IncrementalByTimeRangeKind(time_column=TimeColumn(column="ds"), lookback=2),
        owner="owner",
        cron="@daily",
        query=parse_one("SELECT 1, ds FROM name"),
        start="2023-01-01",
        # inclusive, equivalent to to_timestamp('2023-01-05 23:59:59.999999')
        end="2023-01-05",
    )
    snap: Snapshot = make_snapshot(model)

    # Midnight timestamps for 2023-01-01 .. 2023-01-09, keyed by day of month.
    jan = {day: to_timestamp(f"2023-01-{day:02d}") for day in range(1, 10)}
    # One-day intervals (start, exclusive end) for every day inside the model range.
    daily = {day: (jan[day], jan[day + 1]) for day in range(1, 6)}
    first_ts = jan[1]

    def missing_as_of(run_ts):
        # The same timestamp is used as both the exclusive end and the execution time.
        return snap.missing_intervals(first_ts, run_ts, execution_time=run_ts)

    # The model's inclusive end date maps to an exclusive end timestamp of 2023-01-06,
    # because to_timestamp() yields a timestamp rather than a date.
    assert snap.inclusive_exclusive(snap.node.start, snap.node.end) == (first_ts, jan[6])

    # Baseline: nothing has been processed yet, so every day is missing.
    assert missing_as_of(jan[6]) == [daily[day] for day in range(1, 6)]

    # Fully backfill the model. Despite lookback=2, with every interval
    # filled there must be nothing left to run.
    snap.add_interval(first_ts, jan[6])
    assert missing_as_of(jan[6]) == []

    # Simulate a brand new interval by removing the most recent one (2023-01-05).
    # With lookback=2 days this makes 2023-01-03, 2023-01-04 and 2023-01-05 missing.
    snap.remove_interval(interval=daily[5])
    assert missing_as_of(jan[6]) == [daily[3], daily[4], daily[5]]

    # Restore the removed interval so the model is fully backfilled again.
    snap.add_interval(*daily[5])
    assert missing_as_of(jan[6]) == []

    # End date + 1 day (2023-01-07): 2023-01-06 "would" run and lookback=2 pulls in
    # 2023-01-04 and 2023-01-05; only those two fall within the model's end date.
    assert missing_as_of(jan[7]) == [daily[4], daily[5]]

    # End date + 2 days (2023-01-08): 2023-01-07 "would" run and lookback=2 pulls in
    # 2023-01-06 and 2023-01-05; only 2023-01-05 falls within the model's end date.
    assert missing_as_of(jan[8]) == [daily[5]]

    # End date + 3 days (2023-01-09): subtracting the 2-day lookback still lands
    # beyond the model's end date, so nothing is missing.
    assert missing_as_of(jan[9]) == []

    # Far in the future the lookback window likewise lies past the model's end date.
    assert missing_as_of(to_timestamp("2024-01-01")) == []


def test_incremental_time_self_reference(make_snapshot):
snapshot = make_snapshot(
SqlModel(
Expand Down

0 comments on commit 18d8f1b

Please sign in to comment.