source and schema changes #769

Merged: 8 commits, Nov 21, 2023

Changes from 4 commits
16 changes: 13 additions & 3 deletions dlt/common/schema/schema.py
@@ -37,6 +37,7 @@ class Schema:
_dlt_tables_prefix: str
_stored_version: int # version at load/creation time
_stored_version_hash: str # version hash at load/creation time
_stored_ancestors: Optional[List[str]] # list of ancestor hashes of the schema
Collaborator: This is not an ancestor; it is the previous version hash, so let's name it that. We do not have any schema derivation scheme like we have in pydantic models (i.e. a base schema etc.). This is just a version ("revision" would probably be better, but it is too late to change it).

Collaborator: _previous_version_hashes or _previous_hashes would be better.

Collaborator (author): done!
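For readers skimming the thread, a minimal sketch of the rename the reviewers converge on; the class and field names below are illustrative stand-ins, not the merged code:

```python
# Sketch only: the attribute as the review suggests naming it. In the PR these
# live on dlt.common.schema.schema.Schema; the class here is a stand-in.
from typing import List, Optional


class SchemaVersioning:
    _stored_version: int                          # version at load/creation time
    _stored_version_hash: str                     # version hash at load/creation time
    _stored_previous_hashes: Optional[List[str]]  # prior version hashes, newest first
```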

_imported_version_hash: str # version hash of recently imported schema
_schema_description: str # optional schema description
_schema_tables: TSchemaTables
@@ -61,6 +62,7 @@ def __init__(self, name: str, normalizers: TNormalizersConfig = None) -> None:

@classmethod
def from_dict(cls, d: DictStrAny) -> "Schema":

# upgrade engine if needed
stored_schema = utils.migrate_schema(d, d["engine_version"], cls.ENGINE_VERSION)
# verify schema
@@ -91,7 +93,8 @@ def to_dict(self, remove_defaults: bool = False) -> TStoredSchema:
"name": self._schema_name,
"tables": self._schema_tables,
"settings": self._settings,
"normalizers": self._normalizers_config
"normalizers": self._normalizers_config,
"ancestors": self._stored_ancestors
}
if self._imported_version_hash and not remove_defaults:
stored_schema["imported_version_hash"] = self._imported_version_hash
@@ -223,7 +226,7 @@ def update_schema(self, schema: "Schema") -> None:
self._compile_settings()


def bump_version(self) -> Tuple[int, str]:
def bump_version(self) -> Tuple[int, str, List[str]]:
"""Computes schema hash in order to check if schema content was modified. In such case the schema ``stored_version`` and ``stored_version_hash`` are updated.

Should not be used in production code. The method ``to_dict`` will generate TStoredSchema with correct value, only once before persisting schema to storage.
@@ -232,7 +235,7 @@ def bump_version(self) -> Tuple[int, str]:
Tuple[int, str, List[str]]: Current (``stored_version``, ``stored_version_hash``, ``stored_ancestors``) tuple
"""
version = utils.bump_version_if_modified(self.to_dict())
self._stored_version, self._stored_version_hash = version
self._stored_version, self._stored_version_hash, self._stored_ancestors = version
return version

def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: StrAny) -> StrAny:
@@ -350,6 +353,11 @@ def version_hash(self) -> str:
"""Current version hash of the schema, recomputed from the actual content"""
return utils.bump_version_if_modified(self.to_dict())[1]

@property
def ancestors(self) -> List[str]:
"""Current version hash of the schema, recomputed from the actual content"""
return utils.bump_version_if_modified(self.to_dict())[2]

@property
def stored_version_hash(self) -> str:
"""Version hash of the schema content form the time of schema loading/creation."""
@@ -532,6 +540,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> No
self._stored_version_hash: str = None
self._imported_version_hash: str = None
self._schema_description: str = None
self._stored_ancestors: List[str] = []

self._settings: TSchemaSettings = {}
self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = []
@@ -570,6 +579,7 @@ def _from_stored_schema(self, stored_schema: TStoredSchema) -> None:
self._imported_version_hash = stored_schema.get("imported_version_hash")
self._schema_description = stored_schema.get("description")
self._settings = stored_schema.get("settings") or {}
self._stored_ancestors = stored_schema.get("ancestors")
self._compile_settings()

def _set_schema_name(self, name: str) -> None:
3 changes: 2 additions & 1 deletion dlt/common/schema/typing.py
@@ -11,7 +11,7 @@


# current version of schema engine
SCHEMA_ENGINE_VERSION = 6
SCHEMA_ENGINE_VERSION = 7

# dlt tables
VERSION_TABLE_NAME = "_dlt_version"
@@ -108,6 +108,7 @@ class TStoredSchema(TypedDict, total=False):
"""TypeDict defining the schema representation in storage"""
version: int
version_hash: str
ancestors: List[str]
imported_version_hash: Optional[str]
engine_version: int
name: str
22 changes: 19 additions & 3 deletions dlt/common/schema/utils.py
@@ -134,7 +134,7 @@ def add_column_defaults(column: TColumnSchemaBase) -> TColumnSchema:
# return copy(column) # type: ignore


def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str]:
def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, List[str]]:
# if any change to schema document is detected then bump version and write new hash
hash_ = generate_version_hash(stored_schema)
previous_hash = stored_schema.get("version_hash")
@@ -143,8 +143,13 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str]:
pass
elif hash_ != previous_hash:
stored_schema["version"] += 1
# unshift previous hash to ancestors and limit array to 10 entries
if previous_hash not in stored_schema["ancestors"]:
stored_schema["ancestors"].insert(0, previous_hash)
stored_schema["ancestors"] = stored_schema["ancestors"][:10]
Collaborator: Maybe we could have it in the constants so it is easy to find and change?

Collaborator (author): I'm not sure what you mean?
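For context, one reading of the suggestion is to hoist the magic number 10 into a module-level constant so the history cap is easy to find and change; a sketch, with the constant name assumed:

```python
# Hypothetical constant; at this commit the PR still inlines the literal 10.
PREVIOUS_HASHES_MAX = 10


def _record_previous_hash(stored_schema: dict, previous_hash: str) -> None:
    # unshift the previous hash and trim the history to the configured cap
    if previous_hash not in stored_schema["ancestors"]:
        stored_schema["ancestors"].insert(0, previous_hash)
    stored_schema["ancestors"] = stored_schema["ancestors"][:PREVIOUS_HASHES_MAX]
```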


stored_schema["version_hash"] = hash_
return stored_schema["version"], hash_
return stored_schema["version"], hash_, stored_schema["ancestors"]


def generate_version_hash(stored_schema: TStoredSchema) -> str:
@@ -153,6 +158,7 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str:
schema_copy.pop("version")
schema_copy.pop("version_hash", None)
schema_copy.pop("imported_version_hash", None)
schema_copy.pop("ancestors", None)
# ignore order of elements when computing the hash
content = json.dumps(schema_copy, sort_keys=True)
h = hashlib.sha3_256(content.encode("utf-8"))
@@ -240,12 +246,18 @@ def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern:


def validate_stored_schema(stored_schema: TStoredSchema) -> None:
# exclude validation of keys added later
ignored_keys = []
Collaborator: Why is this required? First we migrate the dictionary and then we validate the schema, so the engine version should always be 7.

Collaborator (author): In test_upgrade_engine_v1_schema many different schema versions are validated. We could alternatively change that test.

Collaborator: Really, I was sure we migrate and then validate. We should validate only the version after migration, so you can change the test. But you can keep this filter_required provided that you add tests for it.
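A sketch of the kind of test the reviewer asks for. It exercises the new `filter_required` hook through a toy spec rather than the full `TStoredSchema` (building a complete stored schema here would obscure the point); the test name and the breadth of the expected exception are assumptions:

```python
from typing import TypedDict

import pytest

from dlt.common.validation import validate_dict


class _ToySpec(TypedDict, total=False):
    version: int     # non-Optional, so validate_dict treats it as required
    ancestors: list  # also non-Optional, like "ancestors" in TStoredSchema


def test_filter_required_exempts_late_keys() -> None:
    doc = {"version": 1}  # no "ancestors", like a pre-engine-7 schema
    # without the hook, the missing required key should fail validation
    with pytest.raises(Exception):  # exact exception type not shown in this diff
        validate_dict(_ToySpec, doc, path=".")
    # with the hook, "ancestors" is dropped from the required set
    validate_dict(_ToySpec, doc, path=".", filter_required=lambda k: k != "ancestors")
```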

if stored_schema["engine_version"] < 7:
ignored_keys.append("ancestors")

# use lambda to verify only non extra fields
validate_dict_ignoring_xkeys(
spec=TStoredSchema,
doc=stored_schema,
path=".",
validator_f=simple_regex_validator
validator_f=simple_regex_validator,
filter_required=lambda k: k not in ignored_keys
)
# check child parent relationships
for table_name, table in stored_schema["tables"].items():
@@ -256,6 +268,7 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None:


def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema:

if from_engine == to_engine:
return cast(TStoredSchema, schema_dict)

@@ -340,6 +353,9 @@ def migrate_filters(group: str, filters: List[str]) -> None:
# replace loads table
schema_dict["tables"][LOADS_TABLE_NAME] = load_table()
from_engine = 6
if from_engine == 6 and to_engine > 6:
schema_dict["ancestors"] = []
from_engine = 7

schema_dict["engine_version"] = from_engine
if from_engine != to_engine:
5 changes: 3 additions & 2 deletions dlt/common/validation.py
@@ -9,7 +9,7 @@
TCustomValidator = Callable[[str, str, Any, Any], bool]


def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFunc = None, validator_f: TCustomValidator = None) -> None:
def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFunc = None, validator_f: TCustomValidator = None, filter_required: TFilterFunc = None) -> None:
Collaborator: filter_required: what does it do? You added a new argument without docstrings.

Collaborator: Maybe rename it to ignored_props_f. And maybe it is time to consider Pydantic instead? Honestly, I wrote this code because pydantic's handling of TypedDicts was weak (I forgot the details already). On the other hand, I do not want Pydantic in core deps.
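To make the "what does it do" question concrete: `filter_f` removes keys from the document before validation, while `filter_required` only exempts keys from the required-props check. A usage sketch against the signature in this diff:

```python
from typing import Optional, TypedDict

from dlt.common.validation import validate_dict


class _Doc(TypedDict, total=False):
    name: str                    # required: not Optional
    description: Optional[str]   # optional


# "name" may be absent thanks to filter_required, but is validated if present
validate_dict(_Doc, {"description": "unnamed"}, path=".",
              filter_required=lambda k: k != "name")

# filter_f instead hides keys from validation entirely ("junk" is never checked)
validate_dict(_Doc, {"name": "n", "junk": object()}, path=".",
              filter_f=lambda k: k != "junk")
```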

"""Validate the `doc` dictionary based on the given typed dictionary specification `spec`.

Args:
@@ -32,11 +32,12 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil
"""
# pass through filter
filter_f = filter_f or (lambda _: True)
filter_required = filter_required or (lambda _: True)
# cannot validate anything
validator_f = validator_f or (lambda p, pk, pv, t: False)

allowed_props = get_type_hints(spec)
required_props = {k: v for k, v in allowed_props.items() if not is_optional_type(v)}
required_props = {k: v for k, v in allowed_props.items() if (not is_optional_type(v) and filter_required(k))}
# remove optional props
props = {k: v for k, v in doc.items() if filter_f(k)}
# check missing props
9 changes: 3 additions & 6 deletions dlt/extract/decorators.py
@@ -145,9 +145,6 @@ def decorator(f: Callable[TSourceFunParams, Any]) -> Callable[TSourceFunParams,
if name and name != schema.name:
raise ExplicitSourceNameInvalid(name, schema.name)

# the name of the source must be identical to the name of the schema
name = schema.name

# wrap source extraction function in configuration with section
func_module = inspect.getmodule(f)
source_section = section or _get_source_section_name(func_module)
@@ -162,16 +159,16 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltSourceImpl:
# configurations will be accessed in this section in the source
proxy = Container()[PipelineContext]
pipeline_name = None if not proxy.is_active() else proxy.pipeline().pipeline_name
with inject_section(ConfigSectionContext(pipeline_name=pipeline_name, sections=source_sections, source_state_key=name)):
with inject_section(ConfigSectionContext(pipeline_name=pipeline_name, sections=source_sections, source_state_key=schema.name)):
rv = conf_f(*args, **kwargs)
if rv is None:
raise SourceDataIsNone(name)
raise SourceDataIsNone(schema.name)
# if generator, consume it immediately
if inspect.isgenerator(rv):
rv = list(rv)

# convert to source
s = _impl_cls.from_data(name, source_section, schema.clone(update_normalizers=True), rv)
s = _impl_cls.from_data(source_section, schema.clone(update_normalizers=True), rv)
Collaborator: 👍

# apply hints
if max_table_nesting is not None:
s.max_table_nesting = max_table_nesting
19 changes: 9 additions & 10 deletions dlt/extract/source.py
@@ -642,22 +642,17 @@ class DltSource(Iterable[TDataItem]):
* You can use a `run` method to load the data with a default instance of dlt pipeline.
* You can get source read only state for the currently active Pipeline instance
"""
def __init__(self, name: str, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None:
self.name = name
def __init__(self, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None:
Collaborator: Could we swap schema with section? It feels more natural; schema is the most important arg.

Collaborator (author): done

self.section = section
"""Tells if iterator associated with a source is exhausted"""
self._schema = schema
self._resources: DltResourceDict = DltResourceDict(self.name, self.section)

if self.name != schema.name:
# raise ValueError(f"Schema name {schema.name} differs from source name {name}! The explicit source name argument is deprecated and will be soon removed.")
warnings.warn(f"Schema name {schema.name} differs from source name {name}! The explicit source name argument is deprecated and will be soon removed.")

if resources:
self.resources.add(*resources)

@classmethod
def from_data(cls, name: str, section: str, schema: Schema, data: Any) -> Self:
def from_data(cls, section: str, schema: Schema, data: Any) -> Self:
Collaborator: Also here.

Collaborator (author): done
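Taken together with the decorator change above, the observable effect is that a source's name always comes from its schema. A usage sketch, assuming the schema-first argument order the review asks for (at this commit the order is still section first):

```python
from dlt.common.schema import Schema
from dlt.extract.source import DltSource

schema = Schema("github")
# hypothetical call shape after the swap; data can be any form `run` accepts
source = DltSource.from_data(schema, "github_section", [{"id": 1}])
assert source.name == schema.name  # the name property delegates to the schema
```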

"""Converts any `data` supported by `dlt` `run` method into `dlt source` with a name `section`.`name` and `schema` schema."""
# creates source from various forms of data
if isinstance(data, DltSource):
@@ -669,10 +664,14 @@ def from_data(cls, name: str, section: str, schema: Schema, data: Any) -> Self:
else:
resources = [DltResource.from_data(data)]

return cls(name, section, schema, resources)
return cls(section, schema, resources)

@property
def name(self) -> str:
return self._schema.name

# TODO: 4 properties below must go somewhere else ie. into RelationalSchema which is Schema + Relational normalizer.
@property
def max_table_nesting(self) -> int:
"""A schema hint that sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON."""
@@ -795,7 +794,7 @@ def state(self) -> StrAny:
def clone(self) -> "DltSource":
"""Creates a deep copy of the source where copies of schema, resources and pipes are created"""
# mind that resources and pipes are cloned when added to the DltResourcesDict in the source constructor
return DltSource(self.name, self.section, self.schema.clone(), list(self._resources.values()))
return DltSource(self.section, self.schema.clone(), list(self._resources.values()))

def __iter__(self) -> Iterator[TDataItem]:
"""Opens iterator that yields the data items from all the resources within the source in the same order as in Pipeline class.
6 changes: 3 additions & 3 deletions dlt/pipeline/pipeline.py
@@ -826,7 +826,7 @@ def append_data(data_item: Any) -> None:
# do not set section to prevent source that represent a standalone resource
# to overwrite other standalone resources (ie. parents) in that source
sources.append(
DltSource(effective_schema.name, "", effective_schema, [data_item])
DltSource("", effective_schema, [data_item])
)
else:
# iterator/iterable/generator
@@ -848,7 +848,7 @@ def append_data(data_item: Any) -> None:

if resources:
# add all the appended resources in one source
sources.append(DltSource(effective_schema.name, self.pipeline_name, effective_schema, resources))
sources.append(DltSource(self.pipeline_name, effective_schema, resources))

return sources

@@ -1252,7 +1252,7 @@ def _save_state(self, state: TPipelineState) -> None:
def _extract_state(self, state: TPipelineState) -> TPipelineState:
# this will extract the state into current load package and update the schema with the _dlt_pipeline_state table
# note: the schema will be persisted because the schema saving decorator is over the state manager decorator for extract
state_source = DltSource(self.default_schema.name, self.pipeline_name, self.default_schema, [state_resource(state)])
state_source = DltSource(self.pipeline_name, self.default_schema, [state_resource(state)])
storage = ExtractorStorage(self._normalize_storage_config)
extract_id = extract_with_schema(storage, state_source, self.default_schema, _NULL_COLLECTOR, 1, 1)
storage.commit_extract_files(extract_id)