Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Microbatch Config Validation #10752

Merged
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20240924-154639.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: 'Parse-time validation of microbatch configs: require event_time, begin,
batch_size; type-check optional lookback; and validate input event_time'
time: 2024-09-24T15:46:39.83112+01:00
custom:
Author: michelleark
Issue: "10709"
53 changes: 53 additions & 0 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
register_adapter,
)
from dbt.artifacts.resources import FileHash, NodeRelation, NodeVersion
from dbt.artifacts.resources.types import BatchSize
from dbt.artifacts.schemas.base import Writable
from dbt.clients.jinja import MacroStack, get_rendered
from dbt.clients.jinja_static import statically_extract_macro_calls
Expand Down Expand Up @@ -468,6 +469,7 @@
self.check_valid_group_config()
self.check_valid_access_property()
self.check_valid_snapshot_config()
self.check_valid_microbatch_config()

semantic_manifest = SemanticManifest(self.manifest)
if not semantic_manifest.validate():
Expand Down Expand Up @@ -1355,6 +1357,57 @@
continue
node.config.final_validate()

def check_valid_microbatch_config(self):
    """Validate the configs of every microbatch incremental model.

    Does nothing unless the ``DBT_EXPERIMENTAL_MICROBATCH`` environment
    variable is set to a non-empty value. Otherwise, for each node in
    ``self.manifest.nodes`` whose config has ``materialized == "incremental"``
    and ``incremental_strategy == "microbatch"``, checks in this order:

    * ``event_time`` is set and is a ``str``;
    * ``begin`` is set and is a ``datetime.datetime``;
    * ``batch_size`` is one of the ``BatchSize`` enum values;
    * ``lookback``, when not ``None``, is an ``int``;
    * every node listed in ``node.depends_on.nodes`` that has a truthy
      ``event_time`` config has it as a ``str``.

    Raises:
        dbt.exceptions.ParsingError: on the first violation encountered.
    """
    if not os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"):
        return

    for node in self.manifest.nodes.values():
        is_microbatch = (
            node.config.materialized == "incremental"
            and node.config.incremental_strategy == "microbatch"
        )
        if not is_microbatch:
            continue

        # Required configs: event_time, begin, batch_size
        event_time = node.config.event_time
        if event_time is None:
            raise dbt.exceptions.ParsingError(
                f"Microbatch model '{node.name}' must provide an 'event_time' (string) config that indicates the name of the event time column."
            )
        if not isinstance(event_time, str):
            raise dbt.exceptions.ParsingError(
                f"Microbatch model '{node.name}' must provide an 'event_time' config of type string, but got: {type(event_time)}."
            )

        begin = node.config.begin
        if begin is None:
            raise dbt.exceptions.ParsingError(
                f"Microbatch model '{node.name}' must provide a 'begin' (datetime) config that indicates the earliest timestamp the microbatch model should be built from."
            )
        if not isinstance(begin, datetime.datetime):
            raise dbt.exceptions.ParsingError(
                f"Microbatch model '{node.name}' must provide a 'begin' config of type datetime, but got: {type(begin)}."
            )

        batch_size = node.config.batch_size
        valid_batch_sizes = [size.value for size in BatchSize]
        if batch_size not in valid_batch_sizes:
            raise dbt.exceptions.ParsingError(
                f"Microbatch model '{node.name}' must provide a 'batch_size' config that is one of {valid_batch_sizes}, but got: {batch_size}."
            )

        # Optional config: lookback (int). None means "not configured".
        lookback = node.config.lookback
        if lookback is not None and not isinstance(lookback, int):
            raise dbt.exceptions.ParsingError(
                f"Microbatch model '{node.name}' must provide the optional 'lookback' config as type int, but got: {type(lookback)}."
            )

        # Validate upstream node event_time (if configured). Falsy values are
        # treated as "not configured" and skipped.
        for input_unique_id in node.depends_on.nodes:
            input_node = self.manifest.expect(unique_id=input_unique_id)
            input_event_time = input_node.config.event_time
            if input_event_time and not isinstance(input_event_time, str):
                raise dbt.exceptions.ParsingError(
                    f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
                )

