From 3fba994bcabdcf38dc708baf2c2945d1f650d069 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 3 Jul 2019 09:08:41 -0400 Subject: [PATCH 1/6] changes required for dask integration --- fsspec/core.py | 22 ++++++++++++++++++---- fsspec/implementations/local.py | 4 ++++ fsspec/spec.py | 16 ++++++++-------- fsspec/utils.py | 16 +++++++++++++++- 4 files changed, 45 insertions(+), 13 deletions(-) diff --git a/fsspec/core.py b/fsspec/core.py index 79f1570ed..39e74a548 100644 --- a/fsspec/core.py +++ b/fsspec/core.py @@ -5,7 +5,7 @@ import logging from .compression import compr from .utils import (infer_compression, build_name_function, - update_storage_options) + update_storage_options, stringify_path) from .registry import get_filesystem_class logger = logging.getLogger('fsspec') @@ -37,9 +37,14 @@ class OpenFile(object): How to handle encoding errors if opened in text mode. newline : None or str Passed to TextIOWrapper in text mode, how to handle line endings. + context : bool + Normally, instances are designed for use in a ``with`` context, to + ensure pickleability and release of resources. However, + ``context=False`` will open all the file objects immediately, leaving + it up to the calling code to fo ``f.close()`` explicitly. 
""" def __init__(self, fs, path, mode='rb', compression=None, encoding=None, - errors=None, newline=None): + errors=None, newline=None, context=True): self.fs = fs self.path = path self.mode = mode @@ -48,6 +53,8 @@ def __init__(self, fs, path, mode='rb', compression=None, encoding=None, self.errors = errors self.newline = newline self.fobjects = [] + if not context: + self.__enter__() def __reduce__(self): return (OpenFile, (self.fs, self.path, self.mode, self.compression, @@ -208,6 +215,7 @@ def get_compression(urlpath, compression): def split_protocol(urlpath): + urlpath = stringify_path(urlpath) if "://" in urlpath: return urlpath.split("://", 1) return None, urlpath @@ -282,7 +290,11 @@ def get_fs_token_paths(urlpath, mode='rb', num=1, name_function=None, "share the same protocol") cls = get_filesystem_class(protocol) paths = [cls._strip_protocol(u) for u in urlpath] - options = cls._get_kwargs_from_urls(paths) + optionss = list(map(cls._get_kwargs_from_urls, paths)) + options = optionss[0] + if not all(o == options for o in optionss): + raise ValueError("When specifying a list of paths, all paths must " + "share the same file-system options") update_storage_options(options, storage_options) fs = cls(**options) paths = expand_paths_if_needed(paths, mode, num, fs, name_function) @@ -312,8 +324,10 @@ def get_fs_token_paths(urlpath, mode='rb', num=1, name_function=None, def _expand_paths(path, name_function, num): if isinstance(path, str): - if path.count('*') != 1: + if path.count('*') > 1: raise ValueError("Output path spec must contain exactly one '*'.") + elif "*" not in path: + path = os.path.join(path, "*.part") if name_function is None: name_function = build_name_function(num - 1) diff --git a/fsspec/implementations/local.py b/fsspec/implementations/local.py index 6eb832626..5fab5f336 100644 --- a/fsspec/implementations/local.py +++ b/fsspec/implementations/local.py @@ -29,6 +29,10 @@ def ls(self, path, detail=False): else: return paths + def glob(self, 
path): + path = os.path.abspath(path) + return super().glob(path) + def info(self, path): out = os.stat(path, follow_symlinks=False) dest = False diff --git a/fsspec/spec.py b/fsspec/spec.py index a02d4676f..e11ad287e 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -2,7 +2,7 @@ import io import os import logging -from .utils import read_block, tokenize +from .utils import read_block, tokenize, stringify_path logger = logging.getLogger('fsspec') # alternative names for some methods, which get patched to new instances @@ -148,6 +148,7 @@ def _strip_protocol(cls, path): May require FS-specific handling, e.g., for relative paths or links. """ + path = stringify_path(path) protos = (cls.protocol, ) if isinstance( cls.protocol, str) else cls.protocol for protocol in protos: @@ -430,12 +431,11 @@ def glob(self, path): root = '' depth = 20 if "**" in path else 1 allpaths = self.find(root, maxdepth=depth) - pattern = re.compile("^" + path.replace('.', r'\.') - .replace('//', '/') - .rstrip('/') - .replace('**', '.+') - .replace('*', '[^/]*') - .replace('?', '.') + "$") + pattern = "^" + path.replace('.', r'\.').replace('//', '/').rstrip( + '/').replace('?', '.') + "$" + pattern = re.sub('[*]{2}', '=PLACEHOLDER=', pattern) + pattern = re.sub('[*]', '[^/]*', pattern) + pattern = re.compile(pattern.replace("=PLACEHOLDER=", '.*')) out = {p for p in allpaths if pattern.match(p.replace('//', '/').rstrip('/'))} return list(sorted(out)) @@ -1019,7 +1019,7 @@ def read(self, length=-1): length : int (-1) Number of bytes to read; if <0, all remaining bytes. 
""" - length = int(length) + length = -1 if length is None else int(length) if self.mode != 'rb': raise ValueError('File not in read mode') if length < 0: diff --git a/fsspec/utils.py b/fsspec/utils.py index c653c0cef..6e6064d55 100644 --- a/fsspec/utils.py +++ b/fsspec/utils.py @@ -1,6 +1,7 @@ from hashlib import md5 import math import os +import pathlib import re from urllib.parse import urlsplit @@ -42,7 +43,10 @@ def infer_storage_options(urlpath, inherit_storage_options=None): parsed_path = urlsplit(urlpath) protocol = parsed_path.scheme or 'file' - path = parsed_path.path + if parsed_path.fragment: + path = "#".join([parsed_path.path, parsed_path.fragment]) + else: + path = parsed_path.path if protocol == 'file': # Special case parsing file protocol URL on Windows according to: # https://msdn.microsoft.com/en-us/library/jj710207.aspx @@ -50,6 +54,10 @@ def infer_storage_options(urlpath, inherit_storage_options=None): if windows_path: path = '%s:%s' % windows_path.groups() + if protocol in ["http", "https"]: + # for HTTP, we don't want to parse, as requests will anyway + return {"protocol": protocol, "path": urlpath} + options = { 'protocol': protocol, 'path': path, @@ -61,6 +69,10 @@ def infer_storage_options(urlpath, inherit_storage_options=None): # https://github.com/dask/dask/issues/1417 options['host'] = parsed_path.netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0] + if protocol in ("s3", "gcs", "gs"): + options["path"] = options['host'] + options["path"] + else: + options["host"] = options['host'] if parsed_path.port: options['port'] = parsed_path.port if parsed_path.username: @@ -201,6 +213,8 @@ def read_block(f, offset, length, delimiter=None): if delimiter: f.seek(offset) seek_delimiter(f, delimiter, 2**16) + if length is None: + return f.read() start = f.tell() length -= start - offset From bf6bdc94ac2d2a52fe79dc04b7f293af8bd17d52 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 3 Jul 2019 18:38:35 -0400 Subject: [PATCH 2/6] Further small 
improvements --- fsspec/compression.py | 2 +- fsspec/core.py | 13 ++++--------- fsspec/implementations/local.py | 9 +++++++-- fsspec/spec.py | 15 +++++++++------ 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/fsspec/compression.py b/fsspec/compression.py index e9dca848c..76f5eb63b 100644 --- a/fsspec/compression.py +++ b/fsspec/compression.py @@ -16,7 +16,7 @@ def unzip(infile, mode='rb', filename=None, **kwargs): z = ZipFile(infile) if filename is None: filename = z.namelist()[0] - return z.open(filename, mode='rb', **kwargs) + return z.open(filename, mode='r', **kwargs) # should be functions of the form func(infile, mode=, **kwargs) -> file-like diff --git a/fsspec/core.py b/fsspec/core.py index 39e74a548..639528d22 100644 --- a/fsspec/core.py +++ b/fsspec/core.py @@ -37,14 +37,9 @@ class OpenFile(object): How to handle encoding errors if opened in text mode. newline : None or str Passed to TextIOWrapper in text mode, how to handle line endings. - context : bool - Normally, instances are designed for use in a ``with`` context, to - ensure pickleability and release of resources. However, - ``context=False`` will open all the file objects immediately, leaving - it up to the calling code to fo ``f.close()`` explicitly. 
""" def __init__(self, fs, path, mode='rb', compression=None, encoding=None, - errors=None, newline=None, context=True): + errors=None, newline=None): self.fs = fs self.path = path self.mode = mode @@ -53,8 +48,6 @@ def __init__(self, fs, path, mode='rb', compression=None, encoding=None, self.errors = errors self.newline = newline self.fobjects = [] - if not context: - self.__enter__() def __reduce__(self): return (OpenFile, (self.fs, self.path, self.mode, self.compression, @@ -241,6 +234,8 @@ def expand_paths_if_needed(paths, mode, num, fs, name_function): if 'w' in mode and sum([1 for p in paths if '*' in p]) > 1: raise ValueError("When writing data, only one filename mask can " "be specified.") + elif 'w' in mode: + num = max(num, len(paths)) for curr_path in paths: if '*' in curr_path: if 'w' in mode: @@ -319,7 +314,7 @@ def get_fs_token_paths(urlpath, mode='rb', num=1, name_function=None, else: raise TypeError('url type not understood: %s' % urlpath) - return fs, fs.token, paths + return fs, fs._fs_token, paths def _expand_paths(path, name_function, num): diff --git a/fsspec/implementations/local.py b/fsspec/implementations/local.py index 5fab5f336..eddf8832f 100644 --- a/fsspec/implementations/local.py +++ b/fsspec/implementations/local.py @@ -75,7 +75,7 @@ def rm(self, path, recursive=False): os.remove(path) def _open(self, path, mode='rb', block_size=None, **kwargs): - return LocalFileOpener(path, mode, **kwargs) + return LocalFileOpener(path, mode, fs=self, **kwargs) def touch(self, path, **kwargs): """ Create empty file, or update timestamp """ @@ -86,8 +86,9 @@ def touch(self, path, **kwargs): class LocalFileOpener(object): - def __init__(self, path, mode, autocommit=True, **kwargs): + def __init__(self, path, mode, autocommit=True, fs=None, **kwargs): self.path = path + self.fs = fs self.autocommit = autocommit if autocommit or 'w' not in mode: self.f = open(path, mode=mode) @@ -96,6 +97,10 @@ def __init__(self, path, mode, autocommit=True, **kwargs): 
i, name = tempfile.mkstemp() self.temp = name self.f = open(name, mode=mode) + if 'w' not in mode: + self.details = self.fs.info(path) + self.size = self.details['size'] + self.f.size = self.size def commit(self): if self.autocommit: diff --git a/fsspec/spec.py b/fsspec/spec.py index e11ad287e..ac0b6ca08 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -62,7 +62,7 @@ def __new__(cls, *args, **storage_options): # check for cached instance return cls._cache[token] self = object.__new__(cls) - self.token = token + self._fs_token = token if self.cachable: # store for caching - can hold memory cls._cache[token] = self @@ -101,13 +101,13 @@ def __init__(self, *args, **storage_options): setattr(self, new, getattr(self, old)) def __dask_tokenize__(self): - return self.token + return self._fs_token def __hash__(self): - return int(self.token, 16) + return int(self._fs_token, 16) def __eq__(self, other): - return self.token == other.token + return self._fs_token == other._fs_token @classmethod def clear_instance_cache(cls, remove_singleton=True): @@ -877,7 +877,10 @@ def closed(self, c): self._closed = c def __hash__(self): - return self.fs.checksum(self.path) + if 'w' in self.mode: + return id(self) + else: + tokenize(self.details) def __eq__(self, other): """Files are equal if they have the same checksum, only in read mode""" @@ -1023,7 +1026,7 @@ def read(self, length=-1): if self.mode != 'rb': raise ValueError('File not in read mode') if length < 0: - length = self.size + length = self.size - self.loc if self.closed: raise ValueError('I/O operation on closed file.') logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length)) From 5c1aaed2867c982ef7f2132b09cefd9b42fdd68d Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 4 Jul 2019 19:42:28 -0400 Subject: [PATCH 3/6] URLs without protocol should be local --- fsspec/core.py | 2 ++ fsspec/mapping.py | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/fsspec/core.py b/fsspec/core.py 
index 639528d22..629aab6dc 100644 --- a/fsspec/core.py +++ b/fsspec/core.py @@ -103,6 +103,8 @@ def open(self): def close(self): """Close all encapsulated file objects""" for f in reversed(self.fobjects): + if 'r' not in self.mode and not f.closed: + f.flush() f.close() self.fobjects = [] diff --git a/fsspec/mapping.py b/fsspec/mapping.py index 29050386b..79c515b09 100644 --- a/fsspec/mapping.py +++ b/fsspec/mapping.py @@ -135,7 +135,10 @@ def get_mapper(url, check=False, create=False, **kwargs): ------- ``FSMap`` instance, the dict-like key-value store. """ - protocol = url.split(':', 1)[0] + if ":" in url: + protocol = url.split(':', 1)[0] + else: + protocol = 'file' cls = get_filesystem_class(protocol) fs = cls(**kwargs) # Removing protocol here - could defer to each open() on the backend From 68cf3a83c61ecf77aa22c63218464dc8959e5afe Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 8 Jul 2019 19:13:47 -0400 Subject: [PATCH 4/6] Remove OpenFile circular reference --- fsspec/core.py | 17 +++++------------ fsspec/implementations/local.py | 4 ++-- fsspec/mapping.py | 4 +++- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/fsspec/core.py b/fsspec/core.py index 629aab6dc..a417fce75 100644 --- a/fsspec/core.py +++ b/fsspec/core.py @@ -64,27 +64,20 @@ def __enter__(self): f = self.fs.open(self.path, mode=mode) - fobjects = [f] + self.fobjects = [f] if self.compression is not None: compress = compr[self.compression] f = compress(f, mode=mode[0]) - fobjects.append(f) + self.fobjects.append(f) if 'b' not in self.mode: # assume, for example, that 'r' is equivalent to 'rt' as in builtin f = io.TextIOWrapper(f, encoding=self.encoding, errors=self.errors, newline=self.newline) - fobjects.append(f) - - self.fobjects = fobjects - try: - # opened file should know its original path - f.__fspath__ = self.__fspath__ - except AttributeError: - # setting that can fail for some C file-like object - pass - return f + self.fobjects.append(f) + + return 
self.fobjects[-1] def __exit__(self, *args): self.close() diff --git a/fsspec/implementations/local.py b/fsspec/implementations/local.py index eddf8832f..ba4003486 100644 --- a/fsspec/implementations/local.py +++ b/fsspec/implementations/local.py @@ -122,8 +122,8 @@ def __getattr__(self, item): def __enter__(self): self._incontext = True - return self.f + return self.f.__enter__() def __exit__(self, exc_type, exc_value, traceback): - self.f.close() self._incontext = False + self.f.__exit__(exc_type, exc_value, traceback) diff --git a/fsspec/mapping.py b/fsspec/mapping.py index 79c515b09..3cc1da9cd 100644 --- a/fsspec/mapping.py +++ b/fsspec/mapping.py @@ -35,7 +35,8 @@ def __init__(self, root, fs, check=False, create=False): self.fs = fs self.root = fs._strip_protocol(root).rstrip('/') # we join on '/' in _key_to_str if create: - self.fs.mkdir(root) + if not self.fs.exists(root): + self.fs.mkdir(root) if check: if not self.fs.exists(root): raise ValueError("Path %s does not exist. Create " @@ -79,6 +80,7 @@ def __getitem__(self, key, default=None): def __setitem__(self, key, value): """Store value in key""" key = self._key_to_str(key) + self.fs.mkdirs(self.fs._parent(key), exist_ok=True) with self.fs.open(key, 'wb') as f: f.write(value) From 2407dbc5db268b1e52bb5b9e7ad54e5d9b7b3af1 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 8 Jul 2019 19:15:22 -0400 Subject: [PATCH 5/6] Allow gcsfs to fail (but we want to know) Because those tests often require a re-record of VCR. 
--- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 43b7056a2..dee8a057c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -47,6 +47,7 @@ matrix: script: - py.test -vv s3fs + allow_failures: - language: generic env: TEST=GCSFS install: From 06f9a9a9de9188c7917d29b19d8609e33a214d28 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 9 Jul 2019 10:39:06 -0400 Subject: [PATCH 6/6] auto-makedirs for local files because of pyarrow --- fsspec/implementations/local.py | 14 ++++++++++++-- fsspec/spec.py | 4 +++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/fsspec/implementations/local.py b/fsspec/implementations/local.py index ba4003486..b6468078f 100644 --- a/fsspec/implementations/local.py +++ b/fsspec/implementations/local.py @@ -7,10 +7,18 @@ class LocalFileSystem(AbstractFileSystem): """Interface to files on local storage - This class requires no initialisation or parameters + Parameters + ---------- + auto_mkdir: bool + Whether, when opening a file, the directory containing it should + be created (if it doesn't already exist). This is assumed by pyarrow + code. 
""" root_marker = '/' + def __init__(self, auto_mkdir=True, **kwargs): + super().__init__(**kwargs) + self.auto_mkdir = auto_mkdir def mkdir(self, path, **kwargs): os.mkdir(path, **kwargs) @@ -68,13 +76,15 @@ def mv(self, path1, path2, **kwargs): """ Move file from one location to another """ os.rename(path1, path2) - def rm(self, path, recursive=False): + def rm(self, path, recursive=False, maxdepth=None): if recursive: shutil.rmtree(path) else: os.remove(path) def _open(self, path, mode='rb', block_size=None, **kwargs): + if self.auto_mkdir: + self.makedirs(self._parent(path), exist_ok=True) return LocalFileOpener(path, mode, fs=self, **kwargs) def touch(self, path, **kwargs): diff --git a/fsspec/spec.py b/fsspec/spec.py index ac0b6ca08..948bc0a7d 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -214,7 +214,7 @@ def invalidate_cache(self, path=None): """ pass # not necessary to implement, may have no cache - def mkdir(self, path, **kwargs): + def mkdir(self, path, create_parents=True, **kwargs): """ Create directory entry at path @@ -225,6 +225,8 @@ def mkdir(self, path, **kwargs): ---------- path: str location + create_parents: bool + if True, this is equivalent to ``makedirs`` kwargs: may be permissions, etc. """