diff --git a/.changes/unreleased/Under the Hood-20241017-101532.yaml b/.changes/unreleased/Under the Hood-20241017-101532.yaml
new file mode 100644
index 000000000..122f063f1
--- /dev/null
+++ b/.changes/unreleased/Under the Hood-20241017-101532.yaml
@@ -0,0 +1,6 @@
+kind: Under the Hood
+body: Applies "Initial refactoring of incremental materialization" (dbt-labs/dbt-core#5359).
+time: 2024-10-17T10:15:32.88591-07:00
+custom:
+  Author: hiloboy0119
+  Issue: "1123"
diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index e4fc66ccc..a4e2c7a45 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -49,7 +49,7 @@ jobs:
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
-          python-version: '3.8'
+          python-version: '3.9'

       - name: Install python dependencies
         run: |
@@ -75,7 +75,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
+        python-version: ["3.9", "3.10", "3.11", "3.12"]

     steps:
       - name: Check out the repository
@@ -126,7 +126,7 @@ jobs:
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
-          python-version: '3.8'
+          python-version: '3.9'

       - name: Install python dependencies
         run: |
@@ -173,7 +173,7 @@ jobs:
       fail-fast: false
       matrix:
         os: [ubuntu-latest, macos-12, windows-latest]
-        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
+        python-version: ["3.9", "3.10", "3.11", "3.12"]

     steps:
       - name: Set up Python ${{ matrix.python-version }}
diff --git a/.github/workflows/release-internal.yml b/.github/workflows/release-internal.yml
index 1a5090312..702ef9aea 100644
--- a/.github/workflows/release-internal.yml
+++ b/.github/workflows/release-internal.yml
@@ -37,7 +37,7 @@ defaults:
     shell: "bash"

 env:
-  PYTHON_TARGET_VERSION: 3.8
+  PYTHON_TARGET_VERSION: 3.9

 jobs:
   run-unit-tests:
diff --git a/.github/workflows/release-prep.yml b/.github/workflows/release-prep.yml
index 9937463d3..d5878ec1e 100644
--- a/.github/workflows/release-prep.yml
+++ b/.github/workflows/release-prep.yml
@@ -84,7 +84,7 @@ defaults:
     shell: bash

 env:
-  PYTHON_TARGET_VERSION: 3.8
+  PYTHON_TARGET_VERSION: 3.9
   NOTIFICATION_PREFIX: "[Release Preparation]"

 jobs:
@@ -448,7 +448,7 @@
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
+        python-version: ["3.9", "3.10", "3.11", "3.12"]

     steps:
       - name: Check out the repository
diff --git a/Makefile b/Makefile
index ff4c0fc1b..76b83e8e0 100644
--- a/Makefile
+++ b/Makefile
@@ -25,8 +25,8 @@ unit: ## Runs unit tests with py38.
 test: ## Runs unit tests with py38 and code checks against staged changes.
 	@\
 	python -m pytest tests/unit; \
-	python dagger/run_dbt_spark_tests.py --profile spark_session \
-	pre-commit run --all-files
+	pre-commit run --all-files; \
+	python dagger/run_dbt_spark_tests.py --profile spark_session

 .PHONY: clean
 	@echo "cleaning repo"
diff --git a/dagger/run_dbt_spark_tests.py b/dagger/run_dbt_spark_tests.py
index 67fa56587..6c310a6f8 100644
--- a/dagger/run_dbt_spark_tests.py
+++ b/dagger/run_dbt_spark_tests.py
@@ -104,7 +104,7 @@ async def test_spark(test_args):
     platform = dagger.Platform("linux/amd64")
     tst_container = (
         client.container(platform=platform)
-        .from_("python:3.8-slim")
+        .from_("python:3.9-slim")
         .with_mounted_cache("/var/cache/apt/archives", os_reqs_cache)
         .with_mounted_cache("/root/.cache/pip", pip_cache)
         # install OS deps first so any local changes don't invalidate the cache
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index d33ebde20..7cdd7e122 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -516,6 +516,12 @@ def debug_query(self) -> None:
         """Override for DebugTask method"""
         self.execute("select 1 as id")

+    def valid_incremental_strategies(self) -> list[str]:
+        """The set of standard built-in strategies that this adapter supports out of the box.
+        Not used to validate custom strategies defined by end users.
+        """
+        return ["append", "merge", "insert_overwrite", "microbatch"]
+
     # spark does something interesting with joins when both tables have the same
     # static values for the join condition and complains that the join condition is
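
Note for context (not part of the diff): with the hard-coded strategy check removed, `adapter.get_incremental_strategy_macro(context, strategy)` resolves a strategy named `foo` by looking up a macro called `get_incremental_foo_sql`. That is how the `spark__`-prefixed macros added below are dispatched, and it is also the hook for user-defined strategies. A minimal sketch of a custom strategy, assuming a hypothetical `insert_by_period` name; the `arg_dict` keys mirror the `strategy_arg_dict` built in the materialization below:

```sql
{% macro get_incremental_insert_by_period_sql(arg_dict) %}
  {%- set target = arg_dict["target_relation"] -%}
  {%- set source = arg_dict["temp_relation"] -%}
  {#-- naive illustration: append every staged row to the target --#}
  {%- set dest_cols_csv = arg_dict["dest_columns"] | map(attribute="quoted") | join(", ") -%}
  insert into {{ target }} ({{ dest_cols_csv }})
  select {{ dest_cols_csv }} from {{ source }}
{% endmacro %}
```

A model opts in with `incremental_strategy='insert_by_period'`; if no such macro exists, the run fails with the "dbt could not find an incremental strategy macro" error asserted in the new test at the end of this diff.
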
diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql
index 77bfc59c9..7b5e60d3e 100644
--- a/dbt/include/spark/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql
@@ -1,11 +1,13 @@
 {% materialization incremental, adapter='spark', supported_languages=['sql', 'python'] -%}
   {#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
   {%- set raw_file_format = config.get('file_format', default='parquet') -%}
-  {%- set raw_strategy = config.get('incremental_strategy') or 'append' -%}
+  {% set raw_strategy = config.get('incremental_strategy') or 'default' %}
   {%- set grant_config = config.get('grants') -%}

   {%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
   {%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}

+  {#-- Get the macro that implements the validated incremental strategy --#}
+  {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, strategy) %}
+
   {#-- Set vars --#}
@@ -56,8 +58,11 @@
       {{ create_table_as(True, tmp_relation, compiled_code, language) }}
     {%- endcall -%}
     {%- do process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
+    {#-- Call the incremental strategy macro to build the sql --#}
+    {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates}) %}
+    {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
     {%- call statement('main') -%}
-      {{ dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, existing_relation, unique_key, incremental_predicates) }}
+      {{ build_sql }}
     {%- endcall -%}
     {%- if language == 'python' -%}
       {#--
@@ -83,3 +88,19 @@
   {{ return({'relations': [target_relation]}) }}

 {%- endmaterialization %}
+
+{% macro spark__get_incremental_default_sql(arg_dict) %}
+  {% do return(spark__get_incremental_append_sql(arg_dict)) %}
+{% endmacro %}
+{% macro spark__get_incremental_append_sql(arg_dict) %}
+  {% do return(dbt_spark_get_incremental_sql("append", arg_dict["temp_relation"], arg_dict["target_relation"], load_relation(this), arg_dict["unique_key"], arg_dict["predicates"])) %}
+{% endmacro %}
+{% macro spark__get_incremental_merge_sql(arg_dict) %}
+  {% do return(dbt_spark_get_incremental_sql("merge", arg_dict["temp_relation"], arg_dict["target_relation"], load_relation(this), arg_dict["unique_key"], arg_dict["predicates"])) %}
+{% endmacro %}
+{% macro spark__get_incremental_insert_overwrite_sql(arg_dict) %}
+  {% do return(dbt_spark_get_incremental_sql("insert_overwrite", arg_dict["temp_relation"], arg_dict["target_relation"], load_relation(this), arg_dict["unique_key"], arg_dict["predicates"])) %}
+{% endmacro %}
+{% macro spark__get_incremental_microbatch_sql(arg_dict) %}
+  {% do return(dbt_spark_get_incremental_sql("microbatch", arg_dict["temp_relation"], arg_dict["target_relation"], load_relation(this), arg_dict["unique_key"], arg_dict["predicates"])) %}
+{% endmacro %}
diff --git a/dbt/include/spark/macros/materializations/incremental/validate.sql b/dbt/include/spark/macros/materializations/incremental/validate.sql
index 4a1ac9943..df224a99a 100644
--- a/dbt/include/spark/macros/materializations/incremental/validate.sql
+++ b/dbt/include/spark/macros/materializations/incremental/validate.sql
@@ -19,11 +19,6 @@
 {% macro dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) %}
   {#-- Validate the incremental strategy #}

-  {% set invalid_strategy_msg -%}
-    Invalid incremental strategy provided: {{ raw_strategy }}
-    Expected one of: 'append', 'merge', 'insert_overwrite', 'microbatch'
-  {%- endset %}
-
   {% set invalid_merge_msg -%}
     Invalid incremental strategy provided: {{ raw_strategy }}
     You can only choose this strategy when file_format is set to 'delta' or 'iceberg' or 'hudi'
@@ -35,15 +30,11 @@
     Use the 'append' or 'merge' strategy instead
   {%- endset %}

-  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'microbatch'] %}
-    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
-  {%-else %}
-    {% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
-      {% do exceptions.raise_compiler_error(invalid_merge_msg) %}
-    {% endif %}
-    {% if raw_strategy in ['insert_overwrite', 'microbatch'] and target.endpoint %}
-      {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
-    {% endif %}
+  {% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
+    {% do exceptions.raise_compiler_error(invalid_merge_msg) %}
+  {% endif %}
+  {% if raw_strategy in ['insert_overwrite', 'microbatch'] and target.endpoint %}
+    {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
   {% endif %}

   {% do return(raw_strategy) %}
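
Note for context (not part of the diff): with the generic allow-list gone, this macro now only guards file-format and endpoint combinations. A sketch of a model config that passes both remaining checks (table and column names are illustrative):

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='delta',
    unique_key='id',
) }}

select 1 as id, current_timestamp() as updated_at
```

The same model with `file_format='parquet'` still fails compilation with `invalid_merge_msg`, while an unrecognized strategy name now passes validation and is only rejected later, at macro dispatch.
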
diff --git a/docker/spark-defaults.conf b/docker/spark-defaults.conf
index 30ec59591..29839f7d9 100644
--- a/docker/spark-defaults.conf
+++ b/docker/spark-defaults.conf
@@ -4,6 +4,6 @@ spark.hadoop.datanucleus.autoCreateTables	true
 spark.hadoop.datanucleus.schema.autoCreateTables	true
 spark.hadoop.datanucleus.fixedDatastore	false
 spark.serializer	org.apache.spark.serializer.KryoSerializer
-spark.jars.packages	org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0
+spark.jars.packages	org.apache.hudi:hudi-spark3-bundle_2.12:0.13.0
 spark.sql.extensions	org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 spark.driver.userClassPathFirst	true
diff --git a/docker/spark.Dockerfile b/docker/spark.Dockerfile
index 49138af50..36848f1ab 100644
--- a/docker/spark.Dockerfile
+++ b/docker/spark.Dockerfile
@@ -1,8 +1,8 @@
-ARG OPENJDK_VERSION=8
+ARG OPENJDK_VERSION=17
 FROM eclipse-temurin:${OPENJDK_VERSION}-jre

 ARG BUILD_DATE
-ARG SPARK_VERSION=3.3.2
+ARG SPARK_VERSION=3.3.4
 ARG HADOOP_VERSION=3

 LABEL org.label-schema.name="Apache Spark ${SPARK_VERSION}" \
diff --git a/setup.py b/setup.py
index 00aeba60d..7ed28be15 100644
--- a/setup.py
+++ b/setup.py
@@ -3,10 +3,10 @@
 import sys
 import re

-# require python 3.8 or newer
-if sys.version_info < (3, 8):
+# require python 3.9 or newer
+if sys.version_info < (3, 9):
     print("Error: dbt does not support this version of Python.")
-    print("Please upgrade to Python 3.8 or higher.")
+    print("Please upgrade to Python 3.9 or higher.")
     sys.exit(1)

 # require version of setuptools that supports find_namespace_packages
@@ -83,11 +83,10 @@ def _get_plugin_version_dict():
         "Operating System :: Microsoft :: Windows",
         "Operating System :: MacOS :: MacOS X",
         "Operating System :: POSIX :: Linux",
-        "Programming Language :: Python :: 3.8",
         "Programming Language :: Python :: 3.9",
         "Programming Language :: Python :: 3.10",
         "Programming Language :: Python :: 3.11",
         "Programming Language :: Python :: 3.12",
     ],
-    python_requires=">=3.8",
+    python_requires=">=3.9",
 )
diff --git a/tests/functional/adapter/incremental_strategies/test_incremental_strategies.py b/tests/functional/adapter/incremental_strategies/test_incremental_strategies.py
index a44a1d23e..b5f9af030 100644
--- a/tests/functional/adapter/incremental_strategies/test_incremental_strategies.py
+++ b/tests/functional/adapter/incremental_strategies/test_incremental_strategies.py
@@ -128,7 +128,6 @@ def models(self):
         return {
             "bad_file_format.sql": bad_file_format_sql,
             "bad_merge_not_delta.sql": bad_merge_not_delta_sql,
-            "bad_strategy.sql": bad_strategy_sql,
         }

     @staticmethod
@@ -142,3 +141,23 @@ def run_and_test():
     @pytest.mark.skip_profile("databricks_http_cluster", "spark_session")
     def test_bad_strategies(self, project):
         self.run_and_test()
+
+
+class TestBadCustomStrategy(BaseIncrementalStrategies):
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "bad_strategy.sql": bad_strategy_sql,
+        }
+
+    @staticmethod
+    def run_and_test():
+        run_results = run_dbt(["run"], expect_pass=False)
+        # assert all models fail with compilation errors
+        for result in run_results:
+            assert result.status == "error"
+            assert "dbt could not find an incremental strategy macro" in result.message
+
+    @pytest.mark.skip_profile("databricks_http_cluster", "spark_session")
+    def test_bad_custom_strategies(self, project):
+        self.run_and_test()
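
Note for context (not part of the diff): the relocated `bad_strategy.sql` fixture (defined earlier in the test module, outside this diff) configures a strategy with no backing macro; a hypothetical sketch of such a model is below. Under the refactor it fails at dispatch with "dbt could not find an incremental strategy macro" instead of being rejected up front by the validator, which is why it moved out of `TestBadStrategies` into its own class.

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='something_undefined',
) }}

select 1 as id
```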