New storage APIs #536

Merged (5 commits) on Dec 15, 2023

@knighton (Contributor) commented Dec 15, 2023

Layer some new abstractions on top of, and alongside, the existing Streaming storage APIs.

Delta/Parquet streaming makes use of these changes. I am breaking them out into their own focused PR here for everyone's reviewing sanity.

The general aim here is to move some of the complex low-level file management out of StreamingDataset and into the storage APIs, simplifying the core StreamingDataset logic where we can.

I also wrapped some behavior that was sus for my purposes, though I do not pretend to know all the ways the storage APIs will be used, now or in the future.

Also wait_for_file_to_exist has a new home in this PR.

If we are able to "stream"-line or otherwise better organize this code later, all the better.

For now, let's keep going toward Delta...

def wait_for_file_to_exist(filename: str, poll_interval: float, timeout: float,
                           err_msg: str) -> None:
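
For reviewers who want a feel for what this signature implies, here is a minimal sketch of a poll-until-timeout loop. It is an illustration only and not the code from this PR: the use of `time.monotonic`, the choice of `RuntimeError`, and the exact error text are all assumptions.

```python
import os
import time


def wait_for_file_to_exist(filename: str, poll_interval: float, timeout: float,
                           err_msg: str) -> None:
    """Block until ``filename`` exists, raising if ``timeout`` seconds elapse first.

    Sketch only: the exception type and message format are assumptions.
    """
    start = time.monotonic()
    while not os.path.exists(filename):
        elapsed = time.monotonic() - start
        if elapsed >= timeout:
            raise RuntimeError(f'{err_msg} (waited {elapsed:.3f} of {timeout:.3f} sec)')
        # Sleep between checks so we do not spin on the filesystem.
        time.sleep(poll_interval)
```

A monotonic clock is used here so that wall-clock adjustments cannot shorten or lengthen the wait.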

def _normalize_path(path: str) -> Tuple[str, bool]:

def _normalize_dir(dirname: str) -> str:

def walk_prefix(prefix: str) -> List[str]:

def walk_dir(root: str) -> List[str]:
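
A local directory walk with this signature could look roughly like the sketch below. This is a guess at the behavior, not the PR's implementation: returning paths relative to `root`, using forward slashes, and sorting the result are all assumptions.

```python
import os
from typing import List


def walk_dir(root: str) -> List[str]:
    """List every file under ``root``, recursively.

    Sketch only: relative, forward-slash, sorted output is an assumption.
    """
    paths = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            full = os.path.join(dirpath, name)
            rel = os.path.relpath(full, root)
            # Normalize separators so results look the same on every platform.
            paths.append(rel.replace(os.sep, '/'))
    return sorted(paths)
```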

def _filter(keep: Optional[Union[str, Pattern, Callable[[str], bool]]],
            paths: Optional[Iterable[str]]) -> Iterable[str]:
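
The `keep` argument accepts a string, a compiled pattern, or a predicate. One way to handle all three is sketched below under a hypothetical name, `filter_paths`. The semantics are assumptions, not taken from the PR: a string is treated as a regex matched from the start of the path, `keep=None` keeps everything, and `paths=None` yields nothing.

```python
import re
from typing import Callable, Iterable, Iterator, Optional, Union

Keep = Optional[Union[str, re.Pattern, Callable[[str], bool]]]


def filter_paths(keep: Keep, paths: Optional[Iterable[str]]) -> Iterator[str]:
    """Yield the paths selected by ``keep`` (hypothetical helper, assumed semantics)."""
    if paths is None:
        return
    if keep is None:
        predicate = lambda path: True  # No filter given: keep every path.
    elif isinstance(keep, str):
        pattern = re.compile(keep)  # Assumption: strings are regexes.
        predicate = lambda path: pattern.match(path) is not None
    elif isinstance(keep, re.Pattern):
        predicate = lambda path: keep.match(path) is not None
    elif callable(keep):
        predicate = keep
    else:
        raise ValueError(f'Unsupported keep argument: {keep!r}')
    for path in paths:
        if predicate(path):
            yield path
```

Usage: `list(filter_paths(r'.*\.mds$', ['a.mds', 'b.json']))` keeps only the path ending in `.mds`.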

def _get_overlap(want: Set[str], have: Set[str]) -> Dict[str, Any]:

def list_dataset_files(
        *,
        local: str,
        remote: Optional[str] = None,
        split: Optional[str] = None,
        paths: Optional[Iterable[str]] = None,
        keep: Optional[Union[str, Pattern, Callable[[str], bool]]] = None) -> List[str]:

def smart_download_file(*,
                        remote: str,
                        local: str,
                        timeout: Union[float, str] = 60,
                        size: Optional[Union[int, str]] = None,
                        max_size: Optional[Union[int, str]] = None,
                        hashes: Optional[Dict[str, str]] = None) -> None:
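
The `size` and `hashes` parameters suggest that a download is checked after it lands. The sketch below shows one way such a check could work, using a hypothetical helper `validate_download`. It is not the PR's code. It assumes `hashes` maps a `hashlib` algorithm name to an expected hex digest, and it only handles an integer `size`, not the string form the signature also allows.

```python
import hashlib
import os
from typing import Dict, Optional


def validate_download(local: str,
                      size: Optional[int] = None,
                      hashes: Optional[Dict[str, str]] = None) -> None:
    """Raise ValueError if the file at ``local`` fails a size or hash check.

    Hypothetical helper: the meaning of ``hashes`` is an assumption.
    """
    if size is not None:
        actual = os.path.getsize(local)
        if actual != size:
            raise ValueError(f'Size mismatch for {local}: expected {size}, got {actual}')
    for algo, expected in (hashes or {}).items():
        digest = hashlib.new(algo)
        with open(local, 'rb') as f:
            # Read in 1 MiB chunks so large shards are not loaded into memory at once.
            for chunk in iter(lambda: f.read(1 << 20), b''):
                digest.update(chunk)
        if digest.hexdigest() != expected.lower():
            raise ValueError(f'{algo} mismatch for {local}')
```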

def file_exists(*,
                path: str,
                local: str,
                remote: Optional[str] = None,
                split: Optional[str] = None) -> bool:

@knighton merged commit 02bd910 into dev Dec 15, 2023
6 checks passed
@knighton deleted the james/new-storage-apis branch December 15, 2023 10:42
karan6181 pushed a commit that referenced this pull request Jan 26, 2024
* New storage APIs.

* Potentially fix import issue.

* Fix (path).

* Fix (paths).

* Fix (paths).