Expose staging tables truncation to config #1717
Changes from all commits
261dbcc
5dcdcf6
6fd36ac
46e4117
6a2bdd2
e65f853
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,36 +1,33 @@ | ||
--- | ||
title: Staging | ||
description: Configure an s3 or gcs bucket for staging before copying into the destination | ||
description: Configure an S3 or GCS bucket for staging before copying into the destination | ||
keywords: [staging, destination] | ||
--- | ||
# Staging | ||
|
||
The goal of staging is to bring the data closer to the database engine so the modification of the destination (final) dataset happens faster and without errors. `dlt`, when asked, creates two | ||
staging areas: | ||
The goal of staging is to bring the data closer to the database engine so that the modification of the destination (final) dataset happens faster and without errors. `dlt`, when asked, creates two staging areas: | ||
1. A **staging dataset** used by the [merge and replace loads](../general-usage/incremental-loading.md#merge-incremental_loading) to deduplicate and merge data with the destination. | ||
2. A **staging storage** which is typically a s3/gcp bucket where [loader files](file-formats/) are copied before they are loaded by the destination. | ||
2. A **staging storage** which is typically an S3/GCS bucket where [loader files](file-formats/) are copied before they are loaded by the destination. | ||
|
||
## Staging dataset | ||
`dlt` creates a staging dataset when write disposition of any of the loaded resources requires it. It creates and migrates required tables exactly like for the | ||
main dataset. Data in staging tables is truncated when load step begins and only for tables that will participate in it. | ||
Such staging dataset has the same name as the dataset passed to `dlt.pipeline` but with `_staging` suffix in the name. Alternatively, you can provide your own staging dataset pattern or use a fixed name, identical for all the | ||
configured datasets. | ||
`dlt` creates a staging dataset when the write disposition of any of the loaded resources requires it. It creates and migrates required tables exactly like for the main dataset. Data in staging tables is truncated when the load step begins and only for tables that will participate in it. | ||
Such a staging dataset has the same name as the dataset passed to `dlt.pipeline` but with a `_staging` suffix in the name. Alternatively, you can provide your own staging dataset pattern or use a fixed name, identical for all the configured datasets. | ||
```toml | ||
[destination.postgres] | ||
staging_dataset_name_layout="staging_%s" | ||
``` | ||
Entry above switches the pattern to `staging_` prefix and for example for dataset with name **github_data** `dlt` will create **staging_github_data**. | ||
The entry above switches the pattern to a `staging_` prefix. For example, for a dataset named **github_data**, `dlt` will create **staging_github_data**. | ||
|
||
To configure static staging dataset name, you can do the following (we use destination factory) | ||
To configure a static staging dataset name, you can do the following (we use the destination factory) | ||
```py | ||
import dlt | ||
|
||
dest_ = dlt.destinations.postgres(staging_dataset_name_layout="_dlt_staging") | ||
``` | ||
All pipelines using `dest_` as destination will use **staging_dataset** to store staging tables. Make sure that your pipelines are not overwriting each other's tables. | ||
All pipelines using `dest_` as the destination will use the **_dlt_staging** dataset to store staging tables. Make sure that your pipelines are not overwriting each other's tables. | ||
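As a rough illustration of the two naming options above, the sketch below mimics how a layout string could be turned into a staging dataset name. `resolve_staging_name` is a hypothetical helper written for this example and is not part of `dlt`. It assumes that a layout containing `%s` is treated as a pattern and that any other value is used as a fixed name.

```python
def resolve_staging_name(layout: str, dataset_name: str) -> str:
    """Hypothetical helper: illustrates the naming rules, not dlt's internals."""
    if "%s" in layout:
        # pattern layout: substitute the dataset name into the placeholder
        return layout % dataset_name
    # static layout: the same name is used for every dataset
    return layout


# default-style suffix, custom prefix, and a fixed name
print(resolve_staging_name("%s_staging", "github_data"))
print(resolve_staging_name("staging_%s", "github_data"))
print(resolve_staging_name("_dlt_staging", "github_data"))
```

With a fixed name, every dataset maps to the same staging dataset, which is why pipelines sharing it can overwrite each other's tables.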
|
||
### Cleanup up staging dataset automatically | ||
`dlt` does not truncate tables in staging dataset at the end of the load. Data that is left after contains all the extracted data and may be useful for debugging. | ||
### Clean up the staging dataset automatically | ||
`dlt` does not truncate tables in the staging dataset at the end of the load. The data left behind contains all the extracted data and may be useful for debugging. | ||
If you prefer to truncate it, put the following line in `config.toml`: | ||
|
||
```toml | ||
|
@@ -39,19 +36,23 @@ truncate_staging_dataset=true | |
``` | ||
|
||
## Staging storage | ||
`dlt` allows to chain destinations where the first one (`staging`) is responsible for uploading the files from local filesystem to the remote storage. It then generates followup jobs for the second destination that (typically) copy the files from remote storage into destination. | ||
`dlt` allows chaining destinations where the first one (`staging`) is responsible for uploading the files from the local filesystem to the remote storage. It then generates follow-up jobs for the second destination that (typically) copy the files from remote storage into the destination. | ||
|
||
Currently, only one destination the [filesystem](destinations/filesystem.md) can be used as a staging. Following destinations can copy remote files: | ||
1. [Redshift.](destinations/redshift.md#staging-support) | ||
2. [Bigquery.](destinations/bigquery.md#staging-support) | ||
3. [Snowflake.](destinations/snowflake.md#staging-support) | ||
Currently, only one destination, the [filesystem](destinations/filesystem.md), can be used as staging. The following destinations can copy remote files: | ||
|
||
1. [Azure Synapse](destinations/synapse#staging-support) | ||
1. [Athena](destinations/athena#staging-support) | ||
1. [Bigquery](destinations/bigquery.md#staging-support) | ||
1. [Dremio](destinations/dremio#staging-support) | ||
1. [Redshift](destinations/redshift.md#staging-support) | ||
1. [Snowflake](destinations/snowflake.md#staging-support) | ||
|
||
### How to use | ||
In essence, you need to set up two destinations and then pass them to `dlt.pipeline`. Below we'll use `filesystem` staging with `parquet` files to load into `Redshift` destination. | ||
In essence, you need to set up two destinations and then pass them to `dlt.pipeline`. Below we'll use `filesystem` staging with `parquet` files to load into the `Redshift` destination. | ||
|
||
1. **Set up the s3 bucket and filesystem staging.** | ||
1. **Set up the S3 bucket and filesystem staging.** | ||
|
||
Please follow our guide in [filesystem destination documentation](destinations/filesystem.md). Test the staging as standalone destination to make sure that files go where you want them. In your `secrets.toml` you should now have a working `filesystem` configuration: | ||
Please follow our guide in the [filesystem destination documentation](destinations/filesystem.md). Test the staging as a standalone destination to make sure that files go where you want them. In your `secrets.toml`, you should now have a working `filesystem` configuration: | ||
```toml | ||
[destination.filesystem] | ||
bucket_url = "s3://[your_bucket_name]" # replace with your bucket name, | ||
|
@@ -63,31 +64,31 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel | |
|
||
2. **Set up the Redshift destination.** | ||
|
||
Please follow our guide in [redshift destination documentation](destinations/redshift.md). In your `secrets.toml` you added: | ||
Please follow our guide in the [redshift destination documentation](destinations/redshift.md). In your `secrets.toml`, you added: | ||
```toml | ||
# keep it at the top of your toml file! before any section starts | ||
destination.redshift.credentials="redshift://loader:<password>@localhost/dlt_data?connect_timeout=15" | ||
``` | ||
|
||
3. **Authorize Redshift cluster to access the staging bucket.** | ||
3. **Authorize the Redshift cluster to access the staging bucket.** | ||
|
||
By default `dlt` will forward the credentials configured for `filesystem` to the `Redshift` COPY command. If you are fine with this, move to the next step. | ||
By default, `dlt` will forward the credentials configured for `filesystem` to the `Redshift` COPY command. If you are fine with this, move to the next step. | ||
|
||
4. **Chain staging to destination and request `parquet` file format.** | ||
|
||
Pass the `staging` argument to `dlt.pipeline`. It works like the `destination` argument: | ||
```py | ||
# Create a dlt pipeline that will load | ||
# chess player data to the redshift destination | ||
# via staging on s3 | ||
# via staging on S3 | ||
pipeline = dlt.pipeline( | ||
pipeline_name='chess_pipeline', | ||
destination='redshift', | ||
staging='filesystem', # add this to activate the staging location | ||
dataset_name='player_data' | ||
) | ||
``` | ||
`dlt` will automatically select an appropriate loader file format for the staging files. Below we explicitly specify `parquet` file format (just to demonstrate how to do it): | ||
`dlt` will automatically select an appropriate loader file format for the staging files. Below we explicitly specify the `parquet` file format (just to demonstrate how to do it): | ||
```py | ||
info = pipeline.run(chess(), loader_file_format="parquet") | ||
``` | ||
|
@@ -96,4 +97,21 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel | |
|
||
Run the pipeline script as usual. | ||
|
||
> 💡 Please note that `dlt` does not delete loaded files from the staging storage after the load is complete. | ||
:::tip | ||
I'd run this whole new section through chatgpt to grammar correct (or run the grammar checker on this page) |
||
Please note that `dlt` does not delete loaded files from the staging storage after the load is complete, but it truncates previously loaded files. | ||
::: | ||
|
||
### How to prevent staging files truncation | ||
|
||
Before `dlt` loads data to the staging storage, it truncates previously loaded files. To prevent this and keep the whole history | ||
of loaded files, use the following parameter: | ||
|
||
```toml | ||
[destination.redshift] | ||
truncate_tables_on_staging_destination_before_load=false | ||
``` | ||
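The same setting can presumably also be supplied through an environment variable, since `dlt` reads configuration from variables whose names are the upper-cased section path and key joined by double underscores. The sketch below builds such a name. `config_env_key` is a hypothetical helper, and the key name is an assumption taken from the test in this PR (`truncate_tables_on_staging_destination_before_load`).

```python
import os


def config_env_key(*path: str) -> str:
    """Hypothetical helper: joins config sections and a key into an env variable name."""
    return "__".join(part.upper() for part in path)


# assumption: key name as used by the test in this PR
key = config_env_key(
    "destination", "redshift", "truncate_tables_on_staging_destination_before_load"
)
# set it before the pipeline is created so the configuration can pick it up
os.environ[key] = "false"
print(key)
```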
|
||
:::caution | ||
The [Athena](destinations/athena#staging-support) destination only truncates non-Iceberg tables with the `replace` write disposition. | ||
Therefore, the parameter `truncate_tables_on_staging_destination_before_load` only controls the truncation of the corresponding files for these tables. | ||
::: |
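To make the truncation behavior described in this section concrete, here is a toy in-memory model of a staging bucket. It is only a sketch: `ToyStaging` and the file names are invented for illustration and none of this is `dlt` code. It replays three loads with truncation on, off, and on again, and counts the files left for the table after each.

```python
from typing import Dict, List


class ToyStaging:
    """Toy in-memory model of a staging bucket; not dlt code."""

    def __init__(self) -> None:
        self.files: Dict[str, List[str]] = {}

    def load(self, table: str, file_name: str, truncate: bool = True) -> None:
        if truncate:
            # drop files left over from previous loads of this table
            self.files[table] = []
        self.files.setdefault(table, []).append(file_name)


staging = ToyStaging()
staging.load("events", "load_1.parquet")                  # truncation on
staging.load("events", "load_2.parquet", truncate=False)  # history is kept
files_after_second = len(staging.files["events"])
staging.load("events", "load_3.parquet")                  # truncation on again
files_after_third = len(staging.files["events"])
print(files_after_second, files_after_third)
```

Two files remain after the load without truncation, and only the newest file remains once truncation is enabled again.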
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,12 @@ | ||
import pytest | ||
from typing import Dict, Any, List | ||
from typing import List | ||
|
||
import dlt, os | ||
from dlt.common import json, sleep | ||
from copy import deepcopy | ||
from dlt.common import json | ||
from dlt.common.storages.configuration import FilesystemConfiguration | ||
from dlt.common.utils import uniq_id | ||
from dlt.common.schema.typing import TDataType | ||
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient | ||
|
||
from tests.load.pipeline.test_merge_disposition import github | ||
from tests.pipeline.utils import load_table_counts, assert_load_info | ||
|
@@ -40,6 +40,13 @@ def load_modified_issues(): | |
yield from issues | ||
|
||
|
||
@dlt.resource(table_name="events", write_disposition="append", primary_key="timestamp") | ||
def event_many_load_2(): | ||
with open("tests/normalize/cases/event.event.many_load_2.json", "r", encoding="utf-8") as f: | ||
events = json.load(f) | ||
yield from events | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name | ||
) | ||
|
@@ -183,6 +190,50 @@ def test_staging_load(destination_config: DestinationTestConfiguration) -> None: | |
assert replace_counts == initial_counts | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name | ||
) | ||
def test_truncate_staging_dataset(destination_config: DestinationTestConfiguration) -> None: | ||
"""This test checks if tables truncation on staging destination done according to the configuration. | ||
the test is good, but to make it great we should also test whether keeping the staging files around will make the data be loaded again although it shouldn't. but for now i'd say it's good. |
good point, should be easy to do |
||
|
||
Test loads data to the destination three times: | ||
* with truncation | ||
* without truncation (after this 2 staging files should be left) | ||
* with truncation (after this 1 staging file should be left) | ||
""" | ||
pipeline = destination_config.setup_pipeline( | ||
pipeline_name="test_stage_loading", dataset_name="test_staging_load" + uniq_id() | ||
) | ||
resource = event_many_load_2() | ||
table_name: str = resource.table_name # type: ignore[assignment] | ||
|
||
# load the data, files stay on the stage after the load | ||
info = pipeline.run(resource) | ||
assert_load_info(info) | ||
|
||
# load the data without truncating the staging; two files should now be on staging | ||
pipeline.destination.config_params["truncate_tables_on_staging_destination_before_load"] = False | ||
info = pipeline.run(resource) | ||
assert_load_info(info) | ||
# check there are two staging files | ||
_, staging_client = pipeline._get_destination_clients(pipeline.default_schema) | ||
with staging_client: | ||
assert len(staging_client.list_table_files(table_name)) == 2 # type: ignore[attr-defined] | ||
note for later: we should probably allow fs_client on the pipeline to also return the staging filesystem client with a flag. |
||
|
||
# load the data with truncating, so only new file is on the staging | ||
pipeline.destination.config_params["truncate_tables_on_staging_destination_before_load"] = True | ||
info = pipeline.run(resource) | ||
assert_load_info(info) | ||
# check that table exists in the destination | ||
with pipeline.sql_client() as sql_client: | ||
qual_name = sql_client.make_qualified_table_name | ||
assert len(sql_client.execute_sql(f"SELECT * from {qual_name(table_name)}")) > 4 | ||
# check there is only one staging file | ||
_, staging_client = pipeline._get_destination_clients(pipeline.default_schema) | ||
with staging_client: | ||
assert len(staging_client.list_table_files(table_name)) == 1 # type: ignore[attr-defined] | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name | ||
) | ||
|
imho having the same exact line in all implementations is not DRY. We can keep it like this for now, but I would rather implement this in the superclass, probably with hasattr and isinstance to get the config and verify the type.
did you see the previous version? at least here this is simple and mypy forces you to do overrides. previously you had to setup cooperative calling of super() and keep state in the class that should be an interface/trait. also if you forgot about that, the config would have no effect and you'll never know about it