Skip to content

Commit

Permalink
Merge pull request #64 from martindurant/for_dask
Browse files Browse the repository at this point in the history
changes required for dask integration
  • Loading branch information
martindurant authored Jul 9, 2019
2 parents 6b5b735 + 06f9a9a commit 5f5fa99
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 40 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ matrix:
script:
- py.test -vv s3fs

allow_failures:
- language: generic
env: TEST=GCSFS
install:
Expand Down
2 changes: 1 addition & 1 deletion fsspec/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def unzip(infile, mode='rb', filename=None, **kwargs):
z = ZipFile(infile)
if filename is None:
filename = z.namelist()[0]
return z.open(filename, mode='rb', **kwargs)
return z.open(filename, mode='r', **kwargs)


# should be functions of the form func(infile, mode=, **kwargs) -> file-like
Expand Down
34 changes: 19 additions & 15 deletions fsspec/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
from .compression import compr
from .utils import (infer_compression, build_name_function,
update_storage_options)
update_storage_options, stringify_path)
from .registry import get_filesystem_class
logger = logging.getLogger('fsspec')

Expand Down Expand Up @@ -64,27 +64,20 @@ def __enter__(self):

f = self.fs.open(self.path, mode=mode)

fobjects = [f]
self.fobjects = [f]

if self.compression is not None:
compress = compr[self.compression]
f = compress(f, mode=mode[0])
fobjects.append(f)
self.fobjects.append(f)

if 'b' not in self.mode:
# assume, for example, that 'r' is equivalent to 'rt' as in builtin
f = io.TextIOWrapper(f, encoding=self.encoding,
errors=self.errors, newline=self.newline)
fobjects.append(f)
self.fobjects.append(f)

self.fobjects = fobjects
try:
# opened file should know its original path
f.__fspath__ = self.__fspath__
except AttributeError:
# setting that can fail for some C file-like object
pass
return f
return self.fobjects[-1]

def __exit__(self, *args):
self.close()
Expand All @@ -103,6 +96,8 @@ def open(self):
def close(self):
"""Close all encapsulated file objects"""
for f in reversed(self.fobjects):
if 'r' not in self.mode and not f.closed:
f.flush()
f.close()
self.fobjects = []

Expand Down Expand Up @@ -208,6 +203,7 @@ def get_compression(urlpath, compression):


def split_protocol(urlpath):
    """Split a URL-style path into (protocol, path).

    The input is first coerced to ``str`` via ``stringify_path`` (so
    ``pathlib.Path``-like objects are accepted).  When no ``"://"``
    separator is present, the protocol is ``None`` and the path is
    returned unchanged.
    """
    urlpath = stringify_path(urlpath)
    if "://" not in urlpath:
        return None, urlpath
    # split only on the first separator, so paths may themselves contain "://"
    return urlpath.split("://", 1)
