Skip to content

Commit

Permalink
add schema ancestors
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Nov 19, 2023
1 parent e970741 commit 33a64fb
Show file tree
Hide file tree
Showing 13 changed files with 560 additions and 27 deletions.
16 changes: 13 additions & 3 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Schema:
_dlt_tables_prefix: str
_stored_version: int # version at load/creation time
_stored_version_hash: str # version hash at load/creation time
_stored_ancestors: Optional[List[str]] # list of ancestor hashes of the schema
_imported_version_hash: str # version hash of recently imported schema
_schema_description: str # optional schema description
_schema_tables: TSchemaTables
Expand All @@ -61,6 +62,7 @@ def __init__(self, name: str, normalizers: TNormalizersConfig = None) -> None:

@classmethod
def from_dict(cls, d: DictStrAny) -> "Schema":

# upgrade engine if needed
stored_schema = utils.migrate_schema(d, d["engine_version"], cls.ENGINE_VERSION)
# verify schema
Expand Down Expand Up @@ -91,7 +93,8 @@ def to_dict(self, remove_defaults: bool = False) -> TStoredSchema:
"name": self._schema_name,
"tables": self._schema_tables,
"settings": self._settings,
"normalizers": self._normalizers_config
"normalizers": self._normalizers_config,
"ancestors": self._stored_ancestors
}
if self._imported_version_hash and not remove_defaults:
stored_schema["imported_version_hash"] = self._imported_version_hash
Expand Down Expand Up @@ -223,7 +226,7 @@ def update_schema(self, schema: "Schema") -> None:
self._compile_settings()


def bump_version(self) -> Tuple[int, str]:
def bump_version(self) -> Tuple[int, str, List[str]]:
"""Computes schema hash in order to check if schema content was modified. In such case the schema ``stored_version`` and ``stored_version_hash`` are updated.
Should not be used in production code. The method ``to_dict`` will generate TStoredSchema with correct value, only once before persisting schema to storage.
Expand All @@ -232,7 +235,7 @@ def bump_version(self) -> Tuple[int, str]:
Tuple[int, str]: Current (``stored_version``, ``stored_version_hash``) tuple
"""
version = utils.bump_version_if_modified(self.to_dict())
self._stored_version, self._stored_version_hash = version
self._stored_version, self._stored_version_hash, self._stored_ancestors = version
return version

def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: StrAny) -> StrAny:
Expand Down Expand Up @@ -350,6 +353,11 @@ def version_hash(self) -> str:
"""Current version hash of the schema, recomputed from the actual content"""
return utils.bump_version_if_modified(self.to_dict())[1]

@property
def ancestors(self) -> List[str]:
"""Current version hash of the schema, recomputed from the actual content"""
return utils.bump_version_if_modified(self.to_dict())[2]

@property
def stored_version_hash(self) -> str:
"""Version hash of the schema content form the time of schema loading/creation."""
Expand Down Expand Up @@ -532,6 +540,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> No
self._stored_version_hash: str = None
self._imported_version_hash: str = None
self._schema_description: str = None
self._stored_ancestors: List[str] = []

self._settings: TSchemaSettings = {}
self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = []
Expand Down Expand Up @@ -570,6 +579,7 @@ def _from_stored_schema(self, stored_schema: TStoredSchema) -> None:
self._imported_version_hash = stored_schema.get("imported_version_hash")
self._schema_description = stored_schema.get("description")
self._settings = stored_schema.get("settings") or {}
self._stored_ancestors = stored_schema.get("ancestors")
self._compile_settings()

def _set_schema_name(self, name: str) -> None:
Expand Down
3 changes: 2 additions & 1 deletion dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


# current version of schema engine
SCHEMA_ENGINE_VERSION = 6
SCHEMA_ENGINE_VERSION = 7

