From fb99dcb73cd616336603a7de4ad22764a4368f3f Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Mon, 26 Aug 2024 08:22:43 -0700 Subject: [PATCH] Fix: Heuristics that determine the default start date of the forward-only preview plan (#3043) --- sqlmesh/core/context.py | 10 ++++---- tests/core/test_integration.py | 42 ++++++++++++++++++++++++++-------- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 78405aa35..1e71e6021 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -40,7 +40,7 @@ import traceback import typing as t import unittest.result -from datetime import timedelta +from datetime import date, timedelta from functools import cached_property from io import StringIO from pathlib import Path @@ -1133,6 +1133,7 @@ def plan_builder( # If no end date is specified, use the max interval end from prod # to prevent unintended evaluation of the entire DAG. default_end: t.Optional[int] = None + default_start: t.Optional[date] = None max_interval_end_per_model: t.Optional[t.Dict[str, int]] = None if not run and not end: models_for_interval_end: t.Optional[t.Set[str]] = None @@ -1153,10 +1154,9 @@ def plan_builder( ) if max_interval_end_per_model: default_end = max(max_interval_end_per_model.values()) - else: - default_end = None - - default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None + default_start = to_date(min(max_interval_end_per_model.values())) - timedelta( + days=1 + ) return PlanBuilder( context_diff=context_diff, diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 124d4204d..097319f4f 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -549,13 +549,40 @@ def test_parent_cron_after_child(init_and_plan_context: t.Callable): @freeze_time("2023-01-08 00:00:00") -def test_cron_not_aligned_with_day_boundary(init_and_plan_context: t.Callable): +@pytest.mark.parametrize( + "forward_only, expected_intervals", + [ + ( + False, + [ + (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), + (to_timestamp("2023-01-02"), to_timestamp("2023-01-03")), + (to_timestamp("2023-01-03"), to_timestamp("2023-01-04")), + (to_timestamp("2023-01-04"), to_timestamp("2023-01-05")), + (to_timestamp("2023-01-05"), to_timestamp("2023-01-06")), + (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), + ], + ), + ( + True, + [ + (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), + ], + ), + ], +) +def test_cron_not_aligned_with_day_boundary( + init_and_plan_context: t.Callable, + forward_only: bool, + expected_intervals: t.List[t.Tuple[int, int]], +): context, plan = init_and_plan_context("examples/sushi") model = context.get_model("sushi.waiter_revenue_by_day") model = SqlModel.parse_obj( { **model.dict(), + "kind": model.kind.copy(update={"forward_only": forward_only}), "cron": "0 12 * * *", } ) @@ -577,18 +604,13 @@ def test_cron_not_aligned_with_day_boundary(init_and_plan_context: t.Callable): ) with freeze_time("2023-01-08 00:10:00"): # Past model's cron. - plan = context.plan("dev", select_models=[model.name], no_prompts=True, skip_tests=True) + plan = context.plan( + "dev", select_models=[model.name], no_prompts=True, skip_tests=True, enable_preview=True + ) assert plan.missing_intervals == [ SnapshotIntervals( snapshot_id=waiter_revenue_by_day_snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), - (to_timestamp("2023-01-02"), to_timestamp("2023-01-03")), - (to_timestamp("2023-01-03"), to_timestamp("2023-01-04")), - (to_timestamp("2023-01-04"), to_timestamp("2023-01-05")), - (to_timestamp("2023-01-05"), to_timestamp("2023-01-06")), - (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), - ], + intervals=expected_intervals, ), ]