def write_perf_info(self, target_path: str):
path = os.path.join(target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
Expand Down
8 changes: 4 additions & 4 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@


microbatch_model_ref_render_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model').render() }}
"""

Expand Down Expand Up @@ -369,7 +369,7 @@ def test_run_with_event_time(self, project):


microbatch_model_context_vars = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}}
{{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}}
select * from {{ ref('input_model') }}
Expand Down Expand Up @@ -400,7 +400,7 @@ def test_run_with_event_time_logs(self, project):


microbatch_model_failing_incremental_partition_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{% if '2020-01-02' in (model.config.__dbt_internal_microbatch_event_time_start | string) %}
invalid_sql
{% endif %}
Expand All @@ -425,7 +425,7 @@ def test_run_with_event_time(self, project):


microbatch_model_first_partition_failing_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{% if '2020-01-01' in (model.config.__dbt_internal_microbatch_event_time_start | string) %}
invalid_sql
{% endif %}
Expand Down
129 changes: 129 additions & 0 deletions tests/functional/microbatch/test_microbatch_config_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import os
from unittest import mock

import pytest

from dbt.exceptions import ParsingError
from dbt.tests.util import run_dbt

# Model fixtures for the microbatch config validation tests.
#
# Every microbatch fixture except the two about `begin` carries a valid `begin`,
# so each invalid fixture differs from `valid_microbatch_model_sql` in exactly
# one config and can only fail validation for the reason it is named after.
valid_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
"""

missing_event_time_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
"""

invalid_event_time_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time=2, begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
"""

# Deliberately has no `begin`.
missing_begin_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time') }}
select * from {{ ref('input_model') }}
"""

# `begin` is present but is an int rather than a datetime.
invalid_begin_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time', begin=2) }}
select * from {{ ref('input_model') }}
"""


missing_batch_size_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
"""

invalid_batch_size_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='invalid', event_time='event_time', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
"""

# Upstream model whose `event_time` config is an int rather than a column name.
invalid_event_time_input_model_sql = """
{{ config(materialized='table', event_time=1) }}

select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
"""

valid_input_model_sql = """
{{ config(materialized='table') }}

select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
"""


class BaseMicrobatchTest:
    """Shared scenario: ``dbt parse`` on the project's models raises ``ParsingError``.

    The ``models`` fixture here returns no files; subclasses in this module
    override it with the model files for one specific scenario.
    """

    @pytest.fixture(scope="class")
    def models(self):
        """Return the mapping of model file name to SQL (empty by default)."""
        return dict()

    def test_parsing_error_raised(self, project):
        """Run ``dbt parse`` with the microbatch flag set and expect a ``ParsingError``."""
        microbatch_flag = {"DBT_EXPERIMENTAL_MICROBATCH": "True"}
        # The env var is patched only for the duration of the parse.
        with mock.patch.dict(os.environ, microbatch_flag), pytest.raises(ParsingError):
            run_dbt(["parse"])


class TestMissingEventTimeMicrobatch(BaseMicrobatchTest):
    """Scenario: the microbatch model does not set ``event_time``."""

    @pytest.fixture(scope="class")
    def models(self):
        """Return a valid input model plus the microbatch model without ``event_time``."""
        project_files = {}
        project_files["input_model.sql"] = valid_input_model_sql
        project_files["microbatch.sql"] = missing_event_time_microbatch_model_sql
        return project_files


class TestInvalidEventTimeMicrobatch(BaseMicrobatchTest):
    """Scenario: the microbatch model sets ``event_time`` to a non-string value."""

    @pytest.fixture(scope="class")
    def models(self):
        """Return a valid input model plus the microbatch model with ``event_time=2``."""
        project_files = {}
        project_files["input_model.sql"] = valid_input_model_sql
        project_files["microbatch.sql"] = invalid_event_time_microbatch_model_sql
        return project_files


class TestMissingBeginMicrobatch(BaseMicrobatchTest):
    """Scenario: the microbatch model does not set ``begin``."""

    @pytest.fixture(scope="class")
    def models(self):
        """Return a valid input model plus the microbatch model without ``begin``."""
        project_files = {}
        project_files["input_model.sql"] = valid_input_model_sql
        project_files["microbatch.sql"] = missing_begin_microbatch_model_sql
        return project_files


# NOTE(review): the class name looks like a typo for "TestInvalidBeginMicrobatch";
# it is left unchanged here so existing pytest node IDs keep working.
class TestInvaliBeginMicrobatch(BaseMicrobatchTest):
    """Scenario: the microbatch model sets ``begin`` to a non-datetime value."""

    @pytest.fixture(scope="class")
    def models(self):
        """Return a valid input model plus the microbatch model with ``begin=2``."""
        project_files = {}
        project_files["input_model.sql"] = valid_input_model_sql
        project_files["microbatch.sql"] = invalid_begin_microbatch_model_sql
        return project_files


class TestMissingBatchSizeMicrobatch(BaseMicrobatchTest):
    """Scenario: the microbatch model does not set ``batch_size``."""

    @pytest.fixture(scope="class")
    def models(self):
        """Return a valid input model plus the microbatch model without ``batch_size``."""
        project_files = {}
        project_files["input_model.sql"] = valid_input_model_sql
        project_files["microbatch.sql"] = missing_batch_size_microbatch_model_sql
        return project_files


class TestInvalidBatchSizeMicrobatch(BaseMicrobatchTest):
    """Scenario: the microbatch model sets ``batch_size`` to ``'invalid'``."""

    @pytest.fixture(scope="class")
    def models(self):
        """Return a valid input model plus the microbatch model with a bad ``batch_size``."""
        project_files = {}
        project_files["input_model.sql"] = valid_input_model_sql
        project_files["microbatch.sql"] = invalid_batch_size_microbatch_model_sql
        return project_files


class TestInvalidInputEventTimeMicrobatch(BaseMicrobatchTest):
    """Scenario: the upstream input model sets ``event_time`` to a non-string value."""

    @pytest.fixture(scope="class")
    def models(self):
        """Return the input model with ``event_time=1`` plus ``valid_microbatch_model_sql``."""
        project_files = {}
        project_files["input_model.sql"] = invalid_event_time_input_model_sql
        project_files["microbatch.sql"] = valid_microbatch_model_sql
        return project_files
Loading