fix!: correctly use execution time on new rows for scd type 2 by column
eakmanrq committed Sep 26, 2024
1 parent fbf941b commit f889b61
Showing 3 changed files with 12 additions and 7 deletions.
8 changes: 4 additions & 4 deletions docs/concepts/models/model_kinds.md
@@ -736,10 +736,10 @@ This is the most accurate representation of the menu based on the source data pr

### SCD Type 2 By Column Configuration Options

-| Name | Description | Type |
-|------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
-| columns | The name of the columns to check for changes. `*` to represent that all columns should be checked. | List of strings or string |
-| execution_time_as_valid_from | By default, for new rows `valid_from` is set to `1970-01-01 00:00:00`. This changes the behavior to set it to the `execution_time` of when the pipeline ran. Default: `false` | bool |
+| Name | Description | Type |
+|------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
+| columns | The name of the columns to check for changes. `*` to represent that all columns should be checked. | List of strings or string |
+| execution_time_as_valid_from | By default, when the model is first loaded `valid_from` is set to `1970-01-01 00:00:00` and future new rows will have `execution_time` of when the pipeline ran. This changes the behavior to always use `execution_time`. Default: `false` | bool |

### Querying SCD Type 2 Models

7 changes: 5 additions & 2 deletions sqlmesh/core/engine_adapter/base.py
@@ -1448,7 +1448,10 @@ def remove_managed_columns(
"Cannot use `updated_at_as_valid_from` without `updated_at_name` for SCD Type 2"
)
update_valid_from_start: t.Union[str, exp.Expression] = updated_at_col
-elif execution_time_as_valid_from:
+# If using check_columns and the user doesn't always want execution_time for valid from
+# then we only use epoch 0 if we are truncating the table and loading rows for the first time.
+# All future new rows should have execution time.
+elif check_columns and (execution_time_as_valid_from or not truncate):
update_valid_from_start = execution_ts
else:
update_valid_from_start = to_time_column("1970-01-01 00:00:00+00:00", time_data_type)
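The patched `elif`/`else` pair can be read as a small pure function. The sketch below uses a hypothetical helper, `new_row_valid_from`, which is not part of SQLMesh. It returns Python `datetime` values where the real adapter builds SQL expressions. It also leaves out the earlier `updated_at_as_valid_from` branch, because that branch's condition falls outside this hunk.

```python
from datetime import datetime, timezone

# Epoch used for new rows on a first, truncating load (mirrors the `else` branch).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def new_row_valid_from(
    check_columns: bool,
    execution_time_as_valid_from: bool,
    truncate: bool,
    execution_time: datetime,
) -> datetime:
    """Hypothetical helper mirroring the patched branch order.

    The preceding `updated_at_as_valid_from` branch is intentionally not modeled.
    """
    if check_columns and (execution_time_as_valid_from or not truncate):
        # By-column kind: use execution time when the flag asks for it always,
        # or whenever this is not a first (truncating) load.
        return execution_time
    # First truncating load without the flag, or a non-by-column case.
    return EPOCH
```

With `execution_time_as_valid_from=False`, the function returns the epoch when `truncate=True` and the execution time when `truncate=False`. This matches the updated documentation row for this option.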
@@ -1626,7 +1629,7 @@ def remove_managed_columns(
.group_by(*unique_key),
)
# Do a full join between latest records and source table in order to combine them together
-# MySQL doesn't suport full join so going to do a left then right join and remove dups with union
+# MySQL doesn't support full join so going to do a left then right join and remove dups with union
# We do a left/right and filter right on only matching to remove the need to do union distinct
# which allows scd type 2 to be compatible with unhashable data types
.with_(
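The comment above describes emulating a FULL OUTER JOIN on an engine that lacks one. The sketch below is a generic in-memory version of the common form of that technique, not SQLMesh's SQL. It keeps every row of the left join, then adds only those right-join rows that have no left match. The two sets never overlap, so they can be concatenated (the `UNION ALL` equivalent) without a row-comparing `UNION DISTINCT`. That is why unhashable row values are fine here. The function name is hypothetical, and join keys are assumed to be non-null and hashable.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]
Pair = Tuple[Optional[Row], Optional[Row]]


def full_join_via_left_right(left: List[Row], right: List[Row], key: str) -> List[Pair]:
    """Hypothetical helper: FULL OUTER JOIN as LEFT JOIN plus unmatched RIGHT JOIN rows."""
    # Index the right side by join key (keys assumed non-null and hashable).
    right_by_key: Dict[Any, List[Row]] = {}
    for rrow in right:
        right_by_key.setdefault(rrow[key], []).append(rrow)
    left_keys = {lrow[key] for lrow in left}

    result: List[Pair] = []
    # LEFT JOIN: every left row, paired with each matching right row or with None.
    for lrow in left:
        matches = right_by_key.get(lrow[key], [])
        if matches:
            result.extend((lrow, rrow) for rrow in matches)
        else:
            result.append((lrow, None))
    # RIGHT JOIN restricted to rows with no left match. These never overlap the
    # LEFT JOIN output, so plain concatenation (UNION ALL) yields no duplicates.
    for rrow in right:
        if rrow[key] not in left_keys:
            result.append((None, rrow))
    return result
```

The row values are only paired, never compared, so values such as lists survive the join. This mirrors the comment's point about unhashable data types.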
4 changes: 3 additions & 1 deletion tests/core/engine_adapter/test_integration.py
@@ -1508,6 +1508,7 @@ def test_scd_type_2_by_column(ctx: TestContext):
execution_time="2023-01-01",
execution_time_as_valid_from=False,
columns_to_types=ctx.columns_to_types,
+truncate=True,
)
results = ctx.get_metadata_results()
assert len(results.views) == 0
@@ -1577,6 +1578,7 @@ def test_scd_type_2_by_column(ctx: TestContext):
execution_time="2023-01-05 00:00:00",
execution_time_as_valid_from=False,
columns_to_types=ctx.columns_to_types,
+truncate=False,
)
results = ctx.get_metadata_results()
assert len(results.views) == 0
@@ -1633,7 +1635,7 @@ def test_scd_type_2_by_column(ctx: TestContext):
"id": 5,
"name": "e",
"status": "inactive",
-"valid_from": "1970-01-01 00:00:00",
+"valid_from": "2023-01-05 00:00:00",
"valid_to": pd.NaT,
},
]
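The test changes encode the new rule. The first load runs with `truncate=True`, so its new rows start at the epoch. The second load runs with `truncate=False`, so the brand-new row with `id` 5 now starts at the execution time `2023-01-05 00:00:00`. The toy in-memory merge below reproduces that effect under simplifying assumptions:

- keys are unique within the source;
- deleted rows are not handled;
- `None` stands in for an open `valid_to`;
- a truncating load discards the existing target.

`scd2_by_column` is hypothetical and is not how SQLMesh implements the merge.

```python
from datetime import datetime
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]
EPOCH = datetime(1970, 1, 1)


def scd2_by_column(
    target: List[Row],
    source: List[Row],
    key: str,
    columns: Sequence[str],
    execution_time: datetime,
    truncate: bool,
    execution_time_as_valid_from: bool = False,
) -> List[Row]:
    """Hypothetical toy SCD Type 2 by-column merge returning a new row list."""
    # valid_from for keys never seen before, following the patched rule.
    new_key_valid_from = (
        execution_time if (execution_time_as_valid_from or not truncate) else EPOCH
    )
    # A truncating load starts from an empty table; otherwise copy the target.
    out = [] if truncate else [dict(r) for r in target]
    current = {r[key]: r for r in out if r["valid_to"] is None}
    for src in source:
        cur = current.get(src[key])
        if cur is None:
            # Brand-new key: open its first version.
            out.append({**src, "valid_from": new_key_valid_from, "valid_to": None})
        elif any(cur[c] != src[c] for c in columns):
            # A checked column changed: close the current version, open a new one.
            cur["valid_to"] = execution_time
            out.append({**src, "valid_from": execution_time, "valid_to": None})
    return out
```

First, call it with `truncate=True` on an empty target. Then call it again with `truncate=False`. Together these two calls mirror the shape of the updated test. Rows from the first call start at the epoch. A key that first appears in the second call starts at that call's `execution_time`.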