Expand All @@ -233,6 +229,8 @@ def expand_paths_if_needed(paths, mode, num, fs, name_function):
if 'w' in mode and sum([1 for p in paths if '*' in p]) > 1:
raise ValueError("When writing data, only one filename mask can "
"be specified.")
elif 'w' in mode:
num = max(num, len(paths))
for curr_path in paths:
if '*' in curr_path:
if 'w' in mode:
Expand Down Expand Up @@ -282,7 +280,11 @@ def get_fs_token_paths(urlpath, mode='rb', num=1, name_function=None,
"share the same protocol")
cls = get_filesystem_class(protocol)
paths = [cls._strip_protocol(u) for u in urlpath]
options = cls._get_kwargs_from_urls(paths)
optionss = list(map(cls._get_kwargs_from_urls, paths))
options = optionss[0]
if not all(o == options for o in optionss):
raise ValueError("When specifying a list of paths, all paths must "
"share the same file-system options")
update_storage_options(options, storage_options)
fs = cls(**options)
paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
Expand All @@ -307,13 +309,15 @@ def get_fs_token_paths(urlpath, mode='rb', num=1, name_function=None,
else:
raise TypeError('url type not understood: %s' % urlpath)

return fs, fs.token, paths
return fs, fs._fs_token, paths


def _expand_paths(path, name_function, num):
if isinstance(path, str):
if path.count('*') != 1:
if path.count('*') > 1:
raise ValueError("Output path spec must contain exactly one '*'.")
elif "*" not in path:
path = os.path.join(path, "*.part")

if name_function is None:
name_function = build_name_function(num - 1)
Expand Down
31 changes: 25 additions & 6 deletions fsspec/implementations/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@
class LocalFileSystem(AbstractFileSystem):
"""Interface to files on local storage
This class requires no initialisation or parameters
Parameters
----------
auto_mkdir: bool
Whether, when opening a file, the directory containing it should
be created (if it doesn't already exist). This is assumed by pyarrow
code.
"""
root_marker = '/'

def __init__(self, auto_mkdir=True, **kwargs):
    """Create a local-filesystem instance.

    Parameters
    ----------
    auto_mkdir: bool
        If True, opening a file for writing will first create the file's
        parent directory tree if it does not already exist (used by
        ``_open``).
    kwargs:
        Passed through to the ``AbstractFileSystem`` constructor.
    """
    super().__init__(**kwargs)
    self.auto_mkdir = auto_mkdir

def mkdir(self, path, **kwargs):
os.mkdir(path, **kwargs)
Expand All @@ -29,6 +37,10 @@ def ls(self, path, detail=False):
else:
return paths

def glob(self, path):
    """Find files matching a glob pattern.

    The pattern is normalised to an absolute path before delegating to
    the generic implementation, so relative patterns work for the local
    filesystem.
    """
    path = os.path.abspath(path)
    return super().glob(path)

def info(self, path):
out = os.stat(path, follow_symlinks=False)
dest = False
Expand Down Expand Up @@ -64,14 +76,16 @@ def mv(self, path1, path2, **kwargs):
""" Move file from one location to another """
os.rename(path1, path2)

def rm(self, path, recursive=False):
def rm(self, path, recursive=False, maxdepth=None):
    """Delete the file at ``path``, or the whole tree if ``recursive``.

    ``maxdepth`` is accepted for signature compatibility but is not
    used here — local removal is all-or-nothing per path.
    """
    if recursive:
        shutil.rmtree(path)
    else:
        os.remove(path)

def _open(self, path, mode='rb', block_size=None, **kwargs):
return LocalFileOpener(path, mode, **kwargs)
if self.auto_mkdir:
self.makedirs(self._parent(path), exist_ok=True)
return LocalFileOpener(path, mode, fs=self, **kwargs)

def touch(self, path, **kwargs):
""" Create empty file, or update timestamp """
Expand All @@ -82,8 +96,9 @@ def touch(self, path, **kwargs):


class LocalFileOpener(object):
def __init__(self, path, mode, autocommit=True, **kwargs):
def __init__(self, path, mode, autocommit=True, fs=None, **kwargs):
self.path = path
self.fs = fs
self.autocommit = autocommit
if autocommit or 'w' not in mode:
self.f = open(path, mode=mode)
Expand All @@ -92,6 +107,10 @@ def __init__(self, path, mode, autocommit=True, **kwargs):
i, name = tempfile.mkstemp()
self.temp = name
self.f = open(name, mode=mode)
if 'w' not in mode:
self.details = self.fs.info(path)
self.size = self.details['size']
self.f.size = self.size

def commit(self):
if self.autocommit:
Expand All @@ -113,8 +132,8 @@ def __getattr__(self, item):

def __enter__(self):
self._incontext = True
return self.f
return self.f.__enter__()

def __exit__(self, exc_type, exc_value, traceback):
self.f.close()
self._incontext = False
self.f.__exit__(exc_type, exc_value, traceback)
9 changes: 7 additions & 2 deletions fsspec/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def __init__(self, root, fs, check=False, create=False):
self.fs = fs
self.root = fs._strip_protocol(root).rstrip('/') # we join on '/' in _key_to_str
if create:
self.fs.mkdir(root)
if not self.fs.exists(root):
self.fs.mkdir(root)
if check:
if not self.fs.exists(root):
raise ValueError("Path %s does not exist. Create "
Expand Down Expand Up @@ -79,6 +80,7 @@ def __getitem__(self, key, default=None):
def __setitem__(self, key, value):
    """Store value in key"""
    key = self._key_to_str(key)
    # ensure the containing directory/prefix exists before writing,
    # since not all backends create parents implicitly on open()
    self.fs.mkdirs(self.fs._parent(key), exist_ok=True)
    with self.fs.open(key, 'wb') as f:
        f.write(value)

Expand Down Expand Up @@ -135,7 +137,10 @@ def get_mapper(url, check=False, create=False, **kwargs):
-------
``FSMap`` instance, the dict-like key-value store.
"""
protocol = url.split(':', 1)[0]
if ":" in url:
protocol = url.split(':', 1)[0]
else:
protocol = 'file'
cls = get_filesystem_class(protocol)
fs = cls(**kwargs)
# Removing protocol here - could defer to each open() on the backend
Expand Down
35 changes: 20 additions & 15 deletions fsspec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import io
import os
import logging
from .utils import read_block, tokenize
from .utils import read_block, tokenize, stringify_path
logger = logging.getLogger('fsspec')

# alternative names for some methods, which get patched to new instances
Expand Down Expand Up @@ -62,7 +62,7 @@ def __new__(cls, *args, **storage_options):
# check for cached instance
return cls._cache[token]
self = object.__new__(cls)
self.token = token
self._fs_token = token
if self.cachable:
# store for caching - can hold memory
cls._cache[token] = self
Expand Down Expand Up @@ -101,13 +101,13 @@ def __init__(self, *args, **storage_options):
setattr(self, new, getattr(self, old))

def __dask_tokenize__(self):
return self.token
return self._fs_token

def __hash__(self):
return int(self.token, 16)
return int(self._fs_token, 16)

def __eq__(self, other):
return self.token == other.token
return self._fs_token == other._fs_token

@classmethod
def clear_instance_cache(cls, remove_singleton=True):
Expand Down Expand Up @@ -148,6 +148,7 @@ def _strip_protocol(cls, path):
May require FS-specific handling, e.g., for relative paths or links.
"""
path = stringify_path(path)
protos = (cls.protocol, ) if isinstance(
cls.protocol, str) else cls.protocol
for protocol in protos:
Expand Down Expand Up @@ -213,7 +214,7 @@ def invalidate_cache(self, path=None):
"""
pass # not necessary to implement, may have no cache

def mkdir(self, path, **kwargs):
def mkdir(self, path, create_parents=True, **kwargs):
"""
Create directory entry at path
Expand All @@ -224,6 +225,8 @@ def mkdir(self, path, **kwargs):
----------
path: str
location
create_parents: bool
if True, this is equivalent to ``makedirs``
kwargs:
may be permissions, etc.
"""
Expand Down Expand Up @@ -430,12 +433,11 @@ def glob(self, path):
root = ''
depth = 20 if "**" in path else 1
allpaths = self.find(root, maxdepth=depth)
pattern = re.compile("^" + path.replace('.', r'\.')
.replace('//', '/')
.rstrip('/')
.replace('**', '.+')
.replace('*', '[^/]*')
.replace('?', '.') + "$")
pattern = "^" + path.replace('.', r'\.').replace('//', '/').rstrip(
'/').replace('?', '.') + "$"
pattern = re.sub('[*]{2}', '=PLACEHOLDER=', pattern)
pattern = re.sub('[*]', '[^/]*', pattern)
pattern = re.compile(pattern.replace("=PLACEHOLDER=", '.*'))
out = {p for p in allpaths
if pattern.match(p.replace('//', '/').rstrip('/'))}
return list(sorted(out))
Expand Down Expand Up @@ -877,7 +879,10 @@ def closed(self, c):
self._closed = c

def __hash__(self):
return self.fs.checksum(self.path)
if 'w' in self.mode:
return id(self)
else:
tokenize(self.details)

def __eq__(self, other):
"""Files are equal if they have the same checksum, only in read mode"""
Expand Down Expand Up @@ -1019,11 +1024,11 @@ def read(self, length=-1):
length : int (-1)
Number of bytes to read; if <0, all remaining bytes.
"""
length = int(length)
length = -1 if length is None else int(length)
if self.mode != 'rb':
raise ValueError('File not in read mode')
if length < 0:
length = self.size
length = self.size - self.loc
if self.closed:
raise ValueError('I/O operation on closed file.')
logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length))
Expand Down
16 changes: 15 additions & 1 deletion fsspec/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from hashlib import md5
import math
import os
import pathlib
import re
from urllib.parse import urlsplit