# dlt tables
VERSION_TABLE_NAME = "_dlt_version"
Expand Down Expand Up @@ -108,6 +108,7 @@ class TStoredSchema(TypedDict, total=False):
"""TypeDict defining the schema representation in storage"""
version: int
version_hash: str
ancestors: List[str]
imported_version_hash: Optional[str]
engine_version: int
name: str
Expand Down
22 changes: 19 additions & 3 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def add_column_defaults(column: TColumnSchemaBase) -> TColumnSchema:
# return copy(column) # type: ignore


def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str]:
def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, List[str]]:
# if any change to schema document is detected then bump version and write new hash
hash_ = generate_version_hash(stored_schema)
previous_hash = stored_schema.get("version_hash")
Expand All @@ -143,8 +143,13 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str]:
pass
elif hash_ != previous_hash:
stored_schema["version"] += 1
# unshift previous hash to ancestors and limit array to 10 entries
if previous_hash not in stored_schema["ancestors"]:
stored_schema["ancestors"].insert(0, previous_hash)
stored_schema["ancestors"] = stored_schema["ancestors"][:10]

stored_schema["version_hash"] = hash_
return stored_schema["version"], hash_
return stored_schema["version"], hash_, stored_schema["ancestors"]


def generate_version_hash(stored_schema: TStoredSchema) -> str:
Expand All @@ -153,6 +158,7 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str:
schema_copy.pop("version")
schema_copy.pop("version_hash", None)
schema_copy.pop("imported_version_hash", None)
schema_copy.pop("ancestors", None)
# ignore order of elements when computing the hash
content = json.dumps(schema_copy, sort_keys=True)
h = hashlib.sha3_256(content.encode("utf-8"))
Expand Down Expand Up @@ -240,12 +246,18 @@ def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern:


def validate_stored_schema(stored_schema: TStoredSchema) -> None:
# exclude validation of keys added later
ignored_keys = []
if stored_schema["engine_version"] < 7:
ignored_keys.append("ancestors")

# use lambda to verify only non extra fields
validate_dict_ignoring_xkeys(
spec=TStoredSchema,
doc=stored_schema,
path=".",
validator_f=simple_regex_validator
validator_f=simple_regex_validator,
filter_required=lambda k: k not in ignored_keys
)
# check child parent relationships
for table_name, table in stored_schema["tables"].items():
Expand All @@ -256,6 +268,7 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None:


def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema:

if from_engine == to_engine:
return cast(TStoredSchema, schema_dict)

Expand Down Expand Up @@ -340,6 +353,9 @@ def migrate_filters(group: str, filters: List[str]) -> None:
# replace loads table
schema_dict["tables"][LOADS_TABLE_NAME] = load_table()
from_engine = 6
if from_engine == 6 and to_engine > 6:
schema_dict["ancestors"] = []
from_engine = 7

schema_dict["engine_version"] = from_engine
if from_engine != to_engine:
Expand Down
5 changes: 3 additions & 2 deletions dlt/common/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
TCustomValidator = Callable[[str, str, Any, Any], bool]


def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFunc = None, validator_f: TCustomValidator = None) -> None:
def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFunc = None, validator_f: TCustomValidator = None, filter_required: TFilterFunc = None) -> None:
"""Validate the `doc` dictionary based on the given typed dictionary specification `spec`.
Args:
Expand All @@ -32,11 +32,12 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil
"""
# pass through filter
filter_f = filter_f or (lambda _: True)
filter_required = filter_required or (lambda _: True)
# cannot validate anything
validator_f = validator_f or (lambda p, pk, pv, t: False)

allowed_props = get_type_hints(spec)
required_props = {k: v for k, v in allowed_props.items() if not is_optional_type(v)}
required_props = {k: v for k, v in allowed_props.items() if (not is_optional_type(v) and filter_required(k))}
# remove optional props
props = {k: v for k, v in doc.items() if filter_f(k)}
# check missing props
Expand Down
Loading

0 comments on commit 33a64fb

Please sign in to comment.