
Commit

Merge branch 'devel' into fix/docs/capitalize_toml
burnash authored Oct 2, 2024
2 parents 32c7785 + c312fb4 commit 338f3bc
Showing 23 changed files with 458 additions and 473 deletions.
27 changes: 26 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -700,6 +700,31 @@ This destination fully supports [dlt state sync](../../general-usage/state#synci

You will also notice `init` files being present in the root folder and the special `dlt` folders. In the absence of the concepts of schemas and tables in blob storages and directories, `dlt` uses these special files to harmonize the behavior of the `filesystem` destination with the other implemented destinations.

**Note:** When a load generates a new state, for example when using incremental loads, a new state file appears in the `_dlt_pipeline_state` folder at the destination. To prevent data accumulation, state cleanup mechanisms automatically remove old state files, retaining only the latest 100 by default. This cleanup process can be customized or disabled using the filesystem configuration `max_state_files`, which determines the maximum number of pipeline state files to retain (default is 100). Setting this value to 0 or a negative number disables the cleanup of old states.
:::note
When a load generates a new state, for example when using incremental loads, a new state file appears in the `_dlt_pipeline_state` folder at the destination. To prevent data accumulation, state cleanup mechanisms automatically remove old state files, retaining only the latest 100 by default. This cleanup process can be customized or disabled using the filesystem configuration `max_state_files`, which determines the maximum number of pipeline state files to retain (default is 100). Setting this value to 0 or a negative number disables the cleanup of old states.
:::
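
For example, here is a minimal sketch of tuning this behavior from Python. The exact environment variable name is an assumption based on dlt's usual config-section naming for the filesystem destination:

```py
import os

# Assumed spelling of the env var for the filesystem destination option `max_state_files`:
# keep only the 10 most recent state files, or set "0" to disable cleanup entirely.
os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "10"
```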

## Troubleshooting
### File Name Too Long Error
When running your pipeline, you might encounter an error like `[Errno 36] File name too long Error`. This error occurs because the generated file name exceeds the maximum allowed length on your filesystem.

To prevent the file name length error, set the `max_identifier_length` parameter for your destination. This truncates all identifiers (including filenames) to a specified maximum length.
For example:

```py
from dlt.destinations import duckdb

pipeline = dlt.pipeline(
pipeline_name="your_pipeline_name",
destination=duckdb(
max_identifier_length=200, # Adjust the length as needed
),
)
```

:::note
- `max_identifier_length` truncates all identifiers (tables, columns). Ensure the length maintains uniqueness to avoid collisions.
- Adjust `max_identifier_length` based on your data structure and filesystem limits.
:::

<!--@@@DLT_TUBA filesystem-->
@@ -32,10 +32,10 @@ The filesystem ensures consistent file representation across bucket types and of

#### `FileItem` fields

- `file_url` - complete URL of the file (e.g. `s3://bucket-name/path/file`). This field serves as a primary key.
- `file_url` - complete URL of the file (e.g., `s3://bucket-name/path/file`). This field serves as a primary key.
- `file_name` - name of the file from the bucket URL.
- `relative_path` - set when doing `glob`, is a relative path to a `bucket_url` argument.
- `mime_type` - file's mime type. It is sourced from the bucket provider or inferred from its extension.
- `mime_type` - file's MIME type. It is sourced from the bucket provider or inferred from its extension.
- `modification_date` - file's last modification time (format: `pendulum.DateTime`).
- `size_in_bytes` - file size.
- `file_content` - content, provided upon request.
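
As a quick illustration, here is a minimal sketch (with a placeholder bucket URL) that lists files and prints a few of these fields:

```py
from dlt.sources.filesystem import filesystem

# List matching files and inspect selected FileItem fields; "s3://bucket_name" is a placeholder.
files = filesystem(bucket_url="s3://bucket_name", file_glob="**/*.csv")
for item in files:
    # Each item behaves like a dict exposing the fields described above.
    print(item["file_url"], item["mime_type"], item["size_in_bytes"], item["modification_date"])
```
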
@@ -90,7 +90,7 @@ example_xls = filesystem(
bucket_url=BUCKET_URL, file_glob="../directory/example.xlsx"
) | read_excel("example_table") # Pass the data through the transformer to read the "example_table" sheet.

pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb", dataset_name="example_xls_data",)
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb", dataset_name="example_xls_data")
# Execute the pipeline and load the extracted data into the "duckdb" destination.
load_info = pipeline.run(example_xls.with_name("example_xls_data"))
# Print the loading information.
Expand Down Expand Up @@ -119,7 +119,7 @@ def read_xml(items: Iterator[FileItemDict]) -> Iterator[TDataItems]:
for file_obj in items:
# Open the file object.
with file_obj.open() as file:
# Parse the file to dict records
# Parse the file to dict records.
yield xmltodict.parse(file.read())

# Set up the pipeline to fetch a specific XML file from a filesystem (bucket).
@@ -143,14 +143,14 @@ You can get an fsspec client from the filesystem resource after it was extracted
from dlt.sources.filesystem import filesystem, read_csv
from dlt.sources.filesystem.helpers import fsspec_from_resource

# get filesystem source
# Get filesystem source.
gs_resource = filesystem("gs://ci-test-bucket/")
# extract files
# Extract files.
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
pipeline.run(gs_resource | read_csv())
# get fs client
# Get fs client.
fs_client = fsspec_from_resource(gs_resource)
# do any operation
# Do any operation.
fs_client.ls("ci-test-bucket/standard_source/samples")
```

@@ -166,31 +166,32 @@ from dlt.common.storages.fsspec_filesystem import FileItemDict
from dlt.sources.filesystem import filesystem

def _copy(item: FileItemDict) -> FileItemDict:
# instantiate fsspec and copy file
# Instantiate fsspec and copy file
dest_file = os.path.join(local_folder, item["file_name"])
# create dest folder
# Create destination folder
os.makedirs(os.path.dirname(dest_file), exist_ok=True)
# download file
# Download file
item.fsspec.download(item["file_url"], dest_file)
# return file item unchanged
# Return file item unchanged
return item

BUCKET_URL = "gs://ci-test-bucket/"

# use recursive glob pattern and add file copy step
# Use recursive glob pattern and add file copy step
downloader = filesystem(BUCKET_URL, file_glob="**").add_map(_copy)

# NOTE: you do not need to load any data to execute extract, below we obtain
# NOTE: You do not need to load any data to execute extract; below, we obtain
# a list of files in a bucket and also copy them locally
listing = list(downloader)
print(listing)
# download to table "listing"
# Download to table "listing"
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(
downloader.with_name("listing"), write_disposition="replace"
)
# pretty print the information on data that was loaded
# Pretty print the information on data that was loaded
print(load_info)
print(listing)
print(pipeline.last_trace.last_normalize_info)
```

@@ -10,7 +10,7 @@ Filesystem source allows loading files from remote locations (AWS S3, Google Clo

To load unstructured data (`.pdf`, `.txt`, e-mail), please refer to the [unstructured data source](https://github.com/dlt-hub/verified-sources/tree/master/sources/unstructured_data).

## How Filesystem source works?
## How filesystem source works

The Filesystem source doesn't just give you an easy way to load data from both remote and local files — it also comes with a powerful set of tools that let you customize the loading process to fit your specific needs.

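For a first impression, here is a minimal sketch (with a placeholder bucket URL) that lists CSV files and loads them into DuckDB:

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

# Pair the file listing with a reader transformer and load the result; the bucket URL is a placeholder.
files = filesystem(bucket_url="s3://bucket_name", file_glob="*.csv") | read_csv()
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
print(pipeline.run(files.with_name("my_table")))
```
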
@@ -54,7 +54,7 @@ To get started with your data pipeline, follow these steps:
dlt init filesystem duckdb
```

[dlt init command](../../../reference/command-line-interface) will initialize
The [dlt init command](../../../reference/command-line-interface) will initialize
[the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/filesystem_pipeline.py)
with the filesystem as the source and [duckdb](../../destinations/duckdb.md) as the destination.

@@ -66,6 +66,8 @@ To get started with your data pipeline, follow these steps:

## Configuration



### Get credentials

<Tabs
@@ -132,7 +134,7 @@ dlt supports several authentication methods:
3. Username/Password authentication
4. GSS-API authentication
Learn more about SFTP authentication options in [SFTP section](../../destinations/filesystem#sftp). To obtain credentials, contact your server administrator.
Learn more about SFTP authentication options in the [SFTP section](../../destinations/filesystem#sftp). To obtain credentials, contact your server administrator.
</TabItem>
<TabItem value="local">
@@ -145,7 +147,7 @@ You don't need any credentials for the local filesystem.

To provide credentials to the filesystem source, you can use [any method available](../../../general-usage/credentials/setup#available-config-providers) in `dlt`.
One of the easiest ways is to use configuration files. The `.dlt` folder in your working directory
contains two files: `config.toml` and `secrets.toml`. Sensitive information, like passwords and
contains two files: `config.toml` and `secrets.toml`. Sensitive information, like passwords and
access tokens, should only be put into `secrets.toml`, while any other configuration, like the path to
a bucket, can be specified in `config.toml`.

@@ -212,7 +214,7 @@ bucket_url="gs://<bucket_name>/<path_to_files>/"
<TabItem value="sftp">

Learn how to set up SFTP credentials for each authentication method in the [SFTP section](../../destinations/filesystem#sftp).
For example, in case of key-based authentication, you can configure the source the following way:
For example, in the case of key-based authentication, you can configure the source the following way:

```toml
# secrets.toml
@@ -229,7 +231,7 @@ bucket_url = "sftp://[hostname]/[path]"

<TabItem value="local">

You can use both native local filesystem paths and `file://` URI. Absolute, relative, and UNC Windows paths are supported.
You can use both native local filesystem paths and the `file://` URI. Absolute, relative, and UNC Windows paths are supported.

You could provide an absolute filepath:

@@ -239,7 +241,7 @@ You could provide an absolute filepath:
bucket_url='file://Users/admin/Documents/csv_files'
```

Or skip the schema and provide the local path in a format native for your operating system. For example, for Windows:
Or skip the schema and provide the local path in a format native to your operating system. For example, for Windows:

```toml
[sources.filesystem]
@@ -250,7 +252,7 @@ bucket_url='~\Documents\csv_files\'

</Tabs>

You can also specify the credentials using Environment variables. The name of the corresponding environment
You can also specify the credentials using environment variables. The name of the corresponding environment
variable should be slightly different from the corresponding name in the TOML file. Simply replace dots `.` with double
underscores `__`:

Expand All @@ -260,7 +262,7 @@ export SOURCES__FILESYSTEM__AWS_SECRET_ACCESS_KEY = "Please set me up!"
```

:::tip
`dlt` supports more ways of authorizing with the cloud storage, including identity-based
`dlt` supports more ways of authorizing with cloud storage, including identity-based
and default credentials. To learn more about adding credentials to your pipeline, please refer to the
[Configuration and secrets section](../../../general-usage/credentials/complex_types#gcp-credentials).
:::
@@ -310,7 +312,7 @@ or taken from the config:
Full list of `filesystem` resource parameters:

* `bucket_url` - full URL of the bucket (could be a relative path in the case of the local filesystem).
* `credentials` - cloud storage credentials of `AbstractFilesystem` instance (should be empty for the local filesystem). We recommend not to specify this parameter in the code, but put it in secrets file instead.
* `credentials` - cloud storage credentials of `AbstractFilesystem` instance (should be empty for the local filesystem). We recommend not specifying this parameter in the code, but putting it in a secrets file instead.
* `file_glob` - file filter in glob format. Defaults to listing all non-recursive files in the bucket URL.
* `files_per_page` - number of files processed at once. The default value is `100`.
* `extract_content` - if true, the content of the file will be read and returned in the resource. The default value is `False`.
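
For example, here is a minimal sketch combining several of these parameters (the bucket URL is a placeholder; credentials are left to `secrets.toml`):

```py
from dlt.sources.filesystem import filesystem

files = filesystem(
    bucket_url="s3://bucket_name",   # placeholder bucket URL
    file_glob="directory/*.jsonl",   # only JSONL files in a single directory
    files_per_page=50,               # process 50 file items per page instead of the default 100
    extract_content=True,            # also read the file contents into the items
)
```
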
@@ -332,15 +334,15 @@ filesystem_pipe = filesystem(

#### Available transformers

- `read_csv()` - process `csv` files using `pandas`
- `read_jsonl()` - process `jsonl` files chuck by chunk
- `read_parquet()` - process `parquet` files using `pyarrow`
- `read_csv_duckdb()` - this transformer process `csv` files using DuckDB, which usually shows better performance, than `pandas`.
- `read_csv()` - processes `csv` files using `pandas`
- `read_jsonl()` - processes `jsonl` files chunk by chunk
- `read_parquet()` - processes `parquet` files using `pyarrow`
- `read_csv_duckdb()` - this transformer processes `csv` files using DuckDB, which usually shows better performance than `pandas`.
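
For example, here is a minimal sketch (with placeholder bucket URLs) pairing the listing resource with two of these transformers:

```py
from dlt.sources.filesystem import filesystem, read_jsonl, read_parquet

# Each transformer consumes the file listing and yields parsed records.
jsonl_pipe = filesystem(bucket_url="s3://bucket_name", file_glob="logs/*.jsonl") | read_jsonl()
parquet_pipe = filesystem(bucket_url="s3://bucket_name", file_glob="events/*.parquet") | read_parquet()
```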

:::tip
We advise that you give each resource a
[specific name](../../../general-usage/resource#duplicate-and-rename-resources)
before loading with `pipeline.run`. This will make sure that data goes to a table with the name you
before loading with `pipeline.run`. This will ensure that data goes to a table with the name you
want and that each pipeline uses a
[separate state for incremental loading.](../../../general-usage/state#read-and-write-pipeline-state-in-a-resource)
:::
@@ -366,7 +368,7 @@ import dlt
from dlt.sources.filesystem import filesystem, read_csv
filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
# tell dlt to merge on date
# Tell dlt to merge on date
filesystem_pipe.apply_hints(write_disposition="merge", merge_key="date")
# We load the data into the table_name table
@@ -380,19 +382,19 @@ print(load_info)
Here are a few simple ways to load your data incrementally:

1. [Load files based on modification date](#load-files-based-on-modification-date). Only load files that have been updated since the last time `dlt` processed them. `dlt` checks the files' metadata (like the modification date) and skips those that haven't changed.
2. [Load new records based on a specific column](#load-new-records-based-on-a-specific-column). You can load only the new or updated records by looking at a specific column, like `updated_at`. Unlike the first method, this approach would read all files every time and then filter the records which was updated.
3. [Combine loading only updated files and records](#combine-loading-only-updated-files-and-records). Finally, you can combine both methods. It could be useful if new records could be added to existing files, so you not only want to filter the modified files, but modified records as well.
2. [Load new records based on a specific column](#load-new-records-based-on-a-specific-column). You can load only the new or updated records by looking at a specific column, like `updated_at`. Unlike the first method, this approach would read all files every time and then filter the records which were updated.
3. [Combine loading only updated files and records](#combine-loading-only-updated-files-and-records). Finally, you can combine both methods. It could be useful if new records could be added to existing files, so you not only want to filter the modified files, but also the modified records.

#### Load files based on modification date
For example, to load only new CSV files with [incremental loading](../../../general-usage/incremental-loading) you can use `apply_hints` method.
For example, to load only new CSV files with [incremental loading](../../../general-usage/incremental-loading), you can use the `apply_hints` method.

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv
# This configuration will only consider new csv files
# This configuration will only consider new CSV files
new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
# add incremental on modification time
# Add incremental on modification time
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
@@ -402,13 +404,13 @@ print(load_info)

#### Load new records based on a specific column

In this example we load only new records based on the field called `updated_at`. This method may be useful if you are not able to
filter files by modification date because for example, all files are modified each time new record is appeared.
In this example, we load only new records based on the field called `updated_at`. This method may be useful if you are not able to
filter files by modification date because, for example, all files are modified each time a new record appears.
```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv
# We consider all csv files
# We consider all CSV files
all_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
# But filter out only updated records
@@ -425,11 +427,11 @@ print(load_info)
import dlt
from dlt.sources.filesystem import filesystem, read_csv
# This configuration will only consider modified csv files
# This configuration will only consider modified CSV files
new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
# And in each modified file we filter out only updated records
# And in each modified file, we filter out only updated records
filesystem_pipe = (new_files | read_csv())
filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("updated_at"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
@@ -459,7 +461,7 @@ print(load_info)
```
:::tip
You could also use `file_glob` to filter files by names. It works very well in simple cases, for example, filtering by extention:
You could also use `file_glob` to filter files by names. It works very well in simple cases, for example, filtering by extension:
```py
from dlt.sources.filesystem import filesystem
@@ -493,8 +495,8 @@ print(load_info)
Windows supports paths up to 255 characters. When you access a path longer than 255 characters, you'll see a `FileNotFound` exception.
To go over this limit, you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry).
**Note that Python glob does not work with extended UNC paths**, so you will not be able to use them
```toml
[sources.filesystem]
@@ -514,4 +516,5 @@ function to configure the resource correctly. Use `**` to include recursive file
filesystem supports full Python [glob](https://docs.python.org/3/library/glob.html#glob.glob) functionality,
while cloud storage supports a restricted `fsspec` [version](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.glob).
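
For example, here is a minimal sketch (with a placeholder bucket URL) contrasting recursive and non-recursive globs:

```py
from dlt.sources.filesystem import filesystem

# "**" descends into subdirectories; "*" stays at the top level. The bucket URL is a placeholder.
all_csv_files = filesystem(bucket_url="s3://bucket_name", file_glob="**/*.csv")
top_level_csv_files = filesystem(bucket_url="s3://bucket_name", file_glob="*.csv")
```
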
<!--@@@DLT_TUBA filesystem-->
@@ -12,8 +12,9 @@ The Filesystem source allows seamless loading of files from the following locati
* remote filesystem (via SFTP)
* local filesystem

The Filesystem source natively supports `csv`, `parquet`, and `jsonl` files and allows customization for loading any type of structured files.
The Filesystem source natively supports `csv`, `parquet`, and `jsonl` files and allows customization for loading any type of structured file.

import DocCardList from '@theme/DocCardList';

<DocCardList />
