-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathlocal.py
211 lines (164 loc) · 6.4 KB
/
local.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import logging
import os
import shutil
import threading
import fsspec
from funcy import cached_property, wrap_prop
from . import system
from .base import FileSystem
from .utils import copyfile, makedirs, move, remove, tmp_fname
# Module-level logger for this file.
logger = logging.getLogger(__name__)
# Silence pylint for the fsspec subclass below, which implements only part of
# fsspec.AbstractFileSystem and overrides some methods with different signatures.
# pylint:disable=abstract-method, arguments-differ
class FsspecLocalFileSystem(fsspec.AbstractFileSystem):
    """Local-disk implementation of ``fsspec.AbstractFileSystem``.

    Several metadata operations (``info``, ``ls``, ``lexists``, ``checksum``,
    ``created``, ``modified``, ...) delegate to fsspec's built-in local
    filesystem stored in ``self.fs``. Copying, moving, removal and linking go
    through this package's ``utils`` and ``system`` helpers instead.
    """

    sep = os.sep

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # fsspec's own local filesystem; the delegate used by many methods below.
        self.fs = fsspec.filesystem("file")

    def makedirs(self, path, exist_ok=False):
        """Create ``path`` and its parents (delegates to ``self.fs.makedirs``)."""
        self.fs.makedirs(path, exist_ok=exist_ok)

    def mkdir(self, path, create_parents=True, **kwargs):
        """Create directory ``path`` (delegates to ``self.fs.mkdir``)."""
        self.fs.mkdir(path, create_parents=create_parents, **kwargs)

    def lexists(self, path, **kwargs):
        """Return ``self.fs.lexists(path)``."""
        return self.fs.lexists(path, **kwargs)

    def exists(self, path, **kwargs):
        """Return whether ``path`` exists.

        Implemented with :meth:`lexists`, so a dangling symlink is reported as
        existing. Extra ``kwargs`` are ignored.
        """
        # TODO: replace this with os.path.exists once the problem is fixed on
        # the fsspec https://github.com/intake/filesystem_spec/issues/742
        return self.lexists(path)

    def checksum(self, path) -> str:
        """Return fsspec's checksum for ``path``."""
        return self.fs.checksum(path)

    def info(self, path, **kwargs):
        """Return fsspec's info dict for ``path``; extra ``kwargs`` are ignored."""
        return self.fs.info(path)

    def ls(self, path, **kwargs):
        """List ``path`` (delegates to ``self.fs.ls``)."""
        return self.fs.ls(path, **kwargs)

    def isfile(self, path) -> bool:
        """Return ``os.path.isfile(path)``."""
        return os.path.isfile(path)

    def isdir(self, path) -> bool:
        """Return ``os.path.isdir(path)``."""
        return os.path.isdir(path)

    def walk(self, path, maxdepth=None, topdown=True, detail=False, **kwargs):
        """Directory fs generator.

        See `os.walk` for the docs. Differences:
        - no support for symlinks
        - ``maxdepth`` and extra ``kwargs`` are ignored
        - each yielded ``root`` is passed through ``os.path.normpath``
        - with ``detail=True``, dirs and files are yielded as dicts mapping
          each name to its ``self.info`` result
        """
        for root, dirs, files in os.walk(path, topdown=topdown):
            if detail:
                dirs_dict = {
                    name: self.info(os.path.join(root, name)) for name in dirs
                }
                files_dict = {
                    name: self.info(os.path.join(root, name)) for name in files
                }
                yield os.path.normpath(root), dirs_dict, files_dict
                # NOTE: with os.walk you can modify dirs to avoid walking
                # into them. This is us emulating that behaviour for
                # dicts returned by detail=True.
                dirs[:] = list(dirs_dict.keys())
            else:
                yield os.path.normpath(root), dirs, files

    def find(self, path, **kwargs):
        """Yield the path of every file below ``path``.

        ``kwargs`` are forwarded to :meth:`walk`.
        """
        for root, _, files in self.walk(path, **kwargs):
            for file in files:
                # NOTE: os.path.join is ~5.5 times slower
                yield f"{root}{os.sep}{file}"

    @classmethod
    def _parent(cls, path):
        """Return the directory part of ``path`` (``os.path.dirname``)."""
        return os.path.dirname(path)

    def put_file(self, lpath, rpath, callback=None, **kwargs):
        """Copy local file ``lpath`` to ``rpath``.

        The data is first copied to a temporary file in ``rpath``'s directory,
        which is then moved over ``rpath`` with ``os.replace``. That way
        ``rpath`` is never left partially written. If either step fails, the
        temporary file is removed and the error is re-raised.
        """
        parent = self._parent(rpath)
        makedirs(parent, exist_ok=True)
        tmp_file = os.path.join(parent, tmp_fname())
        try:
            copyfile(lpath, tmp_file, callback=callback)
            os.replace(tmp_file, rpath)
        except Exception:
            # Don't leak the temporary file next to rpath on failure.
            self.rm_file(tmp_file)
            raise

    def get_file(self, rpath, lpath, callback=None, **kwargs):
        """Copy ``rpath`` to ``lpath``.

        If ``rpath`` is a directory, only the directory ``lpath`` is created.
        """
        if self.isdir(rpath):
            # emulating fsspec's localfs.get_file
            self.makedirs(lpath, exist_ok=True)
            return
        copyfile(rpath, lpath, callback=callback)

    def mv(self, path1, path2, **kwargs):
        """Move ``path1`` to ``path2``, creating ``path2``'s parent first."""
        self.makedirs(self._parent(path2), exist_ok=True)
        move(path1, path2)

    def rmdir(self, path):
        """Remove directory ``path`` (delegates to ``self.fs.rmdir``)."""
        self.fs.rmdir(path)

    def rm_file(self, path):
        """Remove ``path`` with the ``utils.remove`` helper."""
        remove(path)

    def rm(self, path, recursive=False, maxdepth=None):
        """Remove a single path, or each path in a list, with ``utils.remove``.

        ``recursive`` and ``maxdepth`` are ignored. Whether directories are
        removed depends on ``remove`` itself.
        """
        paths = [path] if isinstance(path, str) else path
        for p in paths:
            remove(p)

    def cp_file(self, path1, path2, **kwargs):
        """Copy file ``path1`` to ``path2`` (delegates to :meth:`copy`)."""
        return self.copy(path1, path2, **kwargs)

    def copy(self, path1, path2, recursive=False, on_error=None, **kwargs):
        """Copy file ``path1`` to ``path2`` through a temporary file.

        The data is written to a temporary file in ``path2``'s directory, which
        is then moved over ``path2``. ``recursive`` and ``on_error`` are
        ignored. On failure the temporary file is removed and the error is
        re-raised.
        """
        tmp_info = os.path.join(self._parent(path2), tmp_fname(""))
        try:
            copyfile(path1, tmp_info)
            # os.replace (as in put_file) overwrites an existing path2 on all
            # platforms; os.rename raises FileExistsError on Windows.
            os.replace(tmp_info, path2)
        except Exception:
            self.rm_file(tmp_info)
            raise

    def open(self, path, mode="r", encoding=None, **kwargs):
        """Open ``path`` with the builtin ``open``; extra ``kwargs`` are ignored."""
        return open(path, mode=mode, encoding=encoding)

    def symlink(self, path1, path2):
        """Create a symlink (delegates to ``self.fs.symlink``)."""
        return self.fs.symlink(path1, path2)

    def islink(self, path):
        """Return ``self.fs.islink(path)``."""
        return self.fs.islink(path)

    @staticmethod
    def is_hardlink(path):
        """Return ``system.is_hardlink(path)``."""
        return system.is_hardlink(path)

    def link(self, path1, path2):
        """Hardlink ``path1`` to ``path2``.

        If ``path1`` is empty, an empty file is created at ``path2`` instead.
        """
        # If there are a lot of empty files (which happens a lot in datasets),
        # and the cache type is `hardlink`, we might reach link limits and
        # will get something like: `too many links error`
        #
        # This is because all those empty files will have the same hash
        # (i.e. 68b329da9893e34099c7d8ad5cb9c940), therefore, they will be
        # linked to the same file in the cache.
        #
        # From https://en.wikipedia.org/wiki/Hard_link
        # * ext4 limits the number of hard links on a file to 65,000
        # * Windows with NTFS has a limit of 1024 hard links on a file
        #
        # That's why we simply create an empty file rather than a link.
        if self.size(path1) == 0:
            self.open(path2, "w").close()
            logger.debug("Created empty file: %s -> %s", path1, path2)
            return
        return system.hardlink(path1, path2)

    def reflink(self, path1, path2):
        """Reflink ``path1`` to ``path2`` via ``system.reflink``."""
        return system.reflink(path1, path2)

    def created(self, path):
        """Return ``self.fs.created(path)``."""
        return self.fs.created(path)

    def modified(self, path):
        """Return ``self.fs.modified(path)``."""
        return self.fs.modified(path)
class LocalFileSystem(FileSystem):
    """Local-disk ``FileSystem`` backed by ``FsspecLocalFileSystem``."""

    sep = os.sep
    protocol = "local"
    PARAM_CHECKSUM = "md5"
    PARAM_PATH = "path"
    TRAVERSE_PREFIX_LEN = 2

    @wrap_prop(threading.Lock())
    @cached_property
    def fs(self):
        """Lazily created ``FsspecLocalFileSystem`` built from ``self.config``."""
        # funcy.wrap_prop holds the lock around property access, so concurrent
        # first accesses don't race to build separate instances.
        return FsspecLocalFileSystem(**self.config)

    @cached_property
    def path(self):
        """Lazily created ``LocalFileSystemPath`` helper for this filesystem."""
        from .path import LocalFileSystemPath

        return LocalFileSystemPath(
            self.sep, getcwd=os.getcwd, realpath=os.path.realpath
        )

    def upload_fobj(self, fobj, to_info, **kwargs):
        """Write the contents of file object ``fobj`` to ``to_info``.

        The data is streamed into a temporary file next to ``to_info`` and then
        moved into place. An existing ``to_info`` is therefore only replaced
        once the copy has fully succeeded. On failure the temporary file is
        removed and the error is re-raised. Extra ``kwargs`` are ignored.
        """
        parent = self.path.parent(to_info)
        self.makedirs(parent)
        tmp_info = self.path.join(parent, tmp_fname(""))
        try:
            with open(tmp_info, "wb") as fdest:
                shutil.copyfileobj(fobj, fdest)
            # os.replace (unlike os.rename) also overwrites an existing
            # destination on Windows.
            os.replace(tmp_info, to_info)
        except Exception:
            self.remove(tmp_info)
            raise
# Shared module-level LocalFileSystem instance, created with no arguments at import time.
localfs = LocalFileSystem()