diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py
index 77a5ae8e8e..1878cf63d6 100644
--- a/dlt/common/schema/schema.py
+++ b/dlt/common/schema/schema.py
@@ -37,6 +37,7 @@ class Schema:
     _dlt_tables_prefix: str
     _stored_version: int  # version at load/creation time
     _stored_version_hash: str  # version hash at load/creation time
+    _stored_ancestors: Optional[List[str]]  # list of ancestor hashes of the schema
     _imported_version_hash: str  # version hash of recently imported schema
     _schema_description: str  # optional schema description
     _schema_tables: TSchemaTables
@@ -61,6 +62,7 @@ def __init__(self, name: str, normalizers: TNormalizersConfig = None) -> None:

     @classmethod
     def from_dict(cls, d: DictStrAny) -> "Schema":
+        # upgrade engine if needed
         stored_schema = utils.migrate_schema(d, d["engine_version"], cls.ENGINE_VERSION)

         # verify schema
@@ -91,7 +93,8 @@ def to_dict(self, remove_defaults: bool = False) -> TStoredSchema:
             "name": self._schema_name,
             "tables": self._schema_tables,
             "settings": self._settings,
-            "normalizers": self._normalizers_config
+            "normalizers": self._normalizers_config,
+            "ancestors": self._stored_ancestors
         }
         if self._imported_version_hash and not remove_defaults:
             stored_schema["imported_version_hash"] = self._imported_version_hash
@@ -223,7 +226,7 @@ def update_schema(self, schema: "Schema") -> None:

         self._compile_settings()

-    def bump_version(self) -> Tuple[int, str]:
-        """Computes schema hash in order to check if schema content was modified. In such case the schema ``stored_version`` and ``stored_version_hash`` are updated.
+    def bump_version(self) -> Tuple[int, str, List[str]]:
+        """Computes schema hash in order to check if schema content was modified. In such case the schema ``stored_version``, ``stored_version_hash`` and stored ``ancestors`` are updated.

         Should not be used in production code. The method ``to_dict`` will generate TStoredSchema with correct value, only once before persisting schema to storage.
@@ -232,7 +235,7 @@ def bump_version(self) -> Tuple[int, str]:
-            Tuple[int, str]: Current (``stored_version``, ``stored_version_hash``) tuple
+            Tuple[int, str, List[str]]: Current (``stored_version``, ``stored_version_hash``, ``ancestors``) tuple
         """
         version = utils.bump_version_if_modified(self.to_dict())
-        self._stored_version, self._stored_version_hash = version
+        self._stored_version, self._stored_version_hash, self._stored_ancestors = version
         return version

     def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: StrAny) -> StrAny:
@@ -350,6 +353,11 @@ def version_hash(self) -> str:
         """Current version hash of the schema, recomputed from the actual content"""
         return utils.bump_version_if_modified(self.to_dict())[1]

+    @property
+    def ancestors(self) -> List[str]:
+        """List of ancestor version hashes of the schema, recomputed from the actual content"""
+        return utils.bump_version_if_modified(self.to_dict())[2]
+
     @property
     def stored_version_hash(self) -> str:
         """Version hash of the schema content form the time of schema loading/creation."""
@@ -532,6 +540,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> No
         self._stored_version_hash: str = None
         self._imported_version_hash: str = None
         self._schema_description: str = None
+        self._stored_ancestors: List[str] = []

         self._settings: TSchemaSettings = {}
         self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = []
@@ -570,6 +579,7 @@ def _from_stored_schema(self, stored_schema: TStoredSchema) -> None:
         self._imported_version_hash = stored_schema.get("imported_version_hash")
         self._schema_description = stored_schema.get("description")
         self._settings = stored_schema.get("settings") or {}
+        self._stored_ancestors = stored_schema.get("ancestors")
         self._compile_settings()

     def _set_schema_name(self, name: str) -> None:
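With the `Schema` changes above, the ancestry becomes visible through the new `ancestors` property and as the third element of the `bump_version()` tuple. A minimal usage sketch (illustrative only; the schema name is arbitrary and this assumes a checkout with this patch applied):

```python
# Illustrative sketch - not part of the patch.
from dlt.common.schema import Schema

schema = Schema("playground")                              # arbitrary schema name
print(schema.ancestors)                                    # [] - a fresh schema has no ancestors
version, version_hash, ancestors = schema.bump_version()   # now returns a 3-tuple
print(version, version_hash, ancestors)                    # (1, '<content hash>', [])
```
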
diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py
index ac17f0ae9f..7d53f4e8a8 100644
--- a/dlt/common/schema/typing.py
+++ b/dlt/common/schema/typing.py
@@ -11,7 +11,7 @@


 # current version of schema engine
-SCHEMA_ENGINE_VERSION = 6
+SCHEMA_ENGINE_VERSION = 7

 # dlt tables
 VERSION_TABLE_NAME = "_dlt_version"
@@ -108,6 +108,7 @@ class TStoredSchema(TypedDict, total=False):
     """TypeDict defining the schema representation in storage"""
     version: int
     version_hash: str
+    ancestors: List[str]
     imported_version_hash: Optional[str]
     engine_version: int
     name: str
diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py
index f2075ce85d..75b7d0dd31 100644
--- a/dlt/common/schema/utils.py
+++ b/dlt/common/schema/utils.py
@@ -134,7 +134,7 @@ def add_column_defaults(column: TColumnSchemaBase) -> TColumnSchema:
 #     return copy(column)  # type: ignore


-def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str]:
+def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, List[str]]:
     # if any change to schema document is detected then bump version and write new hash
     hash_ = generate_version_hash(stored_schema)
     previous_hash = stored_schema.get("version_hash")
@@ -143,8 +143,13 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str]:
         pass
     elif hash_ != previous_hash:
         stored_schema["version"] += 1
+        # unshift previous hash to ancestors and limit array to 10 entries
+        if previous_hash not in stored_schema["ancestors"]:
+            stored_schema["ancestors"].insert(0, previous_hash)
+            stored_schema["ancestors"] = stored_schema["ancestors"][:10]
+
     stored_schema["version_hash"] = hash_
-    return stored_schema["version"], hash_
+    return stored_schema["version"], hash_, stored_schema["ancestors"]


 def generate_version_hash(stored_schema: TStoredSchema) -> str:
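The hunk above is the heart of the feature: on every content change the previous hash is pushed to the front of `ancestors` and the list is capped at 10 entries. A self-contained toy model of that bookkeeping (a sketch on a plain dict that deliberately skips the real hashing; it is not the dlt function itself):

```python
from typing import Any, Dict, List, Tuple


def record_ancestor(doc: Dict[str, Any], new_hash: str, max_ancestors: int = 10) -> Tuple[int, str, List[str]]:
    # on a content change: bump the version, unshift the previous hash, cap the list
    previous_hash = doc.get("version_hash")
    if previous_hash and new_hash != previous_hash:
        doc["version"] += 1
        if previous_hash not in doc["ancestors"]:
            doc["ancestors"].insert(0, previous_hash)
            doc["ancestors"] = doc["ancestors"][:max_ancestors]
    doc["version_hash"] = new_hash
    return doc["version"], new_hash, doc["ancestors"]


doc = {"version": 1, "version_hash": None, "ancestors": []}
for n in range(15):
    record_ancestor(doc, f"hash-{n}")

assert doc["version"] == 15
assert len(doc["ancestors"]) == 10        # capped at 10 entries
assert doc["ancestors"][0] == "hash-13"   # most recent ancestor first
```
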
@@ -153,6 +158,7 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str:
     schema_copy.pop("version")
     schema_copy.pop("version_hash", None)
     schema_copy.pop("imported_version_hash", None)
+    schema_copy.pop("ancestors", None)
     # ignore order of elements when computing the hash
     content = json.dumps(schema_copy, sort_keys=True)
     h = hashlib.sha3_256(content.encode("utf-8"))
@@ -240,12 +246,18 @@ def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern:


 def validate_stored_schema(stored_schema: TStoredSchema) -> None:
+    # exclude validation of keys added later
+    ignored_keys = []
+    if stored_schema["engine_version"] < 7:
+        ignored_keys.append("ancestors")
+
     # use lambda to verify only non extra fields
     validate_dict_ignoring_xkeys(
         spec=TStoredSchema,
         doc=stored_schema,
         path=".",
-        validator_f=simple_regex_validator
+        validator_f=simple_regex_validator,
+        filter_required=lambda k: k not in ignored_keys
     )
     # check child parent relationships
     for table_name, table in stored_schema["tables"].items():
@@ -256,6 +268,7 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None:


 def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema:
+
     if from_engine == to_engine:
         return cast(TStoredSchema, schema_dict)

@@ -340,6 +353,9 @@ def migrate_filters(group: str, filters: List[str]) -> None:
         # replace loads table
         schema_dict["tables"][LOADS_TABLE_NAME] = load_table()
         from_engine = 6
+    if from_engine == 6 and to_engine > 6:
+        schema_dict["ancestors"] = []
+        from_engine = 7

     schema_dict["engine_version"] = from_engine
     if from_engine != to_engine:
diff --git a/dlt/common/validation.py b/dlt/common/validation.py
index f1900c1b0e..7a313b1b29 100644
--- a/dlt/common/validation.py
+++ b/dlt/common/validation.py
@@ -9,7 +9,7 @@
 TCustomValidator = Callable[[str, str, Any, Any], bool]


-def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFunc = None, validator_f: TCustomValidator = None) -> None:
+def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFunc = None, validator_f: TCustomValidator = None, filter_required: TFilterFunc = None) -> None:
     """Validate the `doc` dictionary based on the given typed dictionary specification `spec`.
Args: @@ -32,11 +32,12 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil """ # pass through filter filter_f = filter_f or (lambda _: True) + filter_required = filter_required or (lambda _: True) # cannot validate anything validator_f = validator_f or (lambda p, pk, pv, t: False) allowed_props = get_type_hints(spec) - required_props = {k: v for k, v in allowed_props.items() if not is_optional_type(v)} + required_props = {k: v for k, v in allowed_props.items() if (not is_optional_type(v) and filter_required(k))} # remove optional props props = {k: v for k, v in doc.items() if filter_f(k)} # check missing props diff --git a/tests/common/cases/schemas/eth/ethereum_schema_v7.yml b/tests/common/cases/schemas/eth/ethereum_schema_v7.yml new file mode 100644 index 0000000000..fd612df987 --- /dev/null +++ b/tests/common/cases/schemas/eth/ethereum_schema_v7.yml @@ -0,0 +1,459 @@ +version: 14 +version_hash: ZbDv9+tdJK7P/4QIB0qqHzqNSsVynVx90GL4giV8/p0= +engine_version: 7 +name: ethereum +tables: + _dlt_loads: + columns: + load_id: + nullable: false + data_type: text + name: load_id + schema_name: + nullable: true + data_type: text + name: schema_name + status: + nullable: false + data_type: bigint + name: status + inserted_at: + nullable: false + data_type: timestamp + name: inserted_at + schema_version_hash: + nullable: true + data_type: text + name: schema_version_hash + write_disposition: skip + description: Created by DLT. Tracks completed loads + name: _dlt_loads + resource: _dlt_loads + _dlt_version: + columns: + version: + nullable: false + data_type: bigint + name: version + engine_version: + nullable: false + data_type: bigint + name: engine_version + inserted_at: + nullable: false + data_type: timestamp + name: inserted_at + schema_name: + nullable: false + data_type: text + name: schema_name + version_hash: + nullable: false + data_type: text + name: version_hash + schema: + nullable: false + data_type: text + name: schema + write_disposition: skip + description: Created by DLT. 
Tracks schema updates + name: _dlt_version + resource: _dlt_version + blocks: + description: Ethereum blocks + x-annotation: this will be preserved on save + write_disposition: append + table_sealed: true + filters: + includes: [] + excludes: [] + columns: + _dlt_load_id: + nullable: false + description: load id coming from the extractor + data_type: text + name: _dlt_load_id + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + number: + nullable: false + primary_key: true + data_type: bigint + name: number + parent_hash: + nullable: true + data_type: text + name: parent_hash + hash: + nullable: false + cluster: true + unique: true + data_type: text + name: hash + base_fee_per_gas: + nullable: false + data_type: wei + name: base_fee_per_gas + difficulty: + nullable: false + data_type: wei + name: difficulty + extra_data: + nullable: true + data_type: text + name: extra_data + gas_limit: + nullable: false + data_type: bigint + name: gas_limit + gas_used: + nullable: false + data_type: bigint + name: gas_used + logs_bloom: + nullable: true + data_type: binary + name: logs_bloom + miner: + nullable: true + data_type: text + name: miner + mix_hash: + nullable: true + data_type: text + name: mix_hash + nonce: + nullable: true + data_type: text + name: nonce + receipts_root: + nullable: true + data_type: text + name: receipts_root + sha3_uncles: + nullable: true + data_type: text + name: sha3_uncles + size: + nullable: true + data_type: bigint + name: size + state_root: + nullable: false + data_type: text + name: state_root + timestamp: + nullable: false + unique: true + sort: true + data_type: timestamp + name: timestamp + total_difficulty: + nullable: true + data_type: wei + name: total_difficulty + transactions_root: + nullable: false + data_type: text + name: transactions_root + name: blocks + resource: blocks + blocks__transactions: + parent: blocks + columns: + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + block_number: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: block_number + transaction_index: + nullable: false + primary_key: true + data_type: bigint + name: transaction_index + hash: + nullable: false + unique: true + data_type: text + name: hash + block_hash: + nullable: false + cluster: true + data_type: text + name: block_hash + block_timestamp: + nullable: false + sort: true + data_type: timestamp + name: block_timestamp + chain_id: + nullable: true + data_type: text + name: chain_id + from: + nullable: true + data_type: text + name: from + gas: + nullable: true + data_type: bigint + name: gas + gas_price: + nullable: true + data_type: bigint + name: gas_price + input: + nullable: true + data_type: text + name: input + max_fee_per_gas: + nullable: true + data_type: wei + name: max_fee_per_gas + max_priority_fee_per_gas: + nullable: true + data_type: wei + name: max_priority_fee_per_gas + nonce: + nullable: true + data_type: bigint + name: nonce + r: + nullable: true + data_type: text + name: r + s: + nullable: true + data_type: text + name: s + status: + nullable: true + data_type: bigint + name: status + to: + nullable: true + data_type: text + name: to + type: + nullable: true + data_type: text + name: type + v: + nullable: true + data_type: bigint + name: v + value: + nullable: false + data_type: wei + name: value + eth_value: + nullable: true + data_type: decimal + name: eth_value + name: blocks__transactions + blocks__transactions__logs: + parent: blocks__transactions + columns: 
+ _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + address: + nullable: false + data_type: text + name: address + block_timestamp: + nullable: false + sort: true + data_type: timestamp + name: block_timestamp + block_hash: + nullable: false + cluster: true + data_type: text + name: block_hash + block_number: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: block_number + transaction_index: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: transaction_index + log_index: + nullable: false + primary_key: true + data_type: bigint + name: log_index + data: + nullable: true + data_type: text + name: data + removed: + nullable: true + data_type: bool + name: removed + transaction_hash: + nullable: false + data_type: text + name: transaction_hash + name: blocks__transactions__logs + blocks__transactions__logs__topics: + parent: blocks__transactions__logs + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__transactions__logs__topics + blocks__transactions__access_list: + parent: blocks__transactions + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + address: + nullable: true + data_type: text + name: address + name: blocks__transactions__access_list + blocks__transactions__access_list__storage_keys: + parent: blocks__transactions__access_list + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__transactions__access_list__storage_keys + blocks__uncles: + parent: blocks + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__uncles +settings: + schema_sealed: true + default_hints: + foreign_key: + - _dlt_parent_id + not_null: + - re:^_dlt_id$ + - _dlt_root_id + - _dlt_parent_id + - _dlt_list_idx + unique: + - _dlt_id + cluster: + - block_hash + partition: + - block_timestamp + root_key: + - _dlt_root_id + preferred_types: + timestamp: timestamp + block_timestamp: timestamp +normalizers: + names: dlt.common.normalizers.names.snake_case + json: + module: dlt.common.normalizers.json.relational + config: + generate_dlt_id: true + propagation: + root: + _dlt_id: _dlt_root_id + tables: + blocks: + timestamp: 
block_timestamp
+            hash: block_hash
+ancestors:
+- Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ=
+
diff --git a/tests/common/schema/test_schema.py b/tests/common/schema/test_schema.py
index 8b465d796e..c84c25574f 100644
--- a/tests/common/schema/test_schema.py
+++ b/tests/common/schema/test_schema.py
@@ -295,6 +295,15 @@ def test_upgrade_engine_v1_schema() -> None:
     assert upgraded["engine_version"] == 6
     utils.validate_stored_schema(upgraded)

+    # upgrade 1 -> 7
+    schema_dict = load_json_case("schemas/ev1/event.schema")
+    assert schema_dict["engine_version"] == 1
+    upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=7)
+    assert upgraded["engine_version"] == 7
+    utils.validate_stored_schema(upgraded)
+    # we should have an empty ancestors list after upgrade to 7
+    assert upgraded["ancestors"] == []
+

 def test_unknown_engine_upgrade() -> None:
     schema_dict: TStoredSchema = load_json_case("schemas/ev1/event.schema")
@@ -573,7 +582,8 @@ def assert_new_schema_values(schema: Schema) -> None:
     assert schema.stored_version == 1
     assert schema.stored_version_hash is not None
     assert schema.version_hash is not None
-    assert schema.ENGINE_VERSION == 6
+    assert schema.ENGINE_VERSION == 7
+    assert schema._stored_ancestors == []
     assert len(schema.settings["default_hints"]) > 0
     # check settings
     assert utils.standard_type_detections() == schema.settings["detections"] == schema._type_detections
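The upgrade test above exercises the new 6 -> 7 migration step. A hedged sketch of that step in isolation (the input document is a deliberately minimal stub, not a valid schema; it assumes the patched `utils.migrate_schema`):

```python
from dlt.common.schema import utils

# minimal stub for illustration - real documents carry the full table set and settings
doc = {"engine_version": 6, "name": "event", "tables": {}, "settings": {}}
migrated = utils.migrate_schema(doc, from_engine=6, to_engine=7)

assert migrated["engine_version"] == 7
assert migrated["ancestors"] == []   # pre-7 documents start with an empty ancestry
```
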
diff --git a/tests/common/schema/test_versioning.py b/tests/common/schema/test_versioning.py
index 1bfaaa5da2..7d0074e934 100644
--- a/tests/common/schema/test_versioning.py
+++ b/tests/common/schema/test_versioning.py
@@ -1,5 +1,6 @@
 import pytest
 import yaml
+from copy import deepcopy

 from dlt.common import json
 from dlt.common.schema import utils
@@ -83,7 +84,7 @@ def test_infer_column_bumps_version() -> None:


 def test_preserve_version_on_load() -> None:
-    eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v6")
+    eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7")
     version = eth_v6["version"]
     version_hash = eth_v6["version_hash"]
     schema = Schema.from_dict(eth_v6)  # type: ignore[arg-type]
@@ -95,7 +96,7 @@ def test_preserve_version_on_load() -> None:

 @pytest.mark.parametrize("remove_defaults", [True, False])
 def test_version_preserve_on_reload(remove_defaults: bool) -> None:
-    eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v6")
+    eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7")
     schema = Schema.from_dict(eth_v6)  # type: ignore[arg-type]

     to_save_dict = schema.to_dict(remove_defaults=remove_defaults)
@@ -122,3 +123,31 @@ def test_version_preserve_on_reload(remove_defaults: bool) -> None:
     saved_rasa_schema = Schema.from_dict(yaml.safe_load(rasa_yml))
     assert saved_rasa_schema.stored_version == rasa_schema.stored_version
     assert saved_rasa_schema.stored_version_hash == rasa_schema.stored_version_hash
+
+
+def test_create_ancestry() -> None:
+    eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7")
+    schema = Schema.from_dict(eth_v7)  # type: ignore[arg-type]
+    assert schema._stored_ancestors == ["Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ="]
+    version = schema._stored_version
+
+    # modify, save and load the schema 14 times and check ancestry
+    expected_ancestors = ["Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ="]
+    for i in range(1, 15):
+        # keep expected ancestors
+        expected_ancestors.insert(0, schema._stored_version_hash)
+
+        # update schema
+        row = {f"float{i}": 78172.128}
+        _, new_table = schema.coerce_row("event_user", None, row)
+        schema.update_table(new_table)
+        schema_dict = schema.to_dict()
+        schema = Schema.from_stored_schema(schema_dict)
+
+        assert schema._stored_ancestors == expected_ancestors[:10]
+        assert schema._stored_version == version + i
+
+        # we never have more than 10 ancestors
+        assert len(schema._stored_ancestors) == min(i + 1, 10)
+
+    assert len(schema._stored_ancestors) == 10
diff --git a/tests/common/storages/test_schema_storage.py b/tests/common/storages/test_schema_storage.py
index f45773e4f5..a577729e5d 100644
--- a/tests/common/storages/test_schema_storage.py
+++ b/tests/common/storages/test_schema_storage.py
@@ -11,7 +11,7 @@
 from dlt.common.storages import SchemaStorageConfiguration, SchemaStorage, LiveSchemaStorage, FileStorage

 from tests.utils import autouse_test_storage, TEST_STORAGE_ROOT
-from tests.common.utils import load_yml_case, yml_case_path, COMMON_TEST_CASES_PATH, IMPORTED_VERSION_HASH_ETH_V6
+from tests.common.utils import load_yml_case, yml_case_path, COMMON_TEST_CASES_PATH, IMPORTED_VERSION_HASH_ETH_V7


 @pytest.fixture
@@ -87,6 +87,7 @@ def test_skip_import_if_not_modified(synced_storage: SchemaStorage, storage: Sch
     assert storage_schema.version == reloaded_schema.stored_version
     assert storage_schema.version_hash == reloaded_schema.stored_version_hash
     assert storage_schema._imported_version_hash == reloaded_schema._imported_version_hash
+    assert storage_schema.ancestors == reloaded_schema.ancestors
     # the import schema gets modified
     storage_schema.tables["_dlt_loads"]["write_disposition"] = "append"
     storage_schema.tables.pop("event_user")
@@ -96,7 +97,11 @@ def test_skip_import_if_not_modified(synced_storage: SchemaStorage, storage: Sch
     # we have overwritten storage schema
     assert reloaded_schema.tables["_dlt_loads"]["write_disposition"] == "append"
     assert "event_user" not in reloaded_schema.tables
+
+    # hash and ancestry stay the same
     assert reloaded_schema._imported_version_hash == storage_schema.version_hash
+    assert storage_schema.ancestors == reloaded_schema.ancestors
+
     # but original version has increased
     assert reloaded_schema.stored_version == storage_schema.version + 1

@@ -194,12 +199,13 @@ def test_save_store_schema_over_import(ie_storage: SchemaStorage) -> None:
     ie_storage.save_schema(schema)
     assert schema.version_hash == schema_hash
     # we linked schema to import schema
-    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V6
+    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7
     # load schema and make sure our new schema is here
     schema = ie_storage.load_schema("ethereum")
-    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V6
+    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7
     assert schema._stored_version_hash == schema_hash
     assert schema.version_hash == schema_hash
+    assert schema.ancestors == []
     # we have simple schema in export folder
     fs = FileStorage(ie_storage.config.export_schema_path)
     exported_name = ie_storage._file_name_in_store("ethereum", "yaml")
@@ -213,12 +219,13 @@ def test_save_store_schema_over_import_sync(synced_storage: SchemaStorage) -> No
     schema = Schema("ethereum")
     schema_hash = schema.version_hash
     synced_storage.save_schema(schema)
-    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V6
+    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7
     # import schema is overwritten
     fs = FileStorage(synced_storage.config.import_schema_path)
     exported_name = synced_storage._file_name_in_store("ethereum", "yaml")
     exported_schema = yaml.safe_load(fs.load(exported_name))
     assert schema.version_hash == exported_schema["version_hash"] == schema_hash
+    assert schema.ancestors == []
     # when it is loaded we will import schema again which is identical to the current one but the import link
     # will be set to itself
     schema = synced_storage.load_schema("ethereum")
@@ -269,12 +276,12 @@ def test_schema_from_file() -> None:


 def prepare_import_folder(storage: SchemaStorage) -> None:
-    shutil.copy(yml_case_path("schemas/eth/ethereum_schema_v6"), os.path.join(storage.storage.storage_path, "../import/ethereum.schema.yaml"))
+    shutil.copy(yml_case_path("schemas/eth/ethereum_schema_v7"), os.path.join(storage.storage.storage_path, "../import/ethereum.schema.yaml"))


 def assert_schema_imported(synced_storage: SchemaStorage, storage: SchemaStorage) -> Schema:
     prepare_import_folder(synced_storage)
-    eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v6")
+    eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7")
     schema = synced_storage.load_schema("ethereum")
     # is linked to imported schema
     schema._imported_version_hash = eth_v6["version_hash"]
diff --git a/tests/common/test_validation.py b/tests/common/test_validation.py
index 4583da3a1e..bbda683717 100644
--- a/tests/common/test_validation.py
+++ b/tests/common/test_validation.py
@@ -83,14 +83,14 @@ def test_doc() -> TTestRecord:


 def test_validate_schema_cases() -> None:
-    with open("tests/common/cases/schemas/eth/ethereum_schema_v4.yml", mode="r", encoding="utf-8") as f:
+    with open("tests/common/cases/schemas/eth/ethereum_schema_v7.yml", mode="r", encoding="utf-8") as f:
         schema_dict: TStoredSchema = yaml.safe_load(f)

         validate_dict_ignoring_xkeys(
             spec=TStoredSchema,
             doc=schema_dict,
             path=".",
-            validator_f=simple_regex_validator
+            validator_f=simple_regex_validator,
         )

 # with open("tests/common/cases/schemas/rasa/event.schema.json") as f:
diff --git a/tests/common/utils.py b/tests/common/utils.py
index 54a48825af..8e0d5351e6 100644
--- a/tests/common/utils.py
+++ b/tests/common/utils.py
@@ -16,7 +16,7 @@

 COMMON_TEST_CASES_PATH = "./tests/common/cases/"
 # for import schema tests, change when upgrading the schema version
-IMPORTED_VERSION_HASH_ETH_V6 = "Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ="
+IMPORTED_VERSION_HASH_ETH_V7 = "ZbDv9+tdJK7P/4QIB0qqHzqNSsVynVx90GL4giV8/p0="
 # test sentry DSN
 TEST_SENTRY_DSN = "https://797678dd0af64b96937435326c7d30c1@o1061158.ingest.sentry.io/4504306172821504"
 # preserve secrets path to be able to restore it
diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py
index f80dbbd7e6..c9c6c4c437 100644
--- a/tests/load/pipeline/test_restore_state.py
+++ b/tests/load/pipeline/test_restore_state.py
@@ -18,7 +18,7 @@

 from tests.utils import TEST_STORAGE_ROOT
 from tests.cases import JSON_TYPED_DICT, JSON_TYPED_DICT_DECODED
-from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V6, yml_case_path as common_yml_case_path
+from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V7, yml_case_path as common_yml_case_path
 from tests.common.configuration.utils import environment
 from tests.load.pipeline.utils import assert_query_data, drop_active_pipeline_data
 from tests.load.utils import destinations_configs, DestinationTestConfiguration, get_normalized_dataset_name
@@ -404,7 +404,7 @@ def test_restore_schemas_while_import_schemas_exist(destination_config: Destinat
     assert normalized_annotations in schema.tables

     # check if attached to import schema
-    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V6
+    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7
     # extract some data with restored pipeline
     p.run(["C", "D", "E"], table_name="blacklist")
     assert normalized_labels in schema.tables
diff --git a/tests/load/weaviate/test_naming.py b/tests/load/weaviate/test_naming.py
index a965201425..488d66b725 100644
--- a/tests/load/weaviate/test_naming.py
+++ b/tests/load/weaviate/test_naming.py
@@ -87,7 +87,7 @@ def test_reserved_property_names() -> None:
 #     print(schema_2.name)
 #     print(schema_2.naming)

-#     eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v6")
+#     eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v7")
 #     eth_v6_schema = dlt.Schema.from_dict(eth_v6)

 #     pipeline.extract(s, schema=eth_v6_schema)
@@ -101,7 +101,7 @@ def test_reserved_property_names() -> None:
 #     print(pipeline.dataset_name)
 #     s = small()

-#     eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v6")
+#     eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v7")
 #     eth_v6_schema = dlt.Schema.from_dict(eth_v6)

 #     pipeline.extract(s, schema=eth_v6_schema)
diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py
index 09d8e98d82..ebd2ddb515 100644
--- a/tests/pipeline/test_dlt_versions.py
+++ b/tests/pipeline/test_dlt_versions.py
@@ -81,7 +81,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None:
             pipeline.sync_destination()
             # print(pipeline.working_dir)
             # we have updated schema
-            assert pipeline.default_schema.ENGINE_VERSION == 6
+            assert pipeline.default_schema.ENGINE_VERSION == 7
             # make sure that schema hash retrieved from the destination is exactly the same as the schema hash that was in storage before the schema was wiped
             assert pipeline.default_schema.stored_version_hash == github_schema["version_hash"]

@@ -114,6 +114,6 @@ def test_load_package_with_dlt_update(test_storage: FileStorage) -> None:
             github_schema = json.loads(test_storage.load(f".dlt/pipelines/{GITHUB_PIPELINE_NAME}/schemas/github.schema.json"))
             pipeline = pipeline.drop()
             pipeline.sync_destination()
-            assert pipeline.default_schema.ENGINE_VERSION == 6
+            assert pipeline.default_schema.ENGINE_VERSION == 7
             # schema version does not match `dlt.attach` does not update to the right schema by itself
             assert pipeline.default_schema.stored_version_hash != github_schema["version_hash"]
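A closing sketch that ties the version-hash assertions in these tests back to the `generate_version_hash` change earlier in the diff: `ancestors` is popped before hashing, so two documents that differ only in their ancestry keep the same content hash (illustrative only; assumes the patched `dlt.common.schema.utils` and an arbitrary schema name):

```python
from copy import deepcopy

from dlt.common.schema import Schema, utils

doc = Schema("playground").to_dict()
twin = deepcopy(doc)
twin["ancestors"] = ["some-older-hash"]   # hypothetical ancestry, content unchanged

assert utils.generate_version_hash(doc) == utils.generate_version_hash(twin)
```
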