Expand Down Expand Up @@ -42,14 +43,21 @@ def infer_storage_options(urlpath, inherit_storage_options=None):

parsed_path = urlsplit(urlpath)
protocol = parsed_path.scheme or 'file'
path = parsed_path.path
if parsed_path.fragment:
path = "#".join([parsed_path.path, parsed_path.fragment])
else:
path = parsed_path.path
if protocol == 'file':
# Special case parsing file protocol URL on Windows according to:
# https://msdn.microsoft.com/en-us/library/jj710207.aspx
windows_path = re.match(r'^/([a-zA-Z])[:|]([\\/].*)$', path)
if windows_path:
path = '%s:%s' % windows_path.groups()

if protocol in ["http", "https"]:
# for HTTP, we don't want to parse, as requests will anyway
return {"protocol": protocol, "path": urlpath}

options = {
'protocol': protocol,
'path': path,
Expand All @@ -61,6 +69,10 @@ def infer_storage_options(urlpath, inherit_storage_options=None):
# https://github.com/dask/dask/issues/1417
options['host'] = parsed_path.netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0]

if protocol in ("s3", "gcs", "gs"):
options["path"] = options['host'] + options["path"]
else:
options["host"] = options['host']
if parsed_path.port:
options['port'] = parsed_path.port
if parsed_path.username:
Expand Down Expand Up @@ -201,6 +213,8 @@ def read_block(f, offset, length, delimiter=None):
if delimiter:
f.seek(offset)
seek_delimiter(f, delimiter, 2**16)
if length is None:
return f.read()
start = f.tell()
length -= start - offset

Expand Down

0 comments on commit 5f5fa99

Please sign in to comment.