Skip to content

Commit

Permalink
start
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant committed Nov 12, 2024
1 parent 9a16171 commit 06e1dd2
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 29 deletions.
4 changes: 2 additions & 2 deletions fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ async def _copy(
continue
raise ex

async def _pipe_file(self, path, value, **kwargs):
async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
raise NotImplementedError

async def _pipe(self, path, value=None, batch_size=None, **kwargs):
Expand Down Expand Up @@ -517,7 +517,7 @@ async def _cat_ranges(
coros, batch_size=batch_size, nofiles=True, return_exceptions=True
)

async def _put_file(self, lpath, rpath, **kwargs):
async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
raise NotImplementedError

async def _put(
Expand Down
4 changes: 4 additions & 0 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,12 @@ async def _put_file(
chunk_size=5 * 2**20,
callback=DEFAULT_CALLBACK,
method="post",
mode="overwrite",
**kwargs,
):
if mode != "overwrite":
raise NotImplementedError("Exclusive write")

async def gen_chunks():
# Support passing arbitrary file-like objects
# and use them instead of streams.
Expand Down
7 changes: 5 additions & 2 deletions fsspec/implementations/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,13 @@ def makedirs(self, path, exist_ok=False):
if not exist_ok:
raise

def pipe_file(self, path, value, **kwargs):
def pipe_file(self, path, value, mode="overwrite", **kwargs):
"""Set the bytes of given file
Avoids copies of the data if possible
"""
self.open(path, "wb", data=value)
mode = "xb" if mode == "create" else "wb"
self.open(path, mode=mode, data=value)

def rmdir(self, path):
path = self._strip_protocol(path)
Expand Down Expand Up @@ -178,6 +179,8 @@ def _open(
**kwargs,
):
path = self._strip_protocol(path)
if "x" in mode and self.exists(path):
raise FileExistsError
if path in self.pseudo_dirs:
raise IsADirectoryError(path)
parent = path
Expand Down
8 changes: 6 additions & 2 deletions fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -1181,13 +1181,17 @@ async def _rm_file(self, path, **kwargs):
) # ignores FileNotFound, just as well for directories
self.dircache.clear() # this is a bit heavy handed

async def _pipe_file(self, path, data):
async def _pipe_file(self, path, data, mode="overwrite", **kwargs):
if mode == "create" and self.exists(path):
raise FileExistsError
# can be str or bytes
self.references[path] = data
self.dircache.clear() # this is a bit heavy handed

async def _put_file(self, lpath, rpath, **kwargs):
async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
# puts binary
if mode == "create" and self.exists(rpath):
raise FileExistsError
with open(lpath, "rb") as f:
self.references[rpath] = f.read()
self.dircache.clear() # this is a bit heavy handed
Expand Down
18 changes: 12 additions & 6 deletions fsspec/implementations/tests/test_smb.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

pytest.importorskip("smbprotocol")


def delay_rerun(*args):
time.sleep(0.1)
return True


# ruff: noqa: F821

if os.environ.get("WSL_INTEROP"):
Expand Down Expand Up @@ -72,7 +78,7 @@ def smb_params(request):
stop_docker(container)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_simple(smb_params):
adir = "/home/adir"
adir2 = "/home/adir/otherdir/"
Expand All @@ -89,7 +95,7 @@ def test_simple(smb_params):
assert not fsmb.exists(adir)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_auto_mkdir(smb_params):
adir = "/home/adir"
adir2 = "/home/adir/otherdir/"
Expand All @@ -116,7 +122,7 @@ def test_auto_mkdir(smb_params):
assert not fsmb.exists(another_dir)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_with_url(smb_params):
if smb_params["port"] is None:
smb_url = "smb://{username}:{password}@{host}/home/someuser.txt"
Expand All @@ -131,7 +137,7 @@ def test_with_url(smb_params):
assert read_result == b"hello"


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_transaction(smb_params):
afile = "/home/afolder/otherdir/afile"
afile2 = "/home/afolder/otherdir/afile2"
Expand All @@ -152,14 +158,14 @@ def test_transaction(smb_params):
assert fsmb.find(adir) == [afile, afile2]


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_makedirs_exist_ok(smb_params):
fsmb = fsspec.get_filesystem_class("smb")(**smb_params)
fsmb.makedirs("/home/a/b/c")
fsmb.makedirs("/home/a/b/c", exist_ok=True)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_rename_from_upath(smb_params):
fsmb = fsspec.get_filesystem_class("smb")(**smb_params)
fsmb.makedirs("/home/a/b/c", exist_ok=True)
Expand Down
46 changes: 29 additions & 17 deletions fsspec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,12 @@ def cat_file(self, path, start=None, end=None, **kwargs):
return f.read(end - f.tell())
return f.read()

def pipe_file(self, path, value, **kwargs):
def pipe_file(self, path, value, mode="overwrite", **kwargs):
"""Set the bytes of given file"""
if mode == "create" and self.exists(path):
# non-atomic but simple way; or could use "xb" in open(), which is likely
# not as well supported
raise FileExistsError
with self.open(path, "wb", **kwargs) as f:
f.write(value)

Expand Down Expand Up @@ -973,8 +977,12 @@ def get(
with callback.branched(rpath, lpath) as child:
self.get_file(rpath, lpath, callback=child, **kwargs)

def put_file(self, lpath, rpath, callback=DEFAULT_CALLBACK, **kwargs):
def put_file(
self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs
):
"""Copy single file to remote"""
if mode == "create" and self.exists(rpath):
raise FileExistsError
if os.path.isdir(lpath):
self.makedirs(rpath, exist_ok=True)
return None
Expand Down Expand Up @@ -1264,6 +1272,9 @@ def open(
Target file
mode: str like 'rb', 'w'
See builtin ``open()``
Mode "x" (exclusive write) may be implemented by the backend. Even if
it is, whether it is checked up front or on commit, and whether it is
atomic is implementation-dependent.
block_size: int
Some indication of buffering - this is a value in bytes
cache_options : dict, optional
Expand Down Expand Up @@ -1797,7 +1808,7 @@ def discard(self):

def info(self):
"""File information about this path"""
if "r" in self.mode:
if self.readable():
return self.details
else:
raise ValueError("Info not available while writing")
Expand Down Expand Up @@ -1844,7 +1855,7 @@ def write(self, data):
data: bytes
Set of bytes to be written.
"""
if self.mode not in {"wb", "ab"}:
if not self.writable():
raise ValueError("File not in write mode")
if self.closed:
raise ValueError("I/O operation on closed file.")
Expand Down Expand Up @@ -1877,7 +1888,7 @@ def flush(self, force=False):
if force:
self.forced = True

if self.mode not in {"wb", "ab"}:
if self.readable():
# no-op to flush on read-mode
return

Expand Down Expand Up @@ -2026,29 +2037,30 @@ def close(self):
return
if self.closed:
return
if self.mode == "rb":
self.cache = None
else:
if not self.forced:
self.flush(force=True)

if self.fs is not None:
self.fs.invalidate_cache(self.path)
self.fs.invalidate_cache(self.fs._parent(self.path))
try:
if self.mode == "rb":
self.cache = None
else:
if not self.forced:
self.flush(force=True)

self.closed = True
if self.fs is not None:
self.fs.invalidate_cache(self.path)
self.fs.invalidate_cache(self.fs._parent(self.path))
finally:
self.closed = True

def readable(self):
"""Whether opened for reading"""
return self.mode == "rb" and not self.closed
return "r" in self.mode and not self.closed

def seekable(self):
"""Whether is seekable (only in read mode)"""
return self.readable()

def writable(self):
"""Whether opened for writing"""
return self.mode in {"wb", "ab"} and not self.closed
return self.mode in {"wb", "ab", "xb"} and not self.closed

def __del__(self):
if not self.closed:
Expand Down
8 changes: 8 additions & 0 deletions fsspec/tests/abstract/put.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,14 @@ def test_put_directory_without_files_with_same_name_prefix(
assert fs.isfile(fs_join(fs_target, "subdir", "subfile.txt"))
assert fs.isfile(fs_join(fs_target, "subdir.txt"))

def test_pipe_exclusive(self, fs, fs_target):
fs.pipe_file(fs_target, b"data")
assert fs.cat_file(fs_target) == b"data"
with pytest.raises(FileExistsError):
fs.pipe_file(fs_target, b"data", mode="create")
fs.pipe_file(fs_target, b"new data", mode="overwrite")
assert fs.cat_file(fs_target) == b"new data"

def test_copy_with_source_and_destination_as_list(
self, fs, fs_target, fs_join, local_join, local_10_files_with_hashed_names
):
Expand Down

0 comments on commit 06e1dd2

Please sign in to comment.