dataset factory #1945
Conversation
dlt/destinations/job_client_impl.py
Outdated
f" ORDER BY {c_inserted_at} DESC;" | ||
) | ||
return self._row_to_schema_info(query, self.schema.name) | ||
if any_schema_name: |
This can be compressed, and it also needs to be implemented for all destinations.
Also: is this signature OK, or do we want to add a new function for this? I'm also not sure about this "any_schema_name"... :)
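For reference, a minimal sketch of one way the two query branches could be compressed into a single builder method on the SQL job client. The column variables (c_schema_name, c_inserted_at) follow the snippet above; the helper name and the SELECT list are hypothetical, not the PR's actual code:

    from typing import Optional

    def _build_stored_schema_query(
        self, c_schema_name: str, c_inserted_at: str, schema_name: Optional[str] = None
    ) -> str:
        # hypothetical helper: build one query and append the name filter
        # only when a schema name is given, instead of duplicating the
        # whole SELECT for the any-schema case
        table = self.sql_client.make_qualified_table_name(self.schema.version_table_name)
        query = f"SELECT * FROM {table}"
        if schema_name is not None:
            query += f" WHERE {c_schema_name} = %s"
        query += f" ORDER BY {c_inserted_at} DESC;"
        return query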
tests/load/test_read_interfaces.py
Outdated
@@ -212,6 +212,47 @@ def double_items():
loads_table = pipeline._dataset()[pipeline.default_schema.loads_table_name]
loads_table.fetchall()

# check dataset factory
We need proper tests to ensure that the newest schema is always selected; this is just basic test code to make sure it generally works.
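A hypothetical sketch of such a test, assuming the pipeline and destination_for_dataset names from the surrounding test code, and assuming the dataset exposes its resolved schema as .schema:

    import dlt

    def test_newest_schema_is_selected(pipeline: dlt.Pipeline, destination_for_dataset) -> None:
        # no schema argument: the factory should autodiscover the newest
        # stored schema, i.e. the one the pipeline wrote last
        dataset = dlt.dataset(
            destination=destination_for_dataset,
            dataset_name=pipeline.dataset_name,
        )
        assert dataset.schema.version_hash == pipeline.default_schema.version_hash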
def dataset(
    destination: TDestinationReferenceArg,
    dataset_name: str,
    schema: Union[Schema, str, None] = None,
We allow a given Schema instance, or alternatively a schema name, which will be loaded from the destination, or no schema at all, which will do the autodiscovery as discussed.
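A minimal usage sketch of the three modes; the destination and dataset names are illustrative:

    import dlt
    from dlt.common.schema import Schema

    # 1. pass an explicit Schema instance
    ds = dlt.dataset("duckdb", "my_dataset", schema=Schema("my_schema"))

    # 2. pass a schema name: the newest stored schema with that name is
    #    loaded from the destination
    ds = dlt.dataset("duckdb", "my_dataset", schema="my_schema")

    # 3. pass no schema: the newest schema found on the destination is
    #    autodiscovered
    ds = dlt.dataset("duckdb", "my_dataset")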
OK, cool. But as discussed, we'll need to implement the dataset to be compatible with the pipeline dataset (many schemas, different database layouts: we support schema separation, but it is rarely used).
This is good! I think there's a big overlap between dataset and the part of pipeline that does the same:
- keeping destinations (also staging - IMO you should include it as optional; sometimes we'll use e.g. Athena to do Iceberg but actually open the staging filesystem as a data lake)
- keeping a list of schemas on the dataset
- initializing configs, exposing various clients
Do you think it makes sense to refactor pipeline right now? Do you think we could keep a dataset instance in the pipeline and just expose some methods from it?
The standalone part looks good. I'm not sure if we should go for a single-schema or a many-schemas dataset.
I'd change the WithStateSync interface to be more explicit and also add schema tests for the filesystem.
dlt/common/destination/reference.py
Outdated
@@ -657,8 +659,8 @@ def __exit__(

class WithStateSync(ABC):
    @abstractmethod
-   def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
+   def get_stored_schema(self, any_schema_name: bool = False) -> Optional[StorageSchemaInfo]:
        """Retrieves newest schema from destination storage"""
I'd rather add a new method, but it really does not fit here: this interface assumes that there's a known schema name. My take would be to change the signature to

get_stored_schema(self, schema_name: str = None)

If None is specified, we load the newest schema; if a name is provided, we load the newest schema with the given name.
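A sketch of the proposed interface under that convention, assuming the existing StorageSchemaInfo type from this module; the docstring wording is illustrative:

    from abc import ABC, abstractmethod
    from typing import Optional

    class WithStateSync(ABC):
        @abstractmethod
        def get_stored_schema(self, schema_name: Optional[str] = None) -> Optional[StorageSchemaInfo]:
            """Retrieves the newest schema from destination storage.

            If schema_name is None, the newest stored schema regardless of
            its name is returned; otherwise, the newest stored schema with
            the given name is returned.
            """
            ...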
Sounds good. I had the same idea but thought it might not be good to change the default behavior of this method. I have changed it now and updated all the places in the code and tests where it is used.
tests/load/test_read_interfaces.py
Outdated
dataset = dlt.dataset(
    destination=destination_for_dataset,
    dataset_name=pipeline.dataset_name,
    schema="wrong_schema_name",
We allow a new schema to be added, right? That's why we do not raise when the schema is not known? We'll need a better method of adding schemas to a dataset, and also to sync schemas, etc.
What actually happens here is that an empty schema is created as a stand-in; it does not do anything and also does not get saved anywhere. I can change that if you like, but AFAIK it should be OK for now.
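A minimal sketch of that stand-in behavior, assuming a client implementing get_stored_schema and assuming Schema.from_dict for deserialization; the control flow is illustrative, not the exact PR code:

    import json

    from dlt.common.schema import Schema

    # try to load the named schema from the destination; fall back to an
    # empty in-memory Schema that is never persisted
    stored_schema = client.get_stored_schema(schema_name)
    if stored_schema is not None:
        schema = Schema.from_dict(json.loads(stored_schema.schema))
    else:
        schema = Schema(schema_name)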
No, that is OK. I just think that all the code that interacts with the destination and now lives in the pipeline (i.e. schema lists, schema storage, etc.) could probably go to Dataset at some point.
LGTM! ready for merge!
Description
This PR is an example implementation of a dataset factory to build datasets without a pipeline object.
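A minimal end-to-end sketch of what the factory enables; the destination and table names are illustrative:

    import dlt

    # build a dataset directly against a destination, without a pipeline;
    # with no schema argument the newest stored schema is autodiscovered
    dataset = dlt.dataset(destination="duckdb", dataset_name="my_dataset")

    # read from it just like the pipeline's dataset
    items_table = dataset["items"]
    print(items_table.fetchall())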