Commit

🐛 Add a drop table hook to drop scd tables in case of overwrite sync (airbytehq#18015)

* Add a drop table hook to drop scd tables in case of overwrite sync

* Add an integration test for dropping SCD table on overwrite

* skip new test for Oracle and TiDB

* Add normalization run after initial reset

* Bump normalization version
grishick authored Nov 1, 2022
1 parent 385ab7e commit 8cf5464
Showing 13 changed files with 446 additions and 78 deletions.
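The drop-table hook itself is added in normalization's model-generation code and dbt templates, which are not among the files expanded in this excerpt. As a rough sketch of the idea only, assuming a hypothetical macro name and the usual <stream>_scd naming convention (this is an illustration, not the code added by this commit):

{% macro drop_scd_table_on_reset(schema, stream_name) %}
    {# Hypothetical sketch: when a connection is reset with an overwrite sync mode, #}
    {# drop the leftover <stream>_scd table so the next incremental + deduped sync  #}
    {# rebuilds history from scratch instead of carrying stale rows forward.        #}
    {% set scd_relation = adapter.get_relation(
        database=target.database, schema=schema, identifier=stream_name ~ "_scd"
    ) %}
    {% if scd_relation is not none %}
        {% do adapter.drop_relation(scd_relation) %}
    {% endif %}
{% endmacro %}

The new integration test further down exercises this path: it runs an append_dedup sync, resets the stream with an overwrite catalog, and re-runs normalization to confirm the stale SCD table does not survive the reset.
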
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.2.23
LABEL io.airbyte.version=0.2.24
LABEL io.airbyte.name=airbyte/normalization
@@ -690,7 +690,7 @@ def clean_tmp_tables(
            schemas_to_remove[destination.value] = []

            # based on test_type select path to source files
            if test_type == "ephemeral":
            if test_type == "ephemeral" or test_type == "test_reset_scd_overwrite":
                if not tmp_folders:
                    raise TypeError("`tmp_folders` arg is not provided.")
                for folder in tmp_folders:
@@ -0,0 +1,46 @@
{
  "streams": [
    {
      "stream": {
        "name": "stream_test_scd_drop",
        "json_schema": {
          "type": ["null", "object"],
          "properties": {
            "id": {
              "type": "integer"
            },
            "date": {
              "type": "string",
              "format": "date"
            },
            "timestamp_col": {
              "type": "string",
              "format": "date-time"
            },
            "datetime_to_string": {
              "type": "string",
              "format": "date-time",
              "airbyte_type": "timestamp_with_timezone"
            },
            "string_to_dt": {
              "type": "string"
            },
            "number_to_int": {
              "type": "number"
            },
            "int_to_number": {
              "type": "integer"
            }
          }
        },
        "supported_sync_modes": ["incremental"],
        "source_defined_cursor": true,
        "default_cursor_field": []
      },
      "sync_mode": "incremental",
      "cursor_field": ["date"],
      "destination_sync_mode": "append_dedup",
      "primary_key": [["id"]]
    }
  ]
}
@@ -0,0 +1,46 @@
{
  "streams": [
    {
      "stream": {
        "name": "stream_test_scd_drop",
        "json_schema": {
          "type": ["null", "object"],
          "properties": {
            "id": {
              "type": "integer"
            },
            "date": {
              "type": "string",
              "format": "date"
            },
            "timestamp_col": {
              "type": "string",
              "format": "date-time"
            },
            "datetime_to_string": {
              "type": "string"
            },
            "string_to_dt": {
              "type": "string",
              "format": "date-time",
              "airbyte_type": "timestamp_with_timezone"
            },
            "number_to_int": {
              "type": "integer"
            },
            "int_to_number": {
              "type": "number"
            }
          }
        },
        "supported_sync_modes": ["incremental"],
        "source_defined_cursor": true,
        "default_cursor_field": []
      },
      "sync_mode": "incremental",
      "cursor_field": ["date"],
      "destination_sync_mode": "append_dedup",
      "primary_key": [["id"]]
    }
  ]
}
@@ -0,0 +1,46 @@
{
  "streams": [
    {
      "stream": {
        "name": "stream_test_scd_drop",
        "json_schema": {
          "type": ["null", "object"],
          "properties": {
            "id": {
              "type": "integer"
            },
            "date": {
              "type": "string",
              "format": "date"
            },
            "timestamp_col": {
              "type": "string",
              "format": "date-time"
            },
            "datetime_to_string": {
              "type": "string"
            },
            "string_to_dt": {
              "type": "string",
              "format": "date-time",
              "airbyte_type": "timestamp_with_timezone"
            },
            "number_to_int": {
              "type": "integer"
            },
            "int_to_number": {
              "type": "number"
            }
          }
        },
        "supported_sync_modes": ["incremental"],
        "source_defined_cursor": true,
        "default_cursor_field": []
      },
      "sync_mode": "incremental",
      "cursor_field": ["date"],
      "destination_sync_mode": "overwrite",
      "primary_key": [["id"]]
    }
  ]
}
@@ -0,0 +1,5 @@
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637589000, "data": { "id": 1, "date": "2022-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "datetime_to_string":"2022-10-01T01:04:04-04:00", "string_to_dt":"2022-11-01T02:03:04-07:00", "number_to_int": 1, "int_to_number": 10}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637689100, "data": { "id": 2, "date": "2022-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "datetime_to_string":"2022-10-02T01:04:04-04:00", "string_to_dt":"2022-11-02T03:04:05-07:00", "number_to_int": 10, "int_to_number": 11}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637789200, "data": { "id": 3, "date": "2022-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "datetime_to_string":"2022-10-03T01:04:04-04:00", "string_to_dt":"2022-11-03T03:04:06-07:00", "number_to_int": 11, "int_to_number": 12}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637889300, "data": { "id": 4, "date": "2022-09-01", "timestamp_col": "2020-08-31T00:00:00+0000", "datetime_to_string":"2022-10-04T01:04:04-04:00", "string_to_dt":"2022-11-04T03:04:07-07:00", "number_to_int": 111, "int_to_number": 133}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637989400, "data": { "id": 5, "date": "2022-09-02", "timestamp_col": "2020-09-01T00:00:00Z", "datetime_to_string":"2022-10-05T01:04:04-04:00", "string_to_dt":"2022-11-05T03:04:08-12:00", "number_to_int": 1010, "int_to_number": 1300}}}
@@ -0,0 +1,6 @@
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637589000, "data": { "id": 1, "date": "2022-08-29", "timestamp_col": "2020-08-29T00:00:00.000000-0000", "datetime_to_string":"2022-10-01T01:04:04-04:00", "string_to_dt":"2022-11-01T02:03:04-07:00", "number_to_int": 1, "int_to_number": 10}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637689100, "data": { "id": 2, "date": "2022-08-30", "timestamp_col": "2020-08-30T00:00:00.000-00", "datetime_to_string":"2022-10-02T01:04:04-04:00", "string_to_dt":"2022-11-02T03:04:05-07:00", "number_to_int": 10, "int_to_number": 11}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637789200, "data": { "id": 3, "date": "2022-08-31", "timestamp_col": "2020-08-31T00:00:00+00", "datetime_to_string":"2022-10-03T01:04:04-04:00", "string_to_dt":"2022-11-03T03:04:06-07:00", "number_to_int": 11, "int_to_number": 12}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637889300, "data": { "id": 4, "date": "2022-09-01", "timestamp_col": "2020-08-31T00:00:00+0000", "datetime_to_string":"2022-10-04T01:04:04-04:00", "string_to_dt":"2022-11-04T03:04:07-07:00", "number_to_int": 111, "int_to_number": 133}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637989400, "data": { "id": 5, "date": "2022-09-02", "timestamp_col": "2020-09-01T00:00:00Z", "datetime_to_string":"2022-10-05T01:04:04-04:00", "string_to_dt":"2022-11-05T03:04:08-12:00", "number_to_int": 1010, "int_to_number": 1300}}}
{"type": "RECORD", "record": {"stream": "stream_test_scd_drop", "emitted_at": 1602637989400, "data": { "id": 6, "date": "2022-09-03", "timestamp_col": "2020-09-01T00:00:00Z", "datetime_to_string":"this is a string, not a datetime value", "string_to_dt":"2022-11-05T03:04:08-12:00", "number_to_int": 1010, "int_to_number": 1300.25}}}
@@ -0,0 +1,2 @@
select * from {{ ref('test_scd_drop_row_counts') }}
where row_count != expected_count
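
The test_scd_drop_row_counts model referenced above is part of this change set but not expanded here. A minimal sketch of the shape the data test assumes, with purely illustrative table names and expected counts:

-- Hypothetical sketch of test_scd_drop_row_counts: one row per table with its actual
-- and expected row count; the data test above fails on any mismatch, for example if
-- the SCD table still carries rows from before the overwrite reset.
select
    'stream_test_scd_drop_scd' as table_name,
    (select count(*) from {{ ref('stream_test_scd_drop_scd') }}) as row_count,
    6 as expected_count
union all
select
    'stream_test_scd_drop' as table_name,
    (select count(*) from {{ ref('stream_test_scd_drop') }}) as row_count,
    6 as expected_count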
@@ -0,0 +1,161 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import os
import pathlib
import shutil

import pytest
from integration_tests.dbt_integration_test import DbtIntegrationTest
from integration_tests.utils import generate_dbt_models, run_destination_process, setup_test_dir
from normalization import DestinationType

temporary_folders = set()
dbt_test_utils = DbtIntegrationTest()


@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
    destinations_to_test = dbt_test_utils.get_test_targets()
    # set clean-up args to clean target destination after the test
    clean_up_args = {
        "destination_type": [d for d in DestinationType if d.value in destinations_to_test],
        "test_type": "test_reset_scd_overwrite",
        "tmp_folders": temporary_folders,
    }
    dbt_test_utils.set_target_schema("test_reset_scd_overwrite")
    dbt_test_utils.change_current_test_dir(request)
    dbt_test_utils.setup_db(destinations_to_test)
    os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
    yield
    dbt_test_utils.clean_tmp_tables(**clean_up_args)
    dbt_test_utils.tear_down_db()
    for folder in temporary_folders:
        print(f"Deleting temporary test folder {folder}")
        shutil.rmtree(folder, ignore_errors=True)


@pytest.fixture
def setup_test_path(request):
    dbt_test_utils.change_current_test_dir(request)
    print(f"Running from: {pathlib.Path().absolute()}")
    print(f"Current PATH is: {os.environ['PATH']}")
    yield
    os.chdir(request.config.invocation_dir)


@pytest.mark.parametrize("destination_type", list(DestinationType))
def test_reset_scd_on_overwrite(destination_type: DestinationType, setup_test_path):
    if destination_type.value not in dbt_test_utils.get_test_targets():
        pytest.skip(f"Destinations {destination_type} is not in NORMALIZATION_TEST_TARGET env variable")

    if destination_type.value in [DestinationType.ORACLE.value, DestinationType.TIDB.value]:
        # Oracle and TiDB do not support incremental syncs with schema changes yet
        pytest.skip(f"{destination_type} does not support incremental sync with schema change yet")
    elif destination_type.value == DestinationType.REDSHIFT.value:
        # set unique schema for Redshift test
        dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_reset_scd_"))

    test_resource_name = "test_reset_scd_overwrite"
    # Select target schema
    target_schema = dbt_test_utils.target_schema

    try:
        print(f"Testing resetting SCD tables on overwrite with {destination_type} in schema {target_schema}")
        run_reset_scd_on_overwrite_test(destination_type, test_resource_name)
    finally:
        dbt_test_utils.set_target_schema(target_schema)


def run_reset_scd_on_overwrite_test(destination_type: DestinationType, test_resource_name: str):
    # Generate DBT profile yaml
    integration_type = destination_type.value
    test_root_dir = setup_test_dir(integration_type, temporary_folders)
    destination_config = dbt_test_utils.generate_profile_yaml_file(destination_type, test_root_dir)
    test_directory = os.path.join(test_root_dir, "models/generated")
    shutil.rmtree(test_directory, ignore_errors=True)

    # Generate config file for the destination
    config_file = os.path.join(test_root_dir, "destination_config.json")
    with open(config_file, "w") as f:
        f.write(json.dumps(destination_config))

    # make sure DBT dependencies are installed
    dbt_test_utils.dbt_check(destination_type, test_root_dir)

    # Generate catalog for an initial reset/cleanup (pre-test)
    original_catalog_file = os.path.join("resources", test_resource_name, "data_input", "test_drop_scd_catalog.json")
    dbt_test_utils.copy_replace(
        original_catalog_file,
        os.path.join(test_root_dir, "initial_reset_catalog.json"),
        pattern='"destination_sync_mode": ".*"',
        replace_value='"destination_sync_mode": "overwrite"',
    )

    # Force a reset in destination raw tables to remove any data left over from previous test runs
    assert run_destination_process(destination_type, test_root_dir, "", "initial_reset_catalog.json", dbt_test_utils)
    # generate models from catalog
    generate_dbt_models(destination_type, test_resource_name, test_root_dir, "models", "test_drop_scd_catalog_reset.json", dbt_test_utils)

    # Run dbt process to normalize after the initial reset
    dbt_test_utils.dbt_run(destination_type, test_root_dir, force_full_refresh=True)

    # Remove models generated in previous step to avoid DBT compilation errors
    test_directory = os.path.join(test_root_dir, "models/generated/airbyte_incremental")
    shutil.rmtree(test_directory, ignore_errors=True)
    test_directory = os.path.join(test_root_dir, "models/generated/airbyte_views")
    shutil.rmtree(test_directory, ignore_errors=True)
    test_directory = os.path.join(test_root_dir, "models/generated/airbyte_ctes")
    shutil.rmtree(test_directory, ignore_errors=True)
    test_directory = os.path.join(test_root_dir, "models/generated/airbyte_tables")
    shutil.rmtree(test_directory, ignore_errors=True)

    # Run the first sync to create raw tables in destinations
    dbt_test_utils.copy_replace(original_catalog_file, os.path.join(test_root_dir, "destination_catalog.json"))
    message_file = os.path.join("resources", test_resource_name, "data_input", "test_drop_scd_messages.txt")
    assert run_destination_process(destination_type, test_root_dir, message_file, "destination_catalog.json", dbt_test_utils)

    # generate models from catalog
    generate_dbt_models(destination_type, test_resource_name, test_root_dir, "models", "test_drop_scd_catalog.json", dbt_test_utils)

    # Run dbt process to normalize data from the first sync
    dbt_test_utils.dbt_run(destination_type, test_root_dir, force_full_refresh=True)

    # Remove models generated in previous step to avoid DBT compilation errors
    test_directory = os.path.join(test_root_dir, "models/generated/airbyte_incremental")
    shutil.rmtree(test_directory, ignore_errors=True)
    test_directory = os.path.join(test_root_dir, "models/generated/airbyte_views")
    shutil.rmtree(test_directory, ignore_errors=True)
    test_directory = os.path.join(test_root_dir, "models/generated/airbyte_ctes")
    shutil.rmtree(test_directory, ignore_errors=True)

    # Generate a catalog with modified schema for a reset
    reset_catalog_file = os.path.join("resources", test_resource_name, "data_input", "test_drop_scd_catalog_reset.json")
    dbt_test_utils.copy_replace(reset_catalog_file, os.path.join(test_root_dir, "reset_catalog.json"))

    # Run a reset
    assert run_destination_process(destination_type, test_root_dir, "", "reset_catalog.json", dbt_test_utils)

    # Run dbt process after reset to drop SCD table
    generate_dbt_models(destination_type, test_resource_name, test_root_dir, "models", "test_drop_scd_catalog_reset.json", dbt_test_utils)
    dbt_test_utils.dbt_run(destination_type, test_root_dir, force_full_refresh=True)

    # Remove models generated in previous step to avoid DBT compilation errors
    test_directory = os.path.join(test_root_dir, "models/generated/airbyte_incremental")
    shutil.rmtree(test_directory, ignore_errors=True)
    test_directory = os.path.join(test_root_dir, "models/generated/airbyte_views")
    shutil.rmtree(test_directory, ignore_errors=True)
    test_directory = os.path.join(test_root_dir, "models/generated/airbyte_ctes")
    shutil.rmtree(test_directory, ignore_errors=True)

    # Run another sync with modified catalog
    modified_catalog_file = os.path.join("resources", test_resource_name, "data_input", "test_drop_scd_catalog_incremental.json")
    dbt_test_utils.copy_replace(modified_catalog_file, os.path.join(test_root_dir, "destination_catalog.json"))
    message_file = os.path.join("resources", test_resource_name, "data_input", "test_scd_reset_messages_incremental.txt")
    assert run_destination_process(destination_type, test_root_dir, message_file, "destination_catalog.json", dbt_test_utils)

    # Run dbt process
    generate_dbt_models(destination_type, test_resource_name, test_root_dir, "models", "test_drop_scd_catalog_reset.json", dbt_test_utils)
    dbt_test_utils.dbt_run(destination_type, test_root_dir)
@@ -7,12 +7,11 @@
import pathlib
import re
import shutil
import tempfile
from distutils.dir_util import copy_tree
from typing import Any, Dict

import pytest
from integration_tests.dbt_integration_test import DbtIntegrationTest
from integration_tests.utils import setup_test_dir
from normalization.destination_type import DestinationType
from normalization.transform_catalog import TransformCatalog

@@ -108,7 +107,7 @@ def run_test(destination_type: DestinationType, column_count: int, expected_exce
print("Testing ephemeral")
integration_type = destination_type.value
# Create the test folder with dbt project and appropriate destination settings to run integration tests from
test_root_dir = setup_test_dir(integration_type)
test_root_dir = setup_test_dir(integration_type, temporary_folders)
destination_config = dbt_test_utils.generate_profile_yaml_file(destination_type, test_root_dir)
# generate a catalog and associated dbt models files
generate_dbt_models(destination_type, test_root_dir, column_count)
@@ -131,30 +130,6 @@ def search_logs_for_pattern(log_file: str, pattern: str):
    return False


def setup_test_dir(integration_type: str) -> str:
    """
    We prepare a clean folder to run the tests from.
    """
    test_root_dir = f"{pathlib.Path().joinpath('..', 'build', 'normalization_test_output', integration_type.lower()).resolve()}"
    os.makedirs(test_root_dir, exist_ok=True)
    test_root_dir = tempfile.mkdtemp(dir=test_root_dir)
    temporary_folders.add(test_root_dir)
    shutil.rmtree(test_root_dir, ignore_errors=True)
    print(f"Setting up test folder {test_root_dir}")
    copy_tree("../dbt-project-template", test_root_dir)
    if integration_type == DestinationType.MSSQL.value:
        copy_tree("../dbt-project-template-mssql", test_root_dir)
    elif integration_type == DestinationType.MYSQL.value:
        copy_tree("../dbt-project-template-mysql", test_root_dir)
    elif integration_type == DestinationType.ORACLE.value:
        copy_tree("../dbt-project-template-oracle", test_root_dir)
    elif integration_type == DestinationType.SNOWFLAKE.value:
        copy_tree("../dbt-project-template-snowflake", test_root_dir)
    elif integration_type == DestinationType.TIDB.value:
        copy_tree("../dbt-project-template-tidb", test_root_dir)
    return test_root_dir


def setup_input_raw_data(integration_type: str, test_root_dir: str, destination_config: Dict[str, Any]) -> bool:
"""
This should populate the associated "raw" tables from which normalization is reading from when running dbt CLI.