feat: filesystem delete old pipeline state files #1838

Merged: 16 commits, Sep 25, 2024
2 changes: 2 additions & 0 deletions dlt/common/storages/configuration.py
@@ -145,6 +145,8 @@ class FilesystemConfiguration(BaseConfiguration):
kwargs: Optional[DictStrAny] = None
client_kwargs: Optional[DictStrAny] = None
deltalake_storage_options: Optional[DictStrAny] = None
max_state_files: int = 100
"""Maximum number of pipeline state files to keep; 0 or negative value disables cleanup."""

@property
def protocol(self) -> str:
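For reference, a minimal sketch of how the new option can be supplied through dlt's environment-variable configuration for the filesystem destination (the same mechanism the tests later in this PR use); the value 20 is purely illustrative:

```python
import os

# Illustrative value: keep only the 20 newest pipeline state files instead of
# the default 100; 0 or a negative value disables the cleanup entirely.
os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "20"
```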
39 changes: 36 additions & 3 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -3,7 +3,7 @@
import base64

from types import TracebackType
from typing import Dict, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast
from typing import Dict, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast, Any
from fsspec import AbstractFileSystem
from contextlib import contextmanager

@@ -479,7 +479,9 @@ def _to_path_safe_string(self, s: str) -> str:
"""for base64 strings"""
return base64.b64decode(s).hex() if s else None

def _list_dlt_table_files(self, table_name: str) -> Iterator[Tuple[str, List[str]]]:
def _list_dlt_table_files(
Collaborator: You have changed the behavior of this function but not updated the other places where it is used. This will no longer just list the files of the current pipeline by default, so please double check that there are no surprising side effects :)

Collaborator (author): The function behaviour hasn't changed for the other places because pipeline_name is set to None by default.

And the condition will always be True when pipeline_name is None, returning all the files without filtering. The functions _iter_stored_schema_files and _list_dlt_table_files aren't affected by it.

# Filters only if pipeline_name provided
if pipeline_name is None or fileparts[0] == pipeline_name:
    yield filepath, fileparts

self, table_name: str, pipeline_name: str = None
) -> Iterator[Tuple[str, List[str]]]:
dirname = self.get_table_dir(table_name)
if not self.fs_client.exists(self.pathlib.join(dirname, INIT_FILE_NAME)):
raise DestinationUndefinedEntity({"dir": dirname})
@@ -488,7 +490,9 @@ def _list_dlt_table_files(self, table_name: str) -> Iterator[Tuple[str, List[str]]]:
fileparts = filename.split(FILENAME_SEPARATOR)
if len(fileparts) != 3:
continue
yield filepath, fileparts
# Filters only if pipeline_name provided
if pipeline_name is None or fileparts[0] == pipeline_name:
yield filepath, fileparts

def _store_load(self, load_id: str) -> None:
# write entry to load "table"
@@ -523,6 +527,31 @@ def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str
f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
)

def _cleanup_pipeline_states(self, pipeline_name: str) -> None:
Collaborator: In the ticket it says that we should make sure to not delete state files attached to failed loads, but we are not saving state on failed loads, so we should be good here.

Collaborator (author), donotpush, Sep 20, 2024: Yes, indeed. @rudolfix specified:

delete only the state files that correspond to finished loads (they have a corresponding completed entry);
this is to prevent a rare case when we have 100 unsuccessful partial loads and we delete the last right state

Will partial loads store a state file? If not, we can keep the code as it is.

state_table_files = list(
self._list_dlt_table_files(self.schema.state_table_name, pipeline_name)
)

if len(state_table_files) > self.config.max_state_files:
# collect load_id and filepath for each state file
state_file_info: List[Dict[str, Any]] = [
{
"load_id": float(fileparts[1]), # convert load_id to float for comparison
"filepath": filepath,
}
for filepath, fileparts in state_table_files
]

# sort state file info by load_id in descending order
state_file_info.sort(key=lambda x: x["load_id"], reverse=True)

# keep only the most recent max_state_files files
files_to_delete = state_file_info[self.config.max_state_files :]

# delete the old files
for file_info in files_to_delete:
self._delete_file(file_info["filepath"])

def _store_current_state(self, load_id: str) -> None:
# don't save the state this way when used as staging
if self.config.as_staging_destination:
@@ -542,6 +571,10 @@ def _store_current_state(self, load_id: str) -> None:
# write
self._write_to_json_file(hash_path, cast(DictStrAny, pipeline_state_doc))

# perform state cleanup only if max_state_files is set to a positive value
if self.config.max_state_files >= 1:
self._cleanup_pipeline_states(pipeline_name)

def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
# search newest state
selected_path = None
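For reference, the retention rule in `_cleanup_pipeline_states` above can be summarized by the following standalone sketch (a hypothetical helper, not part of this PR) that takes the `(filepath, fileparts)` tuples yielded by `_list_dlt_table_files` and returns the paths falling outside the `max_state_files` window:

```python
from typing import List, Tuple


def state_files_to_delete(
    state_table_files: List[Tuple[str, List[str]]], max_state_files: int
) -> List[str]:
    """Hypothetical helper: fileparts holds [pipeline_name, load_id, version_hash],
    so fileparts[1] is the load_id used to rank state files."""
    ranked = sorted(
        state_table_files,
        key=lambda item: float(item[1][1]),  # load_id as float, as in the diff
        reverse=True,  # newest first
    )
    # everything beyond the first max_state_files entries is eligible for deletion
    return [filepath for filepath, _ in ranked[max_state_files:]]
```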
3 changes: 2 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -697,5 +697,6 @@ This destination fully supports [dlt state sync](../../general-usage/state#synci

You will also notice `init` files being present in the root folder and the special `dlt` folders. In the absence of the concepts of schemas and tables in blob storages and directories, `dlt` uses these special files to harmonize the behavior of the `filesystem` destination with the other implemented destinations.

<!--@@@DLT_TUBA filesystem-->
**Note:** When a load generates a new state, for example when using incremental loads, a new state file appears in the `_dlt_pipeline_state` folder at the destination. To prevent these files from accumulating, a cleanup mechanism automatically removes old state files, retaining only the 100 most recent ones by default. You can customize or disable this behavior with the filesystem configuration option `max_state_files`, which sets the maximum number of pipeline state files to retain; a value of 0 or a negative number disables the cleanup of old states.

<!--@@@DLT_TUBA filesystem-->
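To illustrate the documented behavior end to end, here is a hedged sketch (pipeline and dataset names are hypothetical, and the filesystem `bucket_url` is assumed to come from config or environment): repeated runs that change state write new files under `_dlt_pipeline_state`, and with `max_state_files` lowered to 2 only the two newest survive:

```python
import os
import dlt

# Illustrative value: keep only the 2 newest state files for this pipeline.
os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "2"

pipeline = dlt.pipeline(
    pipeline_name="cleanup_demo",         # hypothetical name
    destination="filesystem",             # bucket_url resolved from config/env
    dataset_name="cleanup_demo_dataset",  # hypothetical dataset
)

@dlt.resource(name="items", primary_key="id")
def items(cursor=dlt.sources.incremental("id")):
    yield from [{"id": 0}, {"id": 1}]

# each run that produces a new state writes a new file; older ones are cleaned up
pipeline.run(items)
```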
2 changes: 2 additions & 0 deletions tests/load/filesystem/test_filesystem_common.py
@@ -50,6 +50,7 @@ def test_filesystem_configuration() -> None:
"bucket_url": "az://root",
"credentials": None,
"client_kwargs": None,
"max_state_files": 100,
"kwargs": None,
"deltalake_storage_options": None,
}
@@ -173,6 +174,7 @@ def test_filesystem_configuration_with_additional_arguments() -> None:
"read_only": False,
"bucket_url": "az://root",
"credentials": None,
"max_state_files": 100,
"kwargs": {"use_ssl": True},
"client_kwargs": {"verify": "public.crt"},
"deltalake_storage_options": {"AWS_S3_LOCKING_PROVIDER": "dynamodb"},
184 changes: 184 additions & 0 deletions tests/load/pipeline/test_filesystem_pipeline.py
@@ -16,6 +16,7 @@
from dlt.common.storages.load_package import ParsedLoadJobFileName
from dlt.common.utils import uniq_id
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.configuration.exceptions import ConfigurationValueError
from dlt.destinations import filesystem
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient
from dlt.destinations.impl.filesystem.typing import TExtraPlaceholders
@@ -1334,3 +1335,186 @@ def table_3():
# test truncate multiple
fs_client.truncate_tables(["table_1", "table_3"])
assert load_table_counts(p, "table_1", "table_2", "table_3") == {"table_2": 21}


@pytest.mark.parametrize(
"destination_config",
destinations_configs(all_buckets_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_cleanup_states_by_load_id(destination_config: DestinationTestConfiguration) -> None:
"""
Test the pipeline state cleanup functionality by verifying that old state files are removed based on `load_id` when multiple loads are executed.

Specifically, the oldest state file (corresponding to the first `load_id`) should be deleted.

This test checks that when running a pipeline with a resource that produces incremental data, older state files are cleared according to the `max_state_files` setting.

Steps:
1. Set `max_state_files` to 2, allowing only two newest state files to be kept.
2. Run the pipeline three times.
3. Verify that the state file from the first load is no longer present in the state table.
"""

dataset_name = f"{destination_config.destination_name}{uniq_id()}"
p = destination_config.setup_pipeline("p1", dataset_name=dataset_name)

@dlt.resource(name="items", primary_key="id")
def r1(_=dlt.sources.incremental("id")):
yield from [{"id": 0}]

@dlt.resource(name="items", primary_key="id")
def r2(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}]

@dlt.resource(name="items", primary_key="id")
def r3(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}]

os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(2)

info = p.run(r1)
first_load_id = info.loads_ids[0]

info = p.run(r2)
second_load_id = [load_id for load_id in info.loads_ids if load_id != first_load_id][0]

info = p.run(r3)
third_load_id = [
load_id
for load_id in info.loads_ids
if load_id != first_load_id and load_id != second_load_id
][0]

client: FilesystemClient = p.destination_client() # type: ignore
state_table_files = list(client._list_dlt_table_files(client.schema.state_table_name, "p1"))

assert not any(fileparts[1] == first_load_id for _, fileparts in state_table_files)
assert any(fileparts[1] == second_load_id for _, fileparts in state_table_files)
assert any(fileparts[1] == third_load_id for _, fileparts in state_table_files)


@pytest.mark.parametrize(
"destination_config",
destinations_configs(all_buckets_filesystem_configs=True),
ids=lambda x: x.name,
)
@pytest.mark.parametrize("max_state_files", [-1, 0, 1, 3])
def test_cleanup_states(
destination_config: DestinationTestConfiguration, max_state_files: int
) -> None:
"""
Test the behavior of pipeline state cleanup based on different max_state_files configurations.

Steps:
1. Run the pipeline five times with max_state_files set to -1, 0, 1, and 3.
2. Verify that state files are cleaned or retained according to the max_state_files setting:
- Negative or zero values disable cleanup.
- Positive values trigger cleanup, keeping only the specified number of state files.
"""
os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(max_state_files)

dataset_name = f"{destination_config.destination_name}{uniq_id()}"
p = destination_config.setup_pipeline("p1", dataset_name=dataset_name)

@dlt.resource(name="items", primary_key="id")
def r1(_=dlt.sources.incremental("id")):
yield from [{"id": 0}]

@dlt.resource(name="items", primary_key="id")
def r2(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}]

@dlt.resource(name="items", primary_key="id")
def r3(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}]

@dlt.resource(name="items", primary_key="id")
def r4(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]

@dlt.resource(name="items", primary_key="id")
def r5(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]

# run pipeline
run_count = 5

p.run(r1)
p.run(r2)
p.run(r3)
p.run(r4)
p.run(r5)

client: FilesystemClient = p.destination_client() # type: ignore
state_table_files = list(client._list_dlt_table_files(client.schema.state_table_name, "p1"))

if max_state_files == -1 or max_state_files == 0:
assert len(state_table_files) == run_count
else:
assert len(state_table_files) == max_state_files


@pytest.mark.parametrize(
"destination_config",
destinations_configs(all_buckets_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_cleanup_states_shared_dataset(destination_config: DestinationTestConfiguration) -> None:
"""
Test that two pipelines sharing the same bucket_url and dataset_name can independently
clean their _dlt_pipeline_state files with different max_state_files configurations.

Steps:
1. Run pipeline p1 five times with max_state_files set to 5.
2. Run pipeline p2 five times with max_state_files set to 2.
3. Verify that each pipeline only deletes its own state files and does not affect the other.
"""
dataset_name = f"{destination_config.destination_name}{uniq_id()}"

p1 = destination_config.setup_pipeline("p1", dataset_name=dataset_name)
p2 = destination_config.setup_pipeline("p2", dataset_name=dataset_name)

@dlt.resource(name="items", primary_key="id")
def r1(_=dlt.sources.incremental("id")):
yield from [{"id": 0}]

@dlt.resource(name="items", primary_key="id")
def r2(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}]

@dlt.resource(name="items", primary_key="id")
def r3(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}]

@dlt.resource(name="items", primary_key="id")
def r4(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]

@dlt.resource(name="items", primary_key="id")
def r5(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]

os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(5)
p1.run(r1)
p1.run(r2)
p1.run(r3)
p1.run(r4)
p1.run(r5)

os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(2)
p2.run(r1)
p2.run(r2)
p2.run(r3)
p2.run(r4)
p2.run(r5)

p1_client: FilesystemClient = p1.destination_client() # type: ignore
p1_state_files = list(p1_client._list_dlt_table_files(p1_client.schema.state_table_name, "p1"))

p2_client: FilesystemClient = p2.destination_client() # type: ignore
p2_state_files = list(p2_client._list_dlt_table_files(p2_client.schema.state_table_name, "p2"))

assert len(p1_state_files) == 5

assert len(p2_state_files) == 2