diff --git a/fsspec/asyn.py b/fsspec/asyn.py index 4563254df..ecca48847 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -438,7 +438,14 @@ async def _cat( return out[0] async def _cat_ranges( - self, paths, starts, ends, max_gap=None, batch_size=None, **kwargs + self, + paths, + starts, + ends, + max_gap=None, + batch_size=None, + on_error="return", + **kwargs, ): # TODO: on_error if max_gap is not None: @@ -457,7 +464,9 @@ async def _cat_ranges( for p, s, e in zip(paths, starts, ends) ] batch_size = batch_size or self.batch_size - return await _run_coros_in_chunks(coros, batch_size=batch_size, nofiles=True) + return await _run_coros_in_chunks( + coros, batch_size=batch_size, nofiles=True, return_exceptions=True + ) async def _put_file(self, lpath, rpath, **kwargs): raise NotImplementedError diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index 30fac770c..1af2f2f41 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -16,7 +16,7 @@ from ..callbacks import _DEFAULT_CALLBACK from ..core import filesystem, open, split_protocol from ..spec import AbstractFileSystem -from ..utils import isfilelike +from ..utils import isfilelike, merge_offset_ranges logger = logging.getLogger("fsspec.reference") @@ -76,6 +76,8 @@ def __init__( fs=None, template_overrides=None, simple_templates=True, + max_gap=64_000, + max_block=256_000_000, loop=None, **kwargs, ): @@ -85,18 +87,19 @@ def __init__( ---------- fo : dict or str The set of references to use for this instance, with a structure as above. - If str, will use fsspec.open, in conjunction with ref_storage_args to - open and parse JSON at this location. + If str, will use fsspec.open, in conjunction with target_options + and target_protocol to open and parse JSON at this location. 
target : str For any references having target_url as None, this is the default file target to use ref_storage_args : dict - If references is a str, use these kwargs for loading the JSON file + If references is a str, use these kwargs for loading the JSON file. + Deprecated: use target_options instead. target_protocol : str Used for loading the reference file, if it is a path. If None, protocol will be derived from the given path target_options : dict - Extra FS options for loading the reference file, if given as a path + Extra FS options for loading the reference file ``fo``, if given as a path remote_protocol : str The protocol of the filesystem on which the references will be evaluated (unless fs is provided). If not given, will be derived from the first @@ -119,6 +122,14 @@ def __init__( Whether templates can be processed with simple replace (True) or if jinja is needed (False, much slower). All reference sets produced by ``kerchunk`` are simple in this sense, but the spec allows for complex. + max_gap, max_block: int + For merging multiple concurrent requests to the same remote file. + Neighboring byte ranges will only be merged when their + inter-range gap is <= `max_gap`. Default is 64KB. Set to 0 + to only merge when it requires no extra bytes. Pass a negative + number to disable merging, appropriate for local target files. + Neighboring byte ranges will only be merged when the size of + the aggregated range is <= `max_block`. Default is 256MB. 
kwargs : passed to parent class """ super().__init__(loop=loop, **kwargs) @@ -128,6 +139,8 @@ def __init__( self.simple_templates = simple_templates self.templates = {} self.fss = {} + self.max_gap = max_gap + self.max_block = max_block if hasattr(fo, "read"): text = fo.read() elif isinstance(fo, str): @@ -156,12 +169,15 @@ def __init__( ) for k, opts in fs.items() } + if None not in self.fss: + self.fss[None] = filesystem("file") return if fs is not None: # single remote FS remote_protocol = ( fs.protocol[0] if isinstance(fs.protocol, tuple) else fs.protocol ) + self.fss[remote_protocol] = fs if remote_protocol is None: # get single protocol from any templates @@ -169,9 +185,9 @@ def __init__( if callable(ref): ref = ref() protocol, _ = fsspec.core.split_protocol(ref) - if protocol: - remote_protocol = protocol - break + if protocol and protocol not in self.fss: + fs = filesystem(protocol, loop=loop, **(remote_options or {})) + self.fss[protocol] = fs if remote_protocol is None: # get single protocol from references for ref in self.references.values(): @@ -179,22 +195,22 @@ def __init__( ref = ref() if isinstance(ref, list) and ref[0]: protocol, _ = fsspec.core.split_protocol(ref[0]) - if protocol: - remote_protocol = protocol - break - if remote_protocol is None: - remote_protocol = target_protocol + if protocol and protocol not in self.fss: + fs = filesystem(protocol, loop=loop, **(remote_options or {})) + self.fss[protocol] = fs + + if remote_protocol and remote_protocol not in self.fss: + fs = filesystem(remote_protocol, loop=loop, **(remote_options or {})) + self.fss[remote_protocol] = fs - fs = fs or filesystem(remote_protocol, loop=loop, **(remote_options or {})) - self.fss[remote_protocol] = fs - self.fss[None] = fs # default one + self.fss[None] = fs or filesystem("file") # default one @property def loop(self): inloop = [fs.loop for fs in self.fss.values() if fs.async_impl] return inloop[0] if inloop else self._loop - def _cat_common(self, path): + def 
_cat_common(self, path, start=None, end=None): path = self._strip_protocol(path) logger.debug(f"cat: {path}") part = self.references[path] @@ -209,35 +225,44 @@ def _cat_common(self, path): if len(part) == 1: logger.debug(f"Reference: {path}, whole file") url = part[0] - start = None - end = None + start1, end1 = start, end else: - url, start, size = part - logger.debug(f"Reference: {path}, offset {start}, size {size}") - end = start + size + url, start0, size = part + logger.debug(f"Reference: {path} => {url}, offset {start0}, size {size}") + end0 = start0 + size + + if start is not None: + if start >= 0: + start1 = start0 + start + else: + start1 = end0 + start + else: + start1 = start0 + if end is not None: + if end >= 0: + end1 = start0 + end + else: + end1 = end0 + end + else: + end1 = end0 if url is None: url = self.target - return url, start, end + return url, start1, end1 async def _cat_file(self, path, start=None, end=None, **kwargs): - part_or_url, start0, end0 = self._cat_common(path) + part_or_url, start0, end0 = self._cat_common(path, start=start, end=end) if isinstance(part_or_url, bytes): return part_or_url[start:end] protocol, _ = split_protocol(part_or_url) - # TODO: start and end should be passed to cat_file, not sliced - return ( - await self.fss[protocol]._cat_file(part_or_url, start=start0, end=end0) - )[start:end] + return await self.fss[protocol]._cat_file(part_or_url, start=start0, end=end0) def cat_file(self, path, start=None, end=None, **kwargs): - part_or_url, start0, end0 = self._cat_common(path) + part_or_url, start0, end0 = self._cat_common(path, start=start, end=end) if isinstance(part_or_url, bytes): return part_or_url[start:end] protocol, _ = split_protocol(part_or_url) # TODO: start and end should be passed to cat_file, not sliced - return self.fss[protocol].cat_file(part_or_url, start=start0, end=end0)[ - start:end - ] + return self.fss[protocol].cat_file(part_or_url, start=start0, end=end0) def pipe_file(self, path, value, **_): 
"""Temporarily add binary data or reference as a file""" @@ -277,45 +302,64 @@ def get(self, rpath, lpath, recursive=False, **kwargs): ) def cat(self, path, recursive=False, on_error="raise", **kwargs): + if isinstance(path, str) and recursive: + raise NotImplementedError + if isinstance(path, list) and (recursive or any("*" in p for p in path)): + raise NotImplementedError proto_dict = _protocol_groups(path, self.references) out = {} for proto, paths in proto_dict.items(): - if proto is None: - # binary/string - for p in paths: - try: - out[p] = AbstractFileSystem.cat_file(self, p, **kwargs) - except Exception as e: - if on_error == "raise": - raise - if on_error == "return": - out[p] = e - - elif self.fss[proto].async_impl: - # TODO: asyncio.gather on multiple async FSs - out.update( - sync( - self.loop, - self._cat, - paths, - recursive, - on_error=on_error, - **kwargs, - ) - ) - elif isinstance(paths, list): - if recursive or any("*" in p for p in paths): - raise NotImplementedError - for p in paths: - try: - out[p] = AbstractFileSystem.cat_file(self, p, **kwargs) - except Exception as e: - if on_error == "raise": - raise - if on_error == "return": - out[p] = e + fs = self.fss[proto] + urls, starts, ends = zip(*[self._cat_common(p) for p in paths]) + urls2 = [] + starts2 = [] + ends2 = [] + paths2 = [] + whole_files = set() + for u, s, e, p in zip(urls, starts, ends, paths): + if isinstance(u, bytes): + # data + out[p] = u + elif s is None: + # whole file - limits are None, None, but no further + # entries take for this file + whole_files.add(u) + urls2.append(u) + starts2.append(s) + ends2.append(e) + paths2.append(p) + for u, s, e, p in zip(urls, starts, ends, paths): + if s is not None and u not in whole_files: + urls2.append(u) + starts2.append(s) + ends2.append(e) + paths2.append(p) + new_paths, new_starts, new_ends = merge_offset_ranges( + list(urls2), + list(starts2), + list(ends2), + sort=False, + max_gap=self.max_gap, + max_block=self.max_block, + ) + 
bytes_out = fs.cat_ranges(new_paths, new_starts, new_ends) + if len(urls2) == len(bytes_out): + # we didn't do any merging + for p, b in zip(paths2, bytes_out): + out[p] = b else: - out.update(AbstractFileSystem.cat_file(self, paths)) + # unbundle from merged bytes - simple approach + for u, s, e, p in zip(urls, starts, ends, paths): + if p in out: + continue # was bytes, already handled + for np, ns, ne, b in zip( + new_paths, new_starts, new_ends, bytes_out + ): + if np == u and (ns is None or ne is None): + out[p] = b[s:e] + elif np == u and s >= ns and e <= ne: + out[p] = b[s - ns : (e - ne) or None] + if len(out) == 1 and isinstance(path, str) and "*" not in path: return _first(out) return out diff --git a/fsspec/implementations/tests/test_reference.py b/fsspec/implementations/tests/test_reference.py index f2e72a6a3..f0b04dd7a 100644 --- a/fsspec/implementations/tests/test_reference.py +++ b/fsspec/implementations/tests/test_reference.py @@ -78,7 +78,11 @@ def test_mutable(server, m): def test_defaults(server): # noqa: F811 refs = {"a": b"data", "b": (None, 0, 5)} fs = fsspec.filesystem( - "reference", fo=refs, target_protocol="http", target=realfile + "reference", + fo=refs, + target_protocol="http", + target=realfile, + remote_protocol="http", ) assert fs.cat("a") == b"data" @@ -337,3 +341,66 @@ def test_missing_nonasync(m): a = zarr.open_array(m) assert str(a[0]) == "nan" + + +def test_fss_has_defaults(m): + fs = fsspec.filesystem("reference", fo={}) + assert None in fs.fss + + fs = fsspec.filesystem("reference", fo={}, remote_protocol="memory") + assert fs.fss[None].protocol == "memory" + assert fs.fss["memory"].protocol == "memory" + + fs = fsspec.filesystem("reference", fs=m, fo={}) + assert fs.fss[None] is m + + fs = fsspec.filesystem("reference", fs={"memory": m}, fo={}) + assert fs.fss["memory"] is m + assert fs.fss[None].protocol == "file" + + fs = fsspec.filesystem("reference", fs={None: m}, fo={}) + assert fs.fss[None] is m + + fs = 
fsspec.filesystem("reference", fo={"key": ["memory://a"]}) + assert fs.fss[None] is fs.fss["memory"] + + fs = fsspec.filesystem("reference", fo={"key": ["memory://a"], "blah": ["path"]}) + assert fs.fss[None] is fs.fss["memory"] + + +def test_merging(m): + m.pipe("/a", b"test data") + other = b"other test data" + m.pipe("/b", other) + fs = fsspec.filesystem( + "reference", + fo={ + "a": ["memory://a", 1, 1], + "b": ["memory://a", 2, 1], + "c": ["memory://b"], + "d": ["memory://b", 4, 6], + }, + ) + out = fs.cat(["a", "b", "c", "d"]) + assert out == {"a": b"e", "b": b"s", "c": other, "d": other[4:10]} + + +def test_cat_file_ranges(m): + other = b"other test data" + m.pipe("/b", other) + fs = fsspec.filesystem( + "reference", + fo={ + "c": ["memory://b"], + "d": ["memory://b", 4, 6], + }, + ) + assert fs.cat_file("c") == other + assert fs.cat_file("c", start=1) == other[1:] + assert fs.cat_file("c", start=-5) == other[-5:] + assert fs.cat_file("c", 1, -5) == other[1:-5] + + assert fs.cat_file("d") == other[4:10] + assert fs.cat_file("d", start=1) == other[4:10][1:] + assert fs.cat_file("d", start=-5) == other[4:10][-5:] + assert fs.cat_file("d", 1, -3) == other[4:10][1:-3] diff --git a/fsspec/spec.py b/fsspec/spec.py index 4cb0d5f86..6357e5a0b 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -745,7 +745,9 @@ def pipe(self, path, value=None, **kwargs): else: raise ValueError("path must be str or dict") - def cat_ranges(self, paths, starts, ends, max_gap=None, **kwargs): + def cat_ranges( + self, paths, starts, ends, max_gap=None, on_error="return", **kwargs + ): if max_gap is not None: raise NotImplementedError if not isinstance(paths, list): @@ -756,7 +758,16 @@ def cat_ranges(self, paths, starts, ends, max_gap=None, **kwargs): ends = [starts] * len(paths) if len(starts) != len(paths) or len(ends) != len(paths): raise ValueError - return [self.cat_file(p, s, e) for p, s, e in zip(paths, starts, ends)] + out = [] + for p, s, e in zip(paths, starts, ends): + try: + 
out.append(self.cat_file(p, s, e)) + except Exception as e: + if on_error == "return": + out.append(e) + else: + raise + return out def cat(self, path, recursive=False, on_error="raise", **kwargs): """Fetch (potentially multiple) paths' contents diff --git a/fsspec/utils.py b/fsspec/utils.py index 5eeec5c6e..3814f4132 100644 --- a/fsspec/utils.py +++ b/fsspec/utils.py @@ -496,7 +496,6 @@ def merge_offset_ranges(paths, starts, ends, max_gap=0, max_block=None, sort=Tru order. If the user can guarantee that the inputs are already sorted, passing `sort=False` will skip the re-ordering. """ - # Check input if not isinstance(paths, list): raise TypeError