forked from fsspec/universal_pathlib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_flavour.py
147 lines (125 loc) · 4.16 KB
/
_flavour.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import ntpath
import os
import posixpath
from os import PathLike
from os import stat_result
from typing import AnyStr
from fsspec import get_filesystem_class
from fsspec.core import split_protocol
from fsspec.utils import stringify_path
from upath._types import PathlibFlavour
from upath._types import StrOrBytesPath
# Names exported by ``from <module> import *``: only the ``fsspecpath``
# flavour object is listed, so every other name defined in this module is
# left out of star-imports.
__all__ = [
"fsspecpath",
]
def _ensure_str(s: str | bytes) -> str:
    """Return *s* as text.

    A ``str`` argument is handed back unchanged; anything else is decoded
    with ``.decode()`` using that method's default codec.
    """
    if not isinstance(s, str):
        return s.decode()
    return s
@object.__new__
class fsspecpath(PathlibFlavour):
    """``os.path``-style helper collection for fsspec URLs and local paths.

    The ``@object.__new__`` decorator replaces the class with one instance of
    itself, so the module-level name ``fsspecpath`` is an object (not a
    class) whose static methods are called like module functions.

    Every helper first splits its argument with ``split_protocol``. Inputs
    without a protocol (and, for most helpers, with the ``file`` protocol) are
    delegated to ``ntpath`` or ``posixpath`` depending on ``os.name``; inputs
    with any other protocol get a fixed, filesystem-independent answer.
    """

    __slots__ = ()

    # Separator settings advertised by this flavour.
    sep = "/"
    altsep = None

    @staticmethod
    def splitroot(path: PathLike[AnyStr]) -> tuple[AnyStr, AnyStr, AnyStr]:
        """Split *path* into a ``(drive, root, tail)`` triple.

        The drive is always ``""``. The root is ``"<protocol>://<root_marker>"``
        where ``root_marker`` comes from the filesystem class looked up for
        the protocol. For protocol-less input the root is ``""`` when the
        path does not start with the root marker and ``"file://<root_marker>"``
        when it does. The tail is the remaining path with a leading root
        marker removed.
        """
        protocol, pth = split_protocol(path)
        # NOTE(review): for protocol-less input this passes ``None`` to
        # ``get_filesystem_class`` -- presumably that resolves to the local
        # filesystem; confirm against fsspec.
        root_marker = get_filesystem_class(protocol).root_marker
        has_root = pth.startswith(root_marker)
        if has_root:
            pth = pth[len(root_marker) :]
        tail = _ensure_str(pth)

        if protocol is None:
            if not has_root:
                return "", "", tail  # type: ignore
            # A rooted path without an explicit protocol is reported with a
            # "file" root.
            protocol = "file"
        return "", f"{protocol}://{root_marker}", tail  # type: ignore

    @staticmethod
    def join(__a: PathLike[AnyStr], *paths: PathLike[AnyStr]) -> AnyStr:
        """Join *paths* onto ``__a``, keeping the protocol prefix of ``__a``.

        Protocol-less and ``file`` inputs are joined with the host's
        ``os.path.join``. Inputs with any other protocol are joined with
        ``posixpath.join`` so the result always uses ``"/"`` (this flavour's
        ``sep``), independent of the operating system running the code.
        """
        protocol, pth = split_protocol(__a)
        if protocol is None:
            return os.path.join(pth, *paths)
        if protocol == "file":
            joined = os.path.join(pth, *paths)
        else:
            # os.path.join would insert "\\" on Windows, which is wrong for a
            # URL-style path whose separator is "/".
            joined = posixpath.join(pth, *paths)
        return f"{protocol}://{joined}"  # type: ignore

    @staticmethod
    def splitdrive(p: PathLike[AnyStr]) -> tuple[AnyStr, AnyStr]:
        """Split *p* into ``(drive, rest)``.

        Protocol-less input is delegated to ``os.path.splitdrive``. Input
        with a protocol is returned whole with an empty drive.
        """
        protocol, pth = split_protocol(p)
        if protocol is None:
            return os.path.splitdrive(pth)
        return "", os.fspath(p)  # type: ignore

    @staticmethod
    def normcase(s: PathLike[AnyStr]) -> AnyStr:
        """Normalise the case of *s*.

        Only protocol-less input is passed to the host's ``normcase``; input
        with a protocol (``file`` included) is returned via ``os.fspath``
        unchanged.
        """
        protocol, pth = split_protocol(s)
        if protocol is None:
            local = ntpath if os.name == "nt" else posixpath
            return local.normcase(pth)
        return os.fspath(s)

    @staticmethod
    def isabs(s: StrOrBytesPath) -> bool:
        """Return whether *s* is absolute.

        Protocol-less and ``file`` input is checked with the host's
        ``isabs``; input with any other protocol is always absolute.
        """
        protocol, pth = split_protocol(s)
        if protocol is None or protocol == "file":
            local = ntpath if os.name == "nt" else posixpath
            return local.isabs(pth)
        return True

    @staticmethod
    def ismount(path: StrOrBytesPath) -> bool:
        """Return whether *path* is a mount point.

        Protocol-less and ``file`` input is checked with the host's
        ``ismount``; input with any other protocol is never a mount point.
        """
        protocol, pth = split_protocol(path)
        if protocol is None or protocol == "file":
            local = ntpath if os.name == "nt" else posixpath
            return local.ismount(pth)
        return False

    @staticmethod
    def isjunction(path: StrOrBytesPath) -> bool:
        """Return ``False`` for every input; *path* is not inspected."""
        return False

    @staticmethod
    def samestat(s1: stat_result, s2: stat_result) -> bool:
        """Return ``os.path.samestat(s1, s2)``."""
        return os.path.samestat(s1, s2)

    @staticmethod
    def abspath(path: PathLike[AnyStr]) -> AnyStr:
        """Return an absolute version of *path*.

        Protocol-less and ``file`` input: the host's ``abspath`` applied to
        the path part returned by ``split_protocol``. Any other protocol:
        ``stringify_path(path)``.
        """
        protocol, pth = split_protocol(path)
        if protocol is None or protocol == "file":
            local = ntpath if os.name == "nt" else posixpath
            return local.abspath(pth)
        return stringify_path(path)

    @staticmethod
    def realpath(filename: PathLike[AnyStr], *, strict: bool = False) -> AnyStr:
        """Return the canonical form of *filename*.

        Protocol-less and ``file`` input: the host's ``realpath`` applied to
        the path part returned by ``split_protocol``, with *strict* passed
        through. Any other protocol: ``stringify_path(filename)``; *strict*
        has no effect there because nothing is resolved.
        """
        protocol, pth = split_protocol(filename)
        if protocol is None or protocol == "file":
            local = ntpath if os.name == "nt" else posixpath
            # Forward ``strict`` so a caller asking for strict resolution is
            # not silently given the lenient behaviour.
            return local.realpath(pth, strict=strict)
        return stringify_path(filename)

    @staticmethod
    def expanduser(path: AnyStr) -> AnyStr:
        """Expand a leading ``~`` construct in *path*.

        Protocol-less and ``file`` input: the host's ``expanduser`` applied
        to the path part returned by ``split_protocol``. Any other protocol:
        ``stringify_path(path)``.
        """
        protocol, pth = split_protocol(path)
        if protocol is None or protocol == "file":
            local = ntpath if os.name == "nt" else posixpath
            return local.expanduser(pth)
        return stringify_path(path)
class FSSpecStats:
    """Wrap an info mapping and expose it through stat-style attribute names."""

    def __init__(self, info: dict) -> None:
        """Store a reference to *info*; the mapping is not copied."""
        self._info = info

    @property
    def st_mode(self):
        """Return the ``"type"`` entry of the wrapped info mapping.

        Raises ``KeyError`` when the mapping has no ``"type"`` key.
        """
        # NOTE(review): this hands back the raw "type" value rather than
        # numeric mode bits -- confirm that is what callers expect.
        entry_type = self._info["type"]
        return entry_type