Skip to content

Commit

Permalink
add schema ancestry tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Nov 18, 2023
1 parent 08fdd3e commit 2437983
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 2 deletions.
5 changes: 3 additions & 2 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, Li
elif hash_ != previous_hash:
stored_schema["version"] += 1
# unshift previous hash to ancestors (only if it is not already present) and limit array to 10 entries
stored_schema["ancestors"].insert(0, previous_hash)
stored_schema["ancestors"] = stored_schema["ancestors"][:10]
if previous_hash not in stored_schema["ancestors"]:
stored_schema["ancestors"].insert(0, previous_hash)
stored_schema["ancestors"] = stored_schema["ancestors"][:10]

stored_schema["version_hash"] = hash_
return stored_schema["version"], hash_, stored_schema["ancestors"]
Expand Down
9 changes: 9 additions & 0 deletions tests/common/schema/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,15 @@ def test_upgrade_engine_v1_schema() -> None:
assert upgraded["engine_version"] == 6
utils.validate_stored_schema(upgraded)

# upgrade 1 -> 7
schema_dict = load_json_case("schemas/ev1/event.schema")
assert schema_dict["engine_version"] == 1
upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=7)
assert upgraded["engine_version"] == 7
utils.validate_stored_schema(upgraded)
# we should have an empty ancestors list after upgrade to 7
assert upgraded["ancestors"] == []


def test_unknown_engine_upgrade() -> None:
schema_dict: TStoredSchema = load_json_case("schemas/ev1/event.schema")
Expand Down
29 changes: 29 additions & 0 deletions tests/common/schema/test_versioning.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
import yaml
from copy import deepcopy

from dlt.common import json
from dlt.common.schema import utils
Expand Down Expand Up @@ -122,3 +123,31 @@ def test_version_preserve_on_reload(remove_defaults: bool) -> None:
saved_rasa_schema = Schema.from_dict(yaml.safe_load(rasa_yml))
assert saved_rasa_schema.stored_version == rasa_schema.stored_version
assert saved_rasa_schema.stored_version_hash == rasa_schema.stored_version_hash


def test_create_ancestry() -> None:
    """Check that schema ancestry grows on each modification and is capped at 10 hashes.

    Loads the ethereum v7 schema case, then repeatedly adds a new column,
    serializes the schema and reloads it. After every round trip the stored
    ancestors must be the previous version hashes (most recent first),
    truncated to at most 10 entries, and the stored version must have been
    bumped by one.
    """
    max_ancestors = 10

    eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7")
    schema = Schema.from_dict(eth_v7)  # type: ignore[arg-type]
    assert schema._stored_ancestors == ["Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ="]
    version = schema._stored_version

    # modify, save and load schema 14 times and check ancestry
    expected_ancestors = ["Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ="]
    for i in range(1, 15):
        # the current hash becomes the newest ancestor once the schema is modified
        expected_ancestors.insert(0, schema._stored_version_hash)

        # update schema: a new column name on each iteration forces a content change
        row = {f"float{i}": 78172.128}
        _, new_table = schema.coerce_row("event_user", None, row)
        schema.update_table(new_table)
        schema_dict = schema.to_dict()
        schema = Schema.from_stored_schema(schema_dict)

        assert schema._stored_ancestors == expected_ancestors[:max_ancestors]
        assert schema._stored_version == version + i

        # we never have more than 10 ancestors
        # NOTE: compare against min(...) explicitly; writing
        # `assert a == b if cond else 10` would assert the constant 10
        # (always truthy) whenever cond is false.
        assert len(schema._stored_ancestors) == min(i + 1, max_ancestors)

    assert len(schema._stored_ancestors) == max_ancestors
7 changes: 7 additions & 0 deletions tests/common/storages/test_schema_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def test_skip_import_if_not_modified(synced_storage: SchemaStorage, storage: Sch
assert storage_schema.version == reloaded_schema.stored_version
assert storage_schema.version_hash == reloaded_schema.stored_version_hash
assert storage_schema._imported_version_hash == reloaded_schema._imported_version_hash
assert storage_schema.ancestors == reloaded_schema.ancestors
# the import schema gets modified
storage_schema.tables["_dlt_loads"]["write_disposition"] = "append"
storage_schema.tables.pop("event_user")
Expand All @@ -96,7 +97,11 @@ def test_skip_import_if_not_modified(synced_storage: SchemaStorage, storage: Sch
# we have overwritten storage schema
assert reloaded_schema.tables["_dlt_loads"]["write_disposition"] == "append"
assert "event_user" not in reloaded_schema.tables

# hash and ancestry stay the same
assert reloaded_schema._imported_version_hash == storage_schema.version_hash
assert storage_schema.ancestors == reloaded_schema.ancestors

# but original version has increased
assert reloaded_schema.stored_version == storage_schema.version + 1

Expand Down Expand Up @@ -200,6 +205,7 @@ def test_save_store_schema_over_import(ie_storage: SchemaStorage) -> None:
assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7
assert schema._stored_version_hash == schema_hash
assert schema.version_hash == schema_hash
assert schema.ancestors == []
# we have simple schema in export folder
fs = FileStorage(ie_storage.config.export_schema_path)
exported_name = ie_storage._file_name_in_store("ethereum", "yaml")
Expand All @@ -219,6 +225,7 @@ def test_save_store_schema_over_import_sync(synced_storage: SchemaStorage) -> No
exported_name = synced_storage._file_name_in_store("ethereum", "yaml")
exported_schema = yaml.safe_load(fs.load(exported_name))
assert schema.version_hash == exported_schema["version_hash"] == schema_hash
assert schema.ancestors == []
# when it is loaded we will import schema again which is identical to the current one but the import link
# will be set to itself
schema = synced_storage.load_schema("ethereum")
Expand Down

0 comments on commit 2437983

Please sign in to comment.