Skip to content

Commit

Permalink
fix: make lookback conditional on missing intervals closes #2985 (#3002)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobymao authored Aug 13, 2024
1 parent 05feeb9 commit 3cb34bb
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 97 deletions.
101 changes: 52 additions & 49 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from pydantic import Field
from sqlglot import exp
from sqlglot.helper import seq_get
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers

from sqlmesh.core import constants as c
Expand All @@ -24,6 +23,7 @@
is_date,
make_inclusive,
make_inclusive_end,
make_exclusive,
now,
now_timestamp,
to_date,
Expand Down Expand Up @@ -856,8 +856,6 @@ def missing_intervals(
return []
if self.node.start and to_datetime(start) < to_datetime(self.node.start):
start = self.node.start
if self.node.end and make_inclusive_end(end) > make_inclusive_end(self.node.end):
end = self.node.end
# If the amount of time being checked is less than the size of a single interval then we
# know that there can't be any missing intervals within that range and return
validate_date_range(start, end)
Expand Down Expand Up @@ -888,35 +886,33 @@ def missing_intervals(
)

interval_unit = self.node.interval_unit
upper_bound_ts = to_timestamp(execution_time or now())

execution_time = execution_time or now()
if end_bounded:
execution_time = min(to_timestamp(execution_time), end_ts)
if allow_partials:
end_ts = min(end_ts, upper_bound_ts)
else:
upper_bound_ts = min(upper_bound_ts, end_ts) if end_bounded else upper_bound_ts

if not ignore_cron:
upper_bound_ts = to_timestamp(self.node.cron_floor(upper_bound_ts))

if not allow_partials:
upper_bound_ts = to_timestamp(
self.node.cron_floor(execution_time) if not ignore_cron else execution_time
end_ts = min(
end_ts,
to_timestamp(interval_unit.cron_floor(upper_bound_ts)),
)
end_ts = min(end_ts, to_timestamp(interval_unit.cron_floor(upper_bound_ts)))
else:
upper_bound_ts = to_timestamp(execution_time)
end_ts = min(end_ts, upper_bound_ts)

lookback = 0
model_end_ts: t.Optional[int] = None

if self.is_model:
lookback = self.model.lookback
model_end_ts = (
to_timestamp(make_inclusive_end(self.model.end)) if self.model.end else None
)
model_end_ts = to_timestamp(make_exclusive(self.model.end)) if self.model.end else None

return compute_missing_intervals(
interval_unit,
tuple(intervals),
start_ts,
end_ts,
upper_bound_ts,
lookback,
model_end_ts,
)
Expand Down Expand Up @@ -1652,7 +1648,6 @@ def compute_missing_intervals(
intervals: t.Tuple[Interval, ...],
start_ts: int,
end_ts: int,
upper_bound_ts: int,
lookback: int,
model_end_ts: t.Optional[int],
) -> Intervals:
Expand All @@ -1663,56 +1658,64 @@ def compute_missing_intervals(
intervals: The intervals to check what's missing.
start_ts: Inclusive timestamp start.
end_ts: Exclusive timestamp end.
upper_bound_ts: The exclusive upper bound timestamp for lookback.
lookback: A lookback window.
model_end_ts: The inclusive end timestamp set on the model (if one is set)
model_end_ts: The exclusive end timestamp set on the model (if one is set)
Returns:
A list of all timestamps in this range.
"""
if start_ts == end_ts:
return []

croniter = interval_unit.croniter(start_ts)
timestamps = [start_ts]

# get all individual timestamps with the addition of extra lookback timestamps up to the execution date
# when a model has lookback, we need to check all the intervals between itself and its lookback exist.
intervals_beyond_end_ts = 0
while True:
ts = to_timestamp(croniter.get_next(estimate=True))

if ts < end_ts:
timestamps.append(ts)
elif lookback and ts < upper_bound_ts:
timestamps.append(ts)
intervals_beyond_end_ts += 1
else:
croniter.get_prev(estimate=True)
if ts > end_ts:
if len(timestamps) > 1:
timestamps[-1] = end_ts
else:
timestamps.append(end_ts)
break

missing = []
for i in range(len(timestamps)):
if timestamps[i] >= end_ts:
break
current_ts = timestamps[i]
next_ts = (
timestamps[i + 1]
if i + 1 < len(timestamps)
else min(
to_timestamp(interval_unit.cron_next(current_ts, estimate=True)), upper_bound_ts
)
)
compare_ts = seq_get(timestamps, i + lookback) or timestamps[-1]
timestamps.append(ts)

missing = set()

for current_ts, next_ts in zip(timestamps, timestamps[1:]):
for low, high in intervals:
if compare_ts < low:
missing.append((current_ts, next_ts))
if current_ts < low:
missing.add((current_ts, next_ts))
break
elif current_ts >= low and compare_ts < high:
elif current_ts >= low and next_ts <= high:
break
else:
if model_end_ts is None or compare_ts < model_end_ts or i > intervals_beyond_end_ts + 1:
missing.append((current_ts, next_ts))
missing.add((current_ts, next_ts))

return missing
if missing:
if lookback:
if model_end_ts:
croniter = interval_unit.croniter(end_ts)
end_ts = to_timestamp(croniter.get_prev(estimate=True))

while model_end_ts < end_ts:
end_ts = to_timestamp(croniter.get_prev(estimate=True))
lookback -= 1

lookback = max(lookback, 0)

for i, (current_ts, next_ts) in enumerate(zip(timestamps, timestamps[1:])):
parent = timestamps[i + lookback : i + lookback + 2]

if len(parent) < 2 or tuple(parent) in missing:
missing.add((current_ts, next_ts))

if model_end_ts:
missing = {interval for interval in missing if interval[0] < model_end_ts}

return sorted(missing)


def earliest_start_date(
Expand Down
12 changes: 8 additions & 4 deletions sqlmesh/utils/date.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,14 @@ def make_inclusive(start: TimeLike, end: TimeLike) -> Interval:


def make_inclusive_end(end: TimeLike) -> datetime:
end_dt = to_datetime(end)
if is_date(end):
end_dt = end_dt + timedelta(days=1)
return end_dt - timedelta(microseconds=1)
return make_exclusive(end) - timedelta(microseconds=1)


def make_exclusive(time: TimeLike) -> datetime:
    """Convert a time-like value into a datetime usable as an exclusive bound.

    The value is first converted with ``to_datetime``. When ``is_date`` reports
    that the input is a date, one day is added to the converted datetime;
    otherwise the converted datetime is returned unchanged.

    Args:
        time: The time-like value to convert.

    Returns:
        The converted datetime, shifted forward by one day for date inputs.
    """
    converted = to_datetime(time)
    # Date inputs are pushed forward one day; all other inputs pass through as converted.
    return converted + timedelta(days=1) if is_date(time) else converted


def validate_date_range(
Expand Down
65 changes: 21 additions & 44 deletions tests/core/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,10 @@ def test_missing_intervals_partial(make_snapshot):
start = "2023-01-01"
end_ts = to_timestamp(start) + 1000
assert snapshot.missing_intervals(start, end_ts) == [
(to_timestamp(start), to_timestamp("2023-01-02")),
(to_timestamp(start), end_ts),
]
assert snapshot.missing_intervals(start, end_ts, execution_time=end_ts) == [
(to_timestamp(start), end_ts),
(to_timestamp(start), end_ts)
]
assert snapshot.missing_intervals(start, start) == [
(to_timestamp(start), to_timestamp("2023-01-02")),
Expand All @@ -325,13 +325,11 @@ def test_missing_intervals_end_bounded_with_lookback(make_snapshot):
snapshot.intervals = [(to_timestamp(start), to_timestamp(end))]

execution_ts = to_timestamp("2023-01-03")
assert snapshot.missing_intervals(start, start, execution_time=execution_ts) == [
(to_timestamp(start), to_timestamp(end)),
assert snapshot.missing_intervals(start, start, execution_time=execution_ts) == []
assert snapshot.missing_intervals(start, end, execution_time=execution_ts) == [
(to_timestamp("2023-01-01"), to_timestamp("2023-01-02")),
(to_timestamp("2023-01-02"), to_timestamp("2023-01-03")),
]
assert (
snapshot.missing_intervals(start, start, execution_time=execution_ts, end_bounded=True)
== []
)


def test_missing_intervals_end_bounded_with_ignore_cron(make_snapshot):
Expand Down Expand Up @@ -490,59 +488,38 @@ def test_lookback(snapshot: Snapshot, make_snapshot):
]

snapshot.add_interval("2023-01-01", "2023-01-04")
assert snapshot.missing_intervals("2023-01-01", "2023-01-02") == []

snapshot.add_interval("2023-01-06", "2023-01-07")
assert snapshot.missing_intervals("2023-01-03", "2023-01-03") == [
assert snapshot.missing_intervals("2023-01-01", "2023-01-04") == []
assert snapshot.missing_intervals("2023-01-01", "2023-01-05") == [
(to_timestamp("2023-01-03"), to_timestamp("2023-01-04")),
]
assert snapshot.missing_intervals("2023-01-04", "2023-01-04") == [
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
(to_timestamp("2023-01-05"), to_timestamp("2023-01-06")),
]

snapshot.add_interval("2023-01-06", "2023-01-07")
assert snapshot.missing_intervals("2023-01-03", "2023-01-03") == []
assert snapshot.missing_intervals("2023-01-05", "2023-01-05") == [
(to_timestamp("2023-01-05"), to_timestamp("2023-01-06")),
]
assert snapshot.missing_intervals("2023-01-03", "2023-01-05") == [
(to_timestamp("2023-01-03"), to_timestamp("2023-01-04")),
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
assert snapshot.missing_intervals("2023-01-06", "2023-01-07") == []
assert snapshot.missing_intervals("2023-01-05", "2023-01-08") == [
(to_timestamp("2023-01-05"), to_timestamp("2023-01-06")),
]
snapshot.add_interval("2023-01-05", "2023-01-05")
assert snapshot.missing_intervals("2023-01-03", "2023-01-06") == [
(to_timestamp("2023-01-06"), to_timestamp("2023-01-07")),
]

assert snapshot.missing_intervals("2023-01-29", "2023-01-29") == [
(to_timestamp("2023-01-29"), to_timestamp("2023-01-30")),
(to_timestamp("2023-01-07"), to_timestamp("2023-01-08")),
(to_timestamp("2023-01-08"), to_timestamp("2023-01-09")),
]

snapshot.add_interval("2023-01-28", "2023-01-29")
assert snapshot.missing_intervals("2023-01-27", "2023-01-27", "2023-01-30 05:00:00") == [
(to_timestamp("2023-01-27"), to_timestamp("2023-01-28")),
]
assert snapshot.missing_intervals("2023-01-28", "2023-01-28", "2023-01-30 05:00:00") == [
(to_timestamp("2023-01-28"), to_timestamp("2023-01-29")),
]
assert snapshot.missing_intervals("2023-01-29", "2023-01-29", "2023-01-30 05:00:00") == [
(to_timestamp("2023-01-29"), to_timestamp("2023-01-30")),
]
assert snapshot.missing_intervals("2023-01-27", "2023-01-29", "2023-01-30 05:00:00") == [
(to_timestamp("2023-01-27"), to_timestamp("2023-01-28")),
assert snapshot.missing_intervals("2023-01-28", "2023-01-29", "2023-01-31 05:00:00") == []
assert snapshot.missing_intervals("2023-01-28", "2023-01-30", "2023-01-31 05:00:00") == [
(to_timestamp("2023-01-28"), to_timestamp("2023-01-29")),
(to_timestamp("2023-01-29"), to_timestamp("2023-01-30")),
(to_timestamp("2023-01-30"), to_timestamp("2023-01-31")),
]

snapshot.add_interval("2023-01-28", "2023-01-30")
assert snapshot.missing_intervals("2023-01-27", "2023-01-27", "2023-01-30 05:00:00") == [
(to_timestamp("2023-01-27"), to_timestamp("2023-01-28")),
]
assert snapshot.missing_intervals("2023-01-28", "2023-01-28", "2023-01-30 05:00:00") == []
assert snapshot.missing_intervals("2023-01-29", "2023-01-29", "2023-01-30 05:00:00") == []
assert snapshot.missing_intervals("2023-01-27", "2023-01-29", "2023-01-30 05:00:00") == [
(to_timestamp("2023-01-27"), to_timestamp("2023-01-28")),
]

assert snapshot.missing_intervals("2023-01-30", "2023-01-30", "2023-01-30") == []
assert snapshot.missing_intervals("2023-01-28", "2023-01-30", "2023-01-31 04:00:00") == []


def test_seed_intervals(make_snapshot):
Expand Down Expand Up @@ -628,7 +605,7 @@ def test_missing_interval_smaller_than_interval_unit(make_snapshot):
)

assert snapshot_partial.missing_intervals("2020-01-01 00:00:05", "2020-01-01 23:59:59") == [
(to_timestamp("2020-01-01"), to_timestamp("2020-01-02"))
(to_timestamp("2020-01-01"), to_timestamp("2020-01-01 23:59:59"))
]
assert snapshot_partial.missing_intervals("2020-01-01 00:00:00", "2020-01-02 00:00:00") == [
(to_timestamp("2020-01-01"), to_timestamp("2020-01-02"))
Expand Down

0 comments on commit 3cb34bb

Please sign in to comment.