Skip to content

Commit

Permalink
Chore: Treat the target environment as missing during planning if it'…
Browse files Browse the repository at this point in the history
…s expired
  • Loading branch information
izeigerman committed Aug 26, 2024
1 parent d24fe29 commit 97799e8
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 15 deletions.
1 change: 1 addition & 0 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,7 @@ def invalidate_environment(self, name: str, sync: bool = False) -> None:
sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
be deleted asynchronously by the janitor process.
"""
name = Environment.sanitize_name(name)
self.state_sync.invalidate_environment(name)
if sync:
self._cleanup_environments()
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/context_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def create(
environment = environment.lower()
env = state_reader.get_environment(environment)

if env is None:
if env is None or env.expired:
env = state_reader.get_environment(create_from.lower())
is_new_environment = True
previously_promoted_snapshot_ids = set()
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/core/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from sqlmesh.core.config import EnvironmentSuffixTarget
from sqlmesh.core.snapshot import SnapshotId, SnapshotTableInfo
from sqlmesh.utils import word_characters_only
from sqlmesh.utils.date import TimeLike
from sqlmesh.utils.date import TimeLike, now_timestamp
from sqlmesh.utils.pydantic import PydanticModel, field_validator

T = t.TypeVar("T", bound="EnvironmentNamingInfo")
Expand Down Expand Up @@ -152,3 +152,7 @@ def naming_info(self) -> EnvironmentNamingInfo:
catalog_name_override=self.catalog_name_override,
normalize_name=self.normalize_name,
)

@property
def expired(self) -> bool:
return self.expiration_ts is not None and self.expiration_ts <= now_timestamp()
27 changes: 14 additions & 13 deletions sqlmesh/core/state_sync/engine_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,19 +284,20 @@ def promote(
)
!= table_infos[name].qualified_view_name.for_environment(environment.naming_info)
}
if environment.previous_plan_id != existing_environment.plan_id:
raise SQLMeshError(
f"Plan '{environment.plan_id}' is no longer valid for the target environment '{environment.name}'. "
f"Expected previous plan ID: '{environment.previous_plan_id}', actual previous plan ID: '{existing_environment.plan_id}'. "
"Please recreate the plan and try again"
)
if no_gaps_snapshot_names != set():
snapshots = self._get_snapshots(environment.snapshots).values()
self._ensure_no_gaps(
snapshots,
existing_environment,
no_gaps_snapshot_names,
)
if not existing_environment.expired:
if environment.previous_plan_id != existing_environment.plan_id:
raise SQLMeshError(
f"Plan '{environment.plan_id}' is no longer valid for the target environment '{environment.name}'. "
f"Expected previous plan ID: '{environment.previous_plan_id}', actual previous plan ID: '{existing_environment.plan_id}'. "
"Please recreate the plan and try again"
)
if no_gaps_snapshot_names != set():
snapshots = self._get_snapshots(environment.snapshots).values()
self._ensure_no_gaps(
snapshots,
existing_environment,
no_gaps_snapshot_names,
)
demoted_snapshots = set(existing_environment.snapshots) - set(environment.snapshots)
# Update the updated_at attribute.
self._update_snapshots(demoted_snapshots)
Expand Down
28 changes: 28 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,34 @@ def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable):
context.apply(plan)


@freeze_time("2023-01-08 15:00:00")
def test_plan_against_expired_environment(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
context.apply(plan)

model = context.get_model("sushi.waiter_revenue_by_day")
context.upsert_model(add_projection_to_model(t.cast(SqlModel, model)))

modified_models = {model.fqn, context.get_model("sushi.top_waiters").fqn}

plan = context.plan("dev", no_prompts=True)
assert plan.has_changes
assert set(plan.context_diff.modified_snapshots) == modified_models
context.apply(plan)

# Make sure there are no changes when comparing against the existing environment.
plan = context.plan("dev", no_prompts=True)
assert not plan.has_changes
assert not plan.context_diff.modified_snapshots

# Invalidate the environment and make sure that the plan detects the changes.
context.invalidate_environment("dev")
plan = context.plan("dev", no_prompts=True)
assert plan.has_changes
assert set(plan.context_diff.modified_snapshots) == modified_models
context.apply(plan)


def test_plan_twice_with_star_macro_yields_no_diff(tmp_path: Path):
init_example_project(tmp_path, dialect="duckdb")

Expand Down
26 changes: 26 additions & 0 deletions tests/core/test_state_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,32 @@ def test_promote_snapshots_parent_plan_id_mismatch(
state_sync.promote(stale_new_environment)


def test_promote_environment_expired(state_sync: EngineAdapterStateSync, make_snapshot: t.Callable):
snapshot = make_snapshot(
SqlModel(
name="a",
query=parse_one("select 1, ds"),
),
)
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

state_sync.push_snapshots([snapshot])
promote_snapshots(state_sync, [snapshot], "dev")
state_sync.invalidate_environment("dev")

new_environment = Environment(
name="dev",
snapshots=[snapshot.table_info],
start_at="2022-01-01",
end_at="2022-01-01",
plan_id="new_plan_id",
previous_plan_id=None, # No previous plan ID since it's technically a new environment
)

# This call shouldn't fail.
state_sync.promote(new_environment)


def test_promote_snapshots_no_gaps(state_sync: EngineAdapterStateSync, make_snapshot: t.Callable):
model = SqlModel(
name="a",
Expand Down

0 comments on commit 97799e8

Please sign in to comment.