-
Notifications
You must be signed in to change notification settings - Fork 200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: filesystem delete old pipeline state files #1838
Conversation
✅ Deploy Preview for dlt-hub-docs ready!
To edit notification comments on pull requests, go to your Netlify site configuration. |
@@ -520,6 +520,31 @@ def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: s | |||
f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl", | |||
) | |||
|
|||
def _cleanup_pipeline_states(self, pipeline_name: str) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the ticket it says that we should make sure to not delete state files attached to failed loads, but we are not saving state on failed loads, so we should be good here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, indeed. @rudolfix specified:
delete only the state files that corresponds to finished loads (they have corresponding completed entry).
this is to prevent a rate case when we have 100 unsuccessful partial loads and we delete the last right state
Will partial loads store a state file? if not, we can keep the code as it is.
@sh-rp I have implemented the requested changes and added additional tests. I have also documented these tests. The only divergence from your comments concerns the use of None for the integer max_state_files. I encountered some errors with None, so I decided to disable cleanup proces when max_state_files is set to 0 or negative values. This change is properly documented in the codebase. |
@@ -476,7 +476,9 @@ def _to_path_safe_string(self, s: str) -> str: | |||
"""for base64 strings""" | |||
return base64.b64decode(s).hex() if s else None | |||
|
|||
def _list_dlt_table_files(self, table_name: str) -> Iterator[Tuple[str, List[str]]]: | |||
def _list_dlt_table_files( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you have changed the bevahior of this function but not updated the other places where it is used, this will no longer just list the files of the current pipeline by default, so please double check that there are no suprising side effects :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function behaviour hasn't change for the other places because pipeline_name
is set to None by default.
And the condition will always be True when pipeline_name is None - returning all the files wihtout filtering. The functions _iter_stored_schema_files
and _list_dlt_table_files
aren't affected by it.
# Filters only if pipeline_name provided
if pipeline_name is None or fileparts[0] == pipeline_name:
yield filepath, filepart
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks very good, just one concern mentioned above. Please also update the filesystem docs and this is a breaking change as old state files will be deleted automatically after this merge.
Description
Keep only the latest 100 pipeline state files
Related Issues