diff --git a/.editorconfig b/.editorconfig
index f6406ca985..82a6b37b3c 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -14,5 +14,5 @@ max_line_length = 200
 # Use tabs for indentation (Makefiles require tabs)
 indent_style = tab
 
-[*.{yaml,yml,js}]
+[*.{yaml,yml,js,md}]
 indent_size = 2
diff --git a/dlt/__init__.py b/dlt/__init__.py
index 521d574c2c..4f412fdc59 100644
--- a/dlt/__init__.py
+++ b/dlt/__init__.py
@@ -4,22 +4,21 @@
     1. Write a pipeline script
     >>> import dlt
-    >>> dlt.run(source=my_complicated_json, destination="duckdb")
+    >>> import requests
+    >>> dlt.run(requests.get("https://api.chess.com/pub/player/magnuscarlsen/games/2022/11").json()["games"], destination="bigquery", table_name="magnus_games")
 
-    2. Run your pipeline script
-    $ python my_pipeline.py
+    2. Run your pipeline script (no BigQuery default credentials? go here: https://dlthub.com/docs/destinations#google-bigquery)
+    $ python magnus_games.py
 
-    3. See and use your data
-    $ dlt pipeline show my_pipeline.py
+    3. See and query your data with an autogenerated Streamlit app
+    $ dlt pipeline dlt_magnus_games show
 
-    This will auto-generate and run a Streamlit app where you can see the data and the schema
-
-Or start with our pipeline template with sample chess.com data to bigquery
+Or start with our pipeline template with sample chess.com data loaded to bigquery
 $ dlt init chess bigquery
 
-For more detailed info, see https://dlthub.com/docs
+For more detailed info, see https://dlthub.com/docs/getting-started
 """
 
 from dlt.version import __version__
@@ -35,9 +34,9 @@
 pipeline = _pipeline
 
 TSecretValue = _TSecretValue
-"When typing source/resource function arguments indicates that given argument is a secret and should be taken from dlt.secrets. The value itself is a string"
+"When typing source/resource function arguments it indicates that a given argument is a secret and should be taken from dlt.secrets."
 
 TCredentials = _CredentialsConfiguration
-"When typing source/resource function arguments indicates that given argument represents credentials and should be taken from dlt.secrets. Credentials may be string, dictionaries or any other types."
+"When typing source/resource function arguments it indicates that a given argument represents credentials and should be taken from dlt.secrets. Credentials may be a string, dictionary or any other type."
diff --git a/dlt/common/configuration/accessors.py b/dlt/common/configuration/accessors.py
index 32d998a87a..186f30fcf5 100644
--- a/dlt/common/configuration/accessors.py
+++ b/dlt/common/configuration/accessors.py
@@ -120,7 +120,7 @@ def default_type(self) -> AnyType:
 
 config = _ConfigAccessor()
-"""Dictionary-like access to all secrets known to dlt"""
+"""Dictionary-like access to all config values known to dlt"""
 
 secrets = _SecretsAccessor()
-"""Dictionary-like access to all config values known to dlt"""
+"""Dictionary-like access to all secrets known to dlt"""
diff --git a/dlt/common/configuration/specs/base_configuration.py b/dlt/common/configuration/specs/base_configuration.py
index 39e3dead99..d89528e744 100644
--- a/dlt/common/configuration/specs/base_configuration.py
+++ b/dlt/common/configuration/specs/base_configuration.py
@@ -114,7 +114,7 @@ def parse_native_representation(self, native_value: Any) -> None:
         """Initialize the configuration fields by parsing the `native_value` which should be a native representation of the configuration or credentials, for example database connection string or JSON serialized GCP service credentials file.
 
-        Args:
+        ### Args:
             native_value (Any): A native representation of the configuration
 
         Raises:
diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py
index 76b723dc71..efa6f616c7 100644
--- a/dlt/common/pipeline.py
+++ b/dlt/common/pipeline.py
@@ -73,13 +73,17 @@ class TPipelineState(TypedDict, total=False):
 
 class SupportsPipeline(Protocol):
     """A protocol with core pipeline operations that lets high level abstractions ie. sources to access pipeline methods and properties"""
    pipeline_name: str
+    """Name of the pipeline"""
     destination: DestinationReference
+    """The destination reference, which is a ModuleType. `destination.__name__` returns the name string"""
     dataset_name: str = None
+    """Name of the dataset to which the data will be loaded"""
     runtime_config: RunConfiguration
+    """A configuration of runtime options like logging level and format and various tracing options"""
 
     @property
     def state(self) -> TPipelineState:
-        ...
+        """Returns a dictionary with the pipeline state"""
 
     def run(
         self,
@@ -139,8 +143,8 @@ def __init__(self, deferred_pipeline: Callable[..., SupportsPipeline]) -> None:
         self._deferred_pipeline = deferred_pipeline
 
 
-def get_default_working_dir() -> str:
-    """ Gets default working dir of the pipeline, which may be
+def get_default_pipelines_dir() -> str:
+    """ Gets default directory where pipelines' data will be stored
         1. in user home directory ~/.dlt/pipelines/
         2. if current user is root in /var/dlt/pipelines
         3. if current user does not have a home directory in /tmp/dlt/pipelines
diff --git a/dlt/common/schema/detections.py b/dlt/common/schema/detections.py
index 49acabf97b..81aa395e51 100644
--- a/dlt/common/schema/detections.py
+++ b/dlt/common/schema/detections.py
@@ -8,7 +8,7 @@
 
 _NOW_TS: float = pendulum.now().timestamp()
-_FLOAT_TS_RANGE = 31536000.0  # seconds in year
+_FLOAT_TS_RANGE = 5 * 31536000.0  # 5 years in seconds
 
 
 def is_timestamp(t: Type[Any], v: Any) -> Optional[TDataType]:
diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py
index 1b62c8a8a7..57431592ca 100644
--- a/dlt/extract/decorators.py
+++ b/dlt/extract/decorators.py
@@ -43,6 +43,36 @@ def source(func: None = ..., /, name: str = None, max_table_nesting: int = None,
     ...
 
 
 def source(func: Optional[AnyFun] = None, /, name: str = None, max_table_nesting: int = None, schema: Schema = None, spec: Type[BaseConfiguration] = None) -> Any:
+    """A decorator that transforms a function returning one or more `dlt resources` into a `dlt source` in order to load it with `dlt`.
+
+    ### Summary
+    A `dlt source` is a logical grouping of resources that are often extracted and loaded together. A source is associated with a schema, which describes the structure of the loaded data and provides instructions on how to load it.
+    Such a schema contains table schemas that describe the structure of the data coming from the resources. See https://dlthub.com/docs/glossary for more basic term definitions.
+
+    ### Passing credentials
+    Another important function of the source decorator is to provide credentials and other configuration to the code that extracts data. The decorator may automatically bind the source function arguments to the secret and config values.
+    >>> def chess(username, chess_url: str = dlt.config.value, api_secret = dlt.secrets.value, title: str = "GM"):
+    >>>     return user_profile(username, chess_url, api_secret), user_games(username, chess_url, api_secret, with_titles=title)
+    >>>
+    >>> list(chess("magnuscarlsen"))
+
+    Here `username` is a required, explicit Python argument. `chess_url` is a required argument that, if not explicitly passed, will be taken from configuration ie. `config.toml`. `api_secret` is a required argument that, if not explicitly passed, will be taken from dlt secrets ie. `secrets.toml`.
+    See https://dlthub.com/docs/customization/credentials for details.
+
+    ### Args:
+        func: A function that returns a dlt resource, a list of resources, or a list of any data items that can be loaded by `dlt`.
+
+        name (str, optional): A name of the source which is also the name of the associated schema. If not present, the function name will be used.
+
+        max_table_nesting (int, optional): A schema hint that sets the maximum depth of nested tables beyond which the remaining nodes are loaded as strings.
+
+        schema (Schema, optional): An explicit `Schema` instance to be associated with the source. If not present, `dlt` creates a new `Schema` object with the provided `name`. If such a `Schema` already exists in the same folder as the module containing the decorated function, it will be loaded from that file.
+
+        spec (Type[BaseConfiguration], optional): A specification of configuration and secret values required by the source.
+
+    Returns:
+        `DltSource` instance
+    """
     if name and schema:
         raise ArgumentsOverloadException("'name' has no effect when `schema` argument is present", source.__name__)
 
@@ -160,7 +190,52 @@ def resource(
     spec: Type[BaseConfiguration] = None,
     depends_on: TUnboundDltResource = None
 ) -> Any:
+    """When used as a decorator, transforms any generator (yielding) function into a `dlt resource`. When used as a function, it transforms the data in the `data` argument into a `dlt resource`.
+
+    ### Summary
+    A `resource` is a location within a `source` that holds data with a specific structure (schema) or coming from a specific origin. A resource may be a REST API endpoint, a table in a database or a tab in Google Sheets.
+    A `dlt resource` is a Python representation of a `resource` that combines both data and metadata (table schema) that describes the structure and instructs the loading of the data.
+    A `dlt resource` is also an `Iterable` and can be used like any other similar object, ie. a list or tuple. See https://dlthub.com/docs/glossary for more on basic term definitions.
+
+    ### Passing credentials
+    If used as a decorator (`data` argument is a `Generator`), it may automatically bind the source function arguments to the secret and config values.
+    >>> def user_games(username, chess_url: str = dlt.config.value, api_secret = dlt.secrets.value):
+    >>>     return requests.get("%s/games/%s" % (chess_url, username), headers={"Authorization": f"Bearer {api_secret}"})
+    >>>
+    >>> list(user_games("magnuscarlsen"))
+
+    Here `username` is a required, explicit Python argument. `chess_url` is a required argument that, if not explicitly passed, will be taken from configuration ie. `config.toml`. `api_secret` is a required argument that, if not explicitly passed, will be taken from dlt secrets ie. `secrets.toml`.
+    See https://dlthub.com/docs/customization/credentials for details.
+    Note that if the decorated function is an inner function, passing of the credentials will be disabled.
+
+    ### Args:
+        data (Callable | Any, optional): a function to be decorated or data compatible with the `dlt` `run` method.
+
+        name (str, optional): A name of the resource that by default also becomes the name of the table to which the data is loaded.
+            If not present, the name of the decorated function will be used.
+
+        table_name (TTableHintTemplate[str], optional): A table name, if different from `name`.
+            This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
+
+        write_disposition (Literal["skip", "append", "replace"], optional): Controls how to write data to a table. `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. Defaults to "append".
+            This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
+
+        columns (Sequence[TColumnSchema], optional): A list of column schemas. A typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.
+            This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
+
+        selected (bool, optional): When `True`, `dlt pipeline` will extract and load this resource; if `False`, the resource will be ignored.
+
+        spec (Type[BaseConfiguration], optional): A specification of configuration and secret values required by the source.
+
+        depends_on (TUnboundDltResource, optional): Allows piping data from one resource to another to build multi-step pipelines.
+
+    ### Raises:
+        ResourceNameMissing: indicates that the name of the resource cannot be inferred from the `data` being passed.
+        InvalidResourceDataType: indicates that the `data` argument cannot be converted into a `dlt resource`.
+    Returns:
+        DltResource instance which may be loaded, iterated or combined with other resources into a pipeline.
+    """
     def make_resource(_name: str, _data: Any) -> DltResource:
         table_template = DltResource.new_table_template(table_name or _name, write_disposition=write_disposition, columns=columns)
         return DltResource.from_data(_data, _name, table_template, selected, cast(DltResource, depends_on))
 
@@ -214,6 +289,24 @@ def transformer(
     selected: bool = True,
     spec: Type[BaseConfiguration] = None
 ) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], Callable[TResourceFunParams, DltResource]]:
+    """A form of `dlt resource` that takes input from other resources in order to enrich or transform the data.
+
+    ### Example
+    >>> @dlt.resource
+    >>> def players(title, chess_url=dlt.config.value):
+    >>>     r = requests.get(f"{chess_url}titled/{title}")
+    >>>     yield r.json()["players"]  # returns a list of player names
+    >>>
+    >>> # this resource takes data from players and returns profiles
+    >>> @dlt.transformer(data_from=players, write_disposition="replace")
+    >>> def player_profile(player: Any) -> Iterator[TDataItems]:
+    >>>     r = requests.get(f"{chess_url}player/{player}")
+    >>>     r.raise_for_status()
+    >>>     yield r.json()
+    >>>
+    >>> list(players("GM") | player_profile)  # pipes the data from players into player_profile to produce a list of player profiles
+
+    """
     f: AnyFun = None
     # if data_from is a function we are called without parens
     if inspect.isfunction(data_from):
diff --git a/dlt/extract/source.py b/dlt/extract/source.py
index 7eff5ebee9..be35b5db7e 100644
--- a/dlt/extract/source.py
+++ b/dlt/extract/source.py
@@ -196,7 +196,6 @@ def add_pipe(self, data: Any) -> None:
             raise InvalidResourceDataTypeMultiplePipes(self.name, data, type(data))
 
     def select_tables(self, *table_names: Iterable[str]) -> "DltResource":
-
         def _filter(item: TDataItem, meta: Any = None) -> bool:
             is_in_meta = isinstance(meta, TableNameMeta) and meta.table_name in table_names
             is_in_dyn = self._table_name_hint_fun and self._table_name_hint_fun(item) in table_names
@@ -397,6 +396,17 @@ def __delitem__(self, resource_name: str) -> None:
 
 
 class DltSource(Iterable[TDataItem]):
+    """Groups several `dlt resources` under a single schema and allows you to perform operations on them.
+
+    ### Summary
+    An instance of this class is created whenever you call a `dlt.source` decorated function. It automates several functions for you:
+    * You can pass this instance to the `dlt` `run` method in order to load all data present in the `dlt resources`.
+    * You can select and deselect resources that you want to load via the `with_resources` method.
+    * You can access the resources (which are `DltResource` instances) as source attributes.
+    * It implements the `Iterable` interface so you can get all the data from the resources yourself, without a dlt pipeline present.
+    * You can get the `schema` for the source and all the resources within it.
+    * You can use the `run` method to load the data with a default instance of the dlt pipeline.
+    """
     def __init__(self, name: str, schema: Schema, resources: Sequence[DltResource] = None) -> None:
         self.name = name
         self.exhausted = False
@@ -408,6 +418,7 @@ def __init__(self, name: str, schema: Schema, resources: Sequence[DltResource] =
 
     @classmethod
     def from_data(cls, name: str, schema: Schema, data: Any) -> "DltSource":
+        """Converts any `data` supported by the `dlt` `run` method into a `dlt source` with the name `name` and the schema `schema`"""
         # creates source from various forms of data
         if isinstance(data, DltSource):
             return data
@@ -423,10 +434,12 @@ def from_data(cls, name: str, schema: Schema, data: Any) -> "DltSource":
 
     @property
     def resources(self) -> DltResourceDict:
+        """A dictionary of all resources present in the source, where the key is a resource name."""
         return self._resources
 
     @property
     def selected_resources(self) -> Dict[str, DltResource]:
+        """A dictionary of all the resources that are selected to be loaded."""
         return self._resources.selected
 
     @property
@@ -447,11 +460,13 @@ def discover_schema(self) -> Schema:
         return self._schema
 
     def with_resources(self, *resource_names: str) -> "DltSource":
+        """A convenience method to select one or more resources to be loaded. Returns a source with the specified resources selected."""
         self._resources.select(*resource_names)
         return self
 
     @property
     def run(self) -> SupportsPipelineRun:
+        """A convenience method that will call `run` on the currently active `dlt` pipeline. If a pipeline instance is not found, one with default settings will be created."""
         self_run: SupportsPipelineRun = makefun.partial(Container()[PipelineContext].pipeline().run, *(), data=self)
         return self_run
diff --git a/dlt/normalize/normalize.py b/dlt/normalize/normalize.py
index 9b92374d94..d76870fd64 100644
--- a/dlt/normalize/normalize.py
+++ b/dlt/normalize/normalize.py
@@ -152,6 +152,7 @@ def _w_normalize_chunk(load_storage: LoadStorage, schema: Schema, load_id: str,
             columns = schema.get_table_columns(table_name)
             column_schemas[table_name] = columns
         # store row
+        # TODO: it is possible to write to a single file from many processes using this: https://gitlab.com/warsaw/flufl.lock
         load_storage.write_data_item(load_id, schema_name, table_name, row, columns)
         # count total items
         items_count += 1
diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py
index 678e46409d..82feee077f 100644
--- a/dlt/pipeline/__init__.py
+++ b/dlt/pipeline/__init__.py
@@ -1,6 +1,5 @@
-from typing import Sequence, cast
+from typing import Sequence, cast, overload
 
-from dlt import __version__
 from dlt.common.schema import Schema
 from dlt.common.schema.typing import TColumnSchema, TWriteDisposition
@@ -9,12 +8,71 @@
 from dlt.common.configuration.container import Container
 from dlt.common.configuration.inject import get_orig_args, last_config
 from dlt.common.destination import DestinationReference, TDestinationReferenceArg
-from dlt.common.pipeline import LoadInfo, PipelineContext, get_default_working_dir
+from dlt.common.pipeline import LoadInfo, PipelineContext, get_default_pipelines_dir
 from dlt.pipeline.configuration import PipelineConfiguration, ensure_correct_pipeline_kwargs
 from dlt.pipeline.pipeline import Pipeline
 
 
+@overload
+def pipeline(
+    pipeline_name: str = None,
+    pipelines_dir: str = None,
+    pipeline_salt: TSecretValue = None,
+    destination: TDestinationReferenceArg = None,
+    dataset_name: str = None,
+    import_schema_path: str = None,
+    export_schema_path: str = None,
+    full_refresh: bool = False,
+    credentials: Any = None
+) -> Pipeline:
+    """Creates a new instance of a `dlt` pipeline, which moves the data from a source ie. a REST API to a destination ie. a database or a data lake.
+
+    ### Summary
+    The `pipeline` function allows you to pass the destination name to which the data should be loaded, the name of the dataset and several other options that govern loading of the data.
+    The created `Pipeline` object lets you load the data from any source with the `run` method, or gives you more granular control over the loading process with the `extract`, `normalize` and `load` methods.
+
+    Please refer to the following doc pages:
+    - Write your first pipeline walkthrough: https://dlthub.com/docs/walkthroughs/create-a-pipeline
+    - Pipeline architecture and data loading steps: https://dlthub.com/docs/architecture
+    - List of supported destinations: https://dlthub.com/docs/destinations
+
+    ### Args:
+        pipeline_name (str, optional): A name of the pipeline that will be used to identify it in monitoring events and to restore its state and data schemas on subsequent runs.
+            Defaults to the file name of the pipeline script with the `dlt_` prefix added.
+
+        pipelines_dir (str, optional): A working directory in which pipeline state and temporary files will be stored. Defaults to a directory in the user home directory: `~/.dlt/pipelines/`.
+
+        pipeline_salt (TSecretValue, optional): A random value used for deterministic hashing during data anonymization. Defaults to a value derived from the pipeline name.
+            The default value should not be used for any cryptographic purposes.
+
+        destination (str | DestinationReference, optional): A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`.
+            May also be provided to the `run` method of the `pipeline`.
+
+        dataset_name (str, optional): A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or a folder grouping many files.
+            May also be provided later to the `run` or `load` methods of the `Pipeline`. If not provided at all, it defaults to the `pipeline_name`.
+
+        import_schema_path (str, optional): A path from which the schema `yaml` file will be imported on each pipeline run. Defaults to None, which disables importing.
+
+        export_schema_path (str, optional): A path where the schema `yaml` file will be exported after every schema change. Defaults to None, which disables exporting.
+
+        full_refresh (bool, optional): When set to True, each instance of the pipeline with the `pipeline_name` starts from scratch when run and loads the data to a separate dataset.
+            The datasets are identified by the `dataset_name_` + datetime suffix. Use this setting whenever you experiment with your data to be sure you start fresh on each run. Defaults to False.
+
+        credentials (Any, optional): Credentials for the `destination` ie. a database connection string or a dictionary with Google Cloud credentials.
+            In most cases it should be set to None, which lets `dlt` use `secrets.toml` or environment variables to infer the right credentials values.
+
+    ### Returns:
+        Pipeline: An instance of the `Pipeline` class. Please check the documentation of the `run` method for information on what to do with it.
+    """
+
+
+@overload
+def pipeline() -> Pipeline:  # type: ignore
+    """When called without any arguments, returns the most recently created `Pipeline` instance.
+    If not found, it creates a new instance with all the pipeline options set to defaults."""
+
+
 @with_config(spec=PipelineConfiguration, auto_namespace=True)
 def pipeline(
     pipeline_name: str = None,
@@ -44,7 +102,7 @@ def pipeline(
     # if working_dir not provided use temp folder
     if not pipelines_dir:
-        pipelines_dir = get_default_working_dir()
+        pipelines_dir = get_default_pipelines_dir()
     destination = DestinationReference.from_name(destination or kwargs["destination_name"])
     # create new pipeline instance
@@ -78,7 +136,7 @@ def attach(
     ensure_correct_pipeline_kwargs(attach, **kwargs)
     # if working_dir not provided use temp folder
     if not pipelines_dir:
-        pipelines_dir = get_default_working_dir()
+        pipelines_dir = get_default_pipelines_dir()
     # create new pipeline instance
     p = Pipeline(pipeline_name, pipelines_dir, pipeline_salt, None, None, None, None, None, full_refresh, True, last_config(**kwargs), kwargs["runtime"])
     # set it as current pipeline
@@ -86,10 +144,6 @@
     return p
 
-# setup default pipeline in the container
-Container()[PipelineContext] = PipelineContext(pipeline)
-
-
 def run(
     data: Any,
     *,
     destination: TDestinationReferenceArg = None,
@@ -101,6 +155,55 @@
     columns: Sequence[TColumnSchema] = None,
     schema: Schema = None
 ) -> LoadInfo:
+    """Loads the data from the `data` argument into the destination specified in `destination` and the dataset specified in `dataset_name`.
+
+    ### Summary
+    This method will `extract` the data from the `data` argument, infer the schema, `normalize` the data into a load package (ie. jsonl or PARQUET files representing tables) and then `load` such packages into the `destination`.
+
+    The data may be supplied in several forms:
+    * a `list` or `Iterable` of any JSON-serializable objects ie. `dlt.run([1, 2, 3], table_name="numbers")`
+    * any `Iterator` or a function that yields (a `Generator`) ie. `dlt.run(range(1, 10), table_name="range")`
+    * a function or a list of functions decorated with @dlt.resource ie. `dlt.run([chess_players(title="GM"), chess_games()])`
+    * a function or a list of functions decorated with @dlt.source.
+
+    Please note that `dlt` deals with `bytes`, `datetime`, `decimal` and `uuid` objects, so you are free to load binary data or documents containing dates.
+
+    ### Execution
+    The `run` method will first use the `sync_destination` method to synchronize pipeline state and schemas with the destination. You can disable this behavior with the `restore_from_destination` configuration option.
+    Next it will make sure that data from the previous run is fully processed. If not, the `run` method normalizes and loads pending data items.
+    Only then is the new data from the `data` argument extracted, normalized and loaded.
+
+    ### Args:
+        data (Any): Data to be loaded into the destination.
+
+        destination (str | DestinationReference, optional): A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`.
+            If not provided, the value passed to `dlt.pipeline` will be used.
+
+        dataset_name (str, optional): A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or a folder grouping many files.
+            If not provided, the value passed to `dlt.pipeline` will be used. If not provided at all, it defaults to the `pipeline_name`.
+
+        credentials (Any, optional): Credentials for the `destination` ie. a database connection string or a dictionary with Google Cloud credentials.
+            In most cases it should be set to None, which lets `dlt` use `secrets.toml` or environment variables to infer the right credentials values.
+
+        table_name (str, optional): The name of the table to which the data should be loaded within the `dataset`. This argument is required for a `data` that is a list/Iterable or an Iterator without a `__name__` attribute.
+            The behavior of this argument depends on the type of the `data`:
+            * generator functions: the function name is used as table name, `table_name` overrides this default
+            * `@dlt.resource`: the resource contains the full table schema, and that includes the table name. `table_name` will override this property. Use with care!
+            * `@dlt.source`: the source contains several resources each with a table schema. `table_name` will override all table names within the source and load the data into a single table.
+
+        write_disposition (Literal["skip", "append", "replace"], optional): Controls how to write data to a table. `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. Defaults to "append".
+            Please note that in the case of `dlt.resource` the table schema value will be overwritten, and in the case of `dlt.source` the values in all resources will be overwritten.
+
+        columns (Sequence[TColumnSchema], optional): A list of column schemas. A typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.
+
+        schema (Schema, optional): An explicit `Schema` object in which all table schemas will be grouped. By default `dlt` takes the schema from the source (if passed in the `data` argument) or creates a default one itself.
+
+    ### Raises:
+        PipelineStepFailed: when a problem happens during the `extract`, `normalize` or `load` steps.
+    ### Returns:
+        LoadInfo: Information on the loaded data including the list of package IDs and failed job statuses. Please note that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo.
+    """
     destination = DestinationReference.from_name(destination)
     return pipeline().run(
         data,
         destination=destination,
         dataset_name=dataset_name,
         credentials=credentials,
         table_name=table_name,
         write_disposition=write_disposition,
         columns=columns,
         schema=schema
     )
+
+# setup default pipeline in the container
+Container()[PipelineContext] = PipelineContext(pipeline)
diff --git a/dlt/pipeline/configuration.py b/dlt/pipeline/configuration.py
index 88487dfe67..c77217833f 100644
--- a/dlt/pipeline/configuration.py
+++ b/dlt/pipeline/configuration.py
@@ -12,10 +12,14 @@ class PipelineConfiguration(BaseConfiguration):
     pipeline_name: Optional[str] = None
     pipelines_dir: Optional[str] = None
     destination_name: Optional[str] = None
+    dataset_name: Optional[str] = None
     pipeline_salt: Optional[TSecretValue] = None
     restore_from_destination: bool = True
+    """Enables the `run` method of the `Pipeline` object to restore the pipeline state and schemas from the destination"""
     enable_runtime_trace: bool = True
+    """Enables tracing. Tracing saves the execution trace locally and is required by `dlt deploy`."""
     use_single_dataset: bool = True
+    """Stores all schemas in a single dataset. When False, each schema will get a separate dataset named `{dataset_name}_{schema_name}`"""
     runtime: RunConfiguration
 
     def on_resolved(self) -> None:
diff --git a/dlt/pipeline/state.py b/dlt/pipeline/state.py
index 26221ec8eb..af4574cbe5 100644
--- a/dlt/pipeline/state.py
+++ b/dlt/pipeline/state.py
@@ -130,8 +130,36 @@ def migrate_state(pipeline_name: str, state: DictStrAny, from_engine: int, to_en
 
 
 def state() -> DictStrAny:
-    """Returns a dictionary with the current source state. Any JSON-serializable values can be written and the read from the state.
-    The state is persisted after the data is successfully read from the source.
+    """Returns a dictionary with the source/resource state. Such state is preserved across pipeline runs and may be used to implement incremental loads.
+
+    ### Summary
+    The state is a Python dictionary-like object that is available within the `@dlt.source` and `@dlt.resource` decorated functions and may be read and written to.
+    The data within the state is loaded into the destination together with any other extracted data and made automatically available to the source/resource extractor functions the next time they are run.
+    When using the state:
+    * Any JSON-serializable values can be written to and read from the state.
+    * The state available in the `dlt source` is read only and any changes will be discarded. Still, it may be used to initialize the resources.
+    * The state available in the `dlt resource` is writable and written values will be available only once.
+
+    ### Example
+    The most typical use case for the state is to implement an incremental load.
+    >>> @dlt.resource(write_disposition="append")
+    >>> def players_games(chess_url, players, start_month=None, end_month=None):
+    >>>     checked_archives = dlt.state().setdefault("archives", [])
+    >>>     archives = players_archives(chess_url, players)
+    >>>     for url in archives:
+    >>>         if url in checked_archives:
+    >>>             print(f"skipping archive {url}")
+    >>>             continue
+    >>>         else:
+    >>>             print(f"getting archive {url}")
+    >>>             checked_archives.append(url)
+    >>>         # get the filtered archive
+    >>>         r = requests.get(url)
+    >>>         r.raise_for_status()
+    >>>         yield r.json().get("games", [])
+
+    Here we store all the URLs with game archives in the state and skip loading them on the next run. The archives are immutable. The state will grow with the coming months (and more players).
+    Up to a few thousand archives we should be good, though.
     """
     global _last_full_state
@@ -152,6 +180,7 @@ def state() -> DictStrAny:
         raise PipelineStateNotAvailable(source_name)
     else:
         # get unmanaged state that is read only
+        # TODO: make sure that the state is up to date by syncing the pipeline earlier
         state = proxy.pipeline().state  # type: ignore
 
     source_state = state.setdefault("sources", {})
diff --git a/docs/website/docs/command-line-interface.md b/docs/website/docs/command-line-interface.md
index f33bbe4bf0..5133f9c7e6 100644
--- a/docs/website/docs/command-line-interface.md
+++ b/docs/website/docs/command-line-interface.md
@@ -26,4 +26,12 @@
 dlt deploy