-
Notifications
You must be signed in to change notification settings - Fork 152
/
fsspec.py
179 lines (142 loc) · 6.35 KB
/
fsspec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import os
import posixpath
from typing import Any, Callable, Iterator, List, Optional, Sequence, Tuple, Union
from torch.utils.data.datapipes.utils.common import match_masks
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterableWrapper, IterDataPipe
from torchdata.datapipes.utils import StreamWrapper
try:
import fsspec
except ImportError:
fsspec = None
# Type alias for the ``data`` half of the (metadata, data) tuples annotated on the
# saver datapipe's ``source_datapipe``: raw bytes, a mutable byte buffer, or text.
U = Union[bytes, bytearray, str]
def _assert_fsspec() -> None:
    """
    Ensure the optional ``fsspec`` dependency was imported successfully.

    The module-level import guard binds ``fsspec`` to ``None`` when the package
    is missing; this helper turns that state into an actionable error.

    Raises:
        ModuleNotFoundError: if the module-level ``fsspec`` name is ``None``.
    """
    if fsspec is None:
        # Adjacent string literals are concatenated verbatim, so each fragment
        # carries its own trailing space to keep the sentences separated.
        raise ModuleNotFoundError(
            "Package `fsspec` is required to be installed to use this datapipe. "
            "Please use `pip install fsspec` or `conda install -c conda-forge fsspec` "
            "to install the package"
        )
class FSSpecFileListerIterDataPipe(IterDataPipe[str]):
    r"""
    Lists the contents of the directory at the provided ``root`` pathname or URL,
    and yields the full pathname or URL for each file within the directory.

    If a root refers to a single file rather than a directory, the root itself
    is yielded unchanged.

    Args:
        root: The root `fsspec` path directory or list of path directories to list files from
        masks: Unix style filter string or string list for filtering file name(s)

    Example:
        >>> from torchdata.datapipes.iter import FSSpecFileLister
        >>> datapipe = FSSpecFileLister(root=dir_path)
    """

    def __init__(
        self,
        root: Union[str, Sequence[str], IterDataPipe],
        masks: Union[str, List[str]] = "",
    ) -> None:
        _assert_fsspec()

        # A bare string is one root, not a sequence of single-character roots.
        if isinstance(root, str):
            root = [
                root,
            ]
        if not isinstance(root, IterDataPipe):
            self.datapipe: IterDataPipe = IterableWrapper(root)  # type: ignore[assignment]
        else:
            self.datapipe = root
        self.masks = masks

    def __iter__(self) -> Iterator[str]:
        for root in self.datapipe:
            fs, path = fsspec.core.url_to_fs(root)

            # ``fs.protocol`` is either a single name or a collection of aliases;
            # normalise so the checks below can treat both the same way.
            if isinstance(fs.protocol, str):
                protocol_list = [fs.protocol]
            else:
                protocol_list = fs.protocol

            # First protocol name that prefixes ``root`` (None if there is none).
            # It only depends on ``root``, so compute it once per root instead of
            # once per listed file.
            root_protocol = next(
                (protocol for protocol in protocol_list if root.startswith(protocol)),
                None,
            )

            # Check membership in the normalised list: comparing ``fs.protocol``
            # itself against "file" is never true when it is a tuple of aliases.
            is_local = "file" in protocol_list or root_protocol is None

            # ``ls`` is meant for directories; a single file is passed through as-is.
            if fs.isfile(path):
                yield root
                continue

            # ``detail=False`` guarantees a list of path strings; without it some
            # filesystem implementations return a list of info dicts instead.
            for file_name in fs.ls(path, detail=False):
                if not match_masks(file_name, self.masks):
                    continue

                # ensure the file name has the full fsspec protocol path
                if any(file_name.startswith(protocol) for protocol in protocol_list):
                    yield file_name
                    continue

                if is_local:
                    abs_path = os.path.join(path, file_name)
                else:
                    abs_path = posixpath.join(path, file_name)

                if root_protocol is not None:
                    # Re-attach the protocol the caller used in ``root``.
                    yield root_protocol + "://" + abs_path
                else:
                    yield abs_path
@functional_datapipe("open_file_by_fsspec")
class FSSpecFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
    r"""
    Opens each `fsspec` path produced by the input datapipe and yields a tuple of
    the pathname and the opened file stream (functional name: ``open_file_by_fsspec``).

    Args:
        source_datapipe: Iterable DataPipe that provides the pathnames or URLs
        mode: An optional string that specifies the mode in which the file is opened (``"r"`` by default)

    Example:
        >>> from torchdata.datapipes.iter import FSSpecFileLister
        >>> datapipe = FSSpecFileLister(root=dir_path)
        >>> file_dp = datapipe.open_file_by_fsspec()
    """

    def __init__(self, source_datapipe: IterDataPipe[str], mode: str = "r") -> None:
        _assert_fsspec()

        self.source_datapipe: IterDataPipe[str] = source_datapipe
        self.mode: str = mode

    def __iter__(self) -> Iterator[Tuple[str, StreamWrapper]]:
        for uri in self.source_datapipe:
            # Resolve the filesystem for this URI and the path to use on it.
            filesystem, fs_path = fsspec.core.url_to_fs(uri)
            handle = filesystem.open(fs_path, self.mode)
            wrapped = StreamWrapper(handle)
            # The original URI (not the resolved path) is paired with the stream.
            yield uri, wrapped

    def __len__(self) -> int:
        # One output per input path, so the length is that of the source.
        return len(self.source_datapipe)
@functional_datapipe("save_by_fsspec")
class FSSpecSaverIterDataPipe(IterDataPipe[str]):
    r"""
    Consumes a DataPipe of ``(metadata, data)`` tuples, writes each ``data`` to the
    target path (the metadata itself, or ``filepath_fn(metadata)`` when a function is
    given), and yields the resulting `fsspec` path (functional name: ``save_by_fsspec``).

    Args:
        source_datapipe: Iterable DataPipe with tuples of metadata and data
        mode: Mode in which the file will be opened for writing the data (``"w"`` by default)
        filepath_fn: Function that takes in metadata and returns the target path of the new file

    Example:
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> def filepath_fn(name: str) -> str:
        ...     return dir_path + name
        >>> name_to_data = {"1.txt": b"DATA1", "2.txt": b"DATA2", "3.txt": b"DATA3"}
        >>> source_dp = IterableWrapper(sorted(name_to_data.items()))
        >>> fsspec_saver_dp = source_dp.save_by_fsspec(filepath_fn=filepath_fn, mode="wb")
        >>> res_file_paths = list(fsspec_saver_dp)
    """

    def __init__(
        self,
        source_datapipe: IterDataPipe[Tuple[Any, U]],
        mode: str = "w",
        filepath_fn: Optional[Callable] = None,
    ):
        _assert_fsspec()

        self.source_datapipe: IterDataPipe[Tuple[Any, U]] = source_datapipe
        self.mode: str = mode
        self.filepath_fn: Optional[Callable] = filepath_fn

    def __iter__(self) -> Iterator[str]:
        for meta, data in self.source_datapipe:
            # Without a mapping function the metadata is used as the target path.
            if self.filepath_fn is None:
                target = meta
            else:
                target = self.filepath_fn(meta)

            filesystem, fs_path = fsspec.core.url_to_fs(target)
            with filesystem.open(fs_path, self.mode) as sink:
                sink.write(data)

            # Yield only after the ``with`` block has exited for this file.
            yield target

    def __len__(self) -> int:
        # One output path per input tuple, so the length is that of the source.
        return len(self.source_datapipe)