Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace fs with fsspec #523

Merged
merged 11 commits into from
Nov 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{
"name": "Fugue Development Environment",
"_image": "fugueproject/devenv:0.7.7",
"image": "mcr.microsoft.com/vscode/devcontainers/python:3.10",
"customizations": {
"vscode": {
"settings": {
"terminal.integrated.shell.linux": "/bin/bash",
"python.pythonPath": "/usr/local/bin/python",
"python.defaultInterpreterPath": "/usr/local/bin/python",
"editor.defaultFormatter": "ms-python.black-formatter",
"isort.interpreter": [
"/usr/local/bin/python"
],
Expand All @@ -16,6 +16,9 @@
],
"pylint.interpreter": [
"/usr/local/bin/python"
],
"black-formatter.interpreter": [
"/usr/local/bin/python"
]
},
"extensions": [
Expand All @@ -24,6 +27,7 @@
"ms-python.flake8",
"ms-python.pylint",
"ms-python.mypy",
"ms-python.black-formatter",
"GitHub.copilot",
"njpwerner.autodocstring"
]
Expand Down
4 changes: 4 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## 0.8.7

- [488](https://github.com/fugue-project/fugue/issues/488) Migrate from fs to fsspec
- [521](https://github.com/fugue-project/fugue/issues/521) Add `as_dicts` to Fugue API
- [516](https://github.com/fugue-project/fugue/issues/516) Use `_collect_as_arrow` for Spark `as_arrow`
- [520](https://github.com/fugue-project/fugue/pull/520) Add Python 3.10 to Windows Tests
- [506](https://github.com/fugue-project/fugue/issues/506) Adopt pandas `ExtensionDType`
- [504](https://github.com/fugue-project/fugue/issues/504) Create Fugue pytest fixtures
- [503](https://github.com/fugue-project/fugue/issues/503) Deprecate python 3.7 support
Expand Down
1 change: 0 additions & 1 deletion fugue/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# flake8: noqa
from triad.collections import Schema
from triad.collections.fs import FileSystem

from fugue.api import out_transform, transform
from fugue.bag.array_bag import ArrayBag
Expand Down
173 changes: 84 additions & 89 deletions fugue/_utils/io.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,29 @@
import os
import pathlib
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from urllib.parse import urlparse

import fs as pfs
import pandas as pd
from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem
from triad.collections.dict import ParamDict
from triad.collections.fs import FileSystem
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.io import join, url_to_fs
from triad.utils.pandas_like import PD_UTILS

from fugue.dataframe import LocalBoundedDataFrame, LocalDataFrame, PandasDataFrame


class FileParser(object):
def __init__(self, path: str, format_hint: Optional[str] = None):
last = len(path)
has_glob = False
self._orig_format_hint = format_hint
for i in range(len(path)):
if path[i] in ["/", "\\"]:
last = i
if path[i] in ["*", "?"]:
has_glob = True
break
if not has_glob:
self._uri = urlparse(path)
self._glob_pattern = ""
self._path = self._uri.path
self._has_glob = "*" in path or "?" in path
self._raw_path = path
self._fs, self._fs_path = url_to_fs(path)
if not self.is_local:
self._path = self._fs.unstrip_protocol(self._fs_path)
else:
self._uri = urlparse(path[:last])
self._glob_pattern = path[last + 1 :]
self._path = pfs.path.combine(self._uri.path, self._glob_pattern)
self._path = os.path.abspath(self._fs._strip_protocol(path))

if format_hint is None or format_hint == "":
for k, v in _FORMAT_MAP.items():
Expand All @@ -48,56 +39,64 @@ def __init__(self, path: str, format_hint: Optional[str] = None):
self._format = format_hint

def assert_no_glob(self) -> "FileParser":
assert_or_throw(self.glob_pattern == "", f"{self.path} has glob pattern")
assert_or_throw(not self.has_glob, f"{self.raw_path} has glob pattern")
return self

def with_glob(self, glob: str, format_hint: Optional[str] = None) -> "FileParser":
uri = self.uri
if glob != "":
uri = pfs.path.combine(uri, glob)
return FileParser(uri, format_hint or self._orig_format_hint)

@property
def glob_pattern(self) -> str:
return self._glob_pattern
def has_glob(self):
return self._has_glob

@property
def uri(self) -> str:
return self._uri.geturl()
def is_local(self):
return isinstance(self._fs, LocalFileSystem)

@property
def uri_with_glob(self) -> str:
if self.glob_pattern == "":
return self.uri
return pfs.path.combine(self.uri, self.glob_pattern)
def join(self, path: str, format_hint: Optional[str] = None) -> "FileParser":
if not self.has_glob:
_path = join(self.path, path)
else:
_path = join(self.parent, path)
return FileParser(_path, format_hint or self._orig_format_hint)

@property
def parent(self) -> str:
dn = os.path.dirname(self.uri)
return dn if dn != "" else "."

@property
def scheme(self) -> str:
return self._uri.scheme
return self._fs.unstrip_protocol(self._fs._parent(self._fs_path))

@property
def path(self) -> str:
return self._path

@property
def raw_path(self) -> str:
return self._raw_path

@property
def suffix(self) -> str:
return "".join(pathlib.Path(self.path.lower()).suffixes)
return "".join(pathlib.Path(self.raw_path.lower()).suffixes)

@property
def file_format(self) -> str:
return self._format

def make_parent_dirs(self) -> None:
self._fs.makedirs(self._fs._parent(self._fs_path), exist_ok=True)

def find_all(self) -> Iterable["FileParser"]:
if self.has_glob:
for x in self._fs.glob(self._fs_path):
yield FileParser(self._fs.unstrip_protocol(x))
else:
yield self

def open(self, *args: Any, **kwargs: Any) -> Any:
self.assert_no_glob()
return self._fs.open(self._fs_path, *args, **kwargs)


def load_df(
uri: Union[str, List[str]],
format_hint: Optional[str] = None,
columns: Any = None,
fs: Optional[FileSystem] = None,
fs: Optional[AbstractFileSystem] = None,
**kwargs: Any,
) -> LocalBoundedDataFrame:
if isinstance(uri, str):
Expand All @@ -117,48 +116,36 @@ def save_df(
uri: str,
format_hint: Optional[str] = None,
mode: str = "overwrite",
fs: Optional[FileSystem] = None,
fs: Optional[AbstractFileSystem] = None,
**kwargs: Any,
) -> None:
assert_or_throw(
mode in ["overwrite", "error"], NotImplementedError(f"{mode} is not supported")
)
p = FileParser(uri, format_hint).assert_no_glob()
if fs is None:
fs = FileSystem()
fs, _ = url_to_fs(uri)
if fs.exists(uri):
assert_or_throw(mode == "overwrite", FileExistsError(uri))
try:
fs.remove(uri)
except Exception:
try:
fs.removetree(uri)
except Exception: # pragma: no cover
pass
fs.rm(uri, recursive=True)
except Exception: # pragma: no cover
pass
_FORMAT_SAVE[p.file_format](df, p, **kwargs)


def _get_single_files(
fp: Iterable[FileParser], fs: Optional[FileSystem]
fp: Iterable[FileParser], fs: Optional[AbstractFileSystem]
) -> Iterable[FileParser]:
if fs is None:
fs = FileSystem()
for f in fp:
if f.glob_pattern != "":
files = [
FileParser(pfs.path.combine(f.uri, pfs.path.basename(x.path)))
for x in fs.opendir(f.uri).glob(f.glob_pattern)
]
yield from _get_single_files(files, fs)
else:
yield f
yield from f.find_all()


def _save_parquet(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None:
PD_UTILS.to_parquet_friendly(
df.as_pandas(), partition_cols=kwargs.get("partition_cols", [])
).to_parquet(
p.uri,
p.assert_no_glob().path,
**{
"engine": "pyarrow",
"schema": df.schema.pa_schema,
Expand All @@ -171,34 +158,36 @@ def _load_parquet(
p: FileParser, columns: Any = None, **kwargs: Any
) -> Tuple[pd.DataFrame, Any]:
if columns is None:
pdf = pd.read_parquet(p.uri, **{"engine": "pyarrow", **kwargs})
pdf = pd.read_parquet(p.path, **{"engine": "pyarrow", **kwargs})
return pdf, None
if isinstance(columns, list): # column names
pdf = pd.read_parquet(p.uri, columns=columns, **{"engine": "pyarrow", **kwargs})
pdf = pd.read_parquet(
p.path, columns=columns, **{"engine": "pyarrow", **kwargs}
)
return pdf, None
schema = Schema(columns)
pdf = pd.read_parquet(
p.uri, columns=schema.names, **{"engine": "pyarrow", **kwargs}
p.path, columns=schema.names, **{"engine": "pyarrow", **kwargs}
)
return pdf, schema


def _save_csv(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None:
df.as_pandas().to_csv(p.uri, **{"index": False, "header": False, **kwargs})
with p.open("w") as f:
df.as_pandas().to_csv(f, **{"index": False, "header": False, **kwargs})


def _safe_load_csv(path: str, **kwargs: Any) -> pd.DataFrame:
def _safe_load_csv(p: FileParser, **kwargs: Any) -> pd.DataFrame:
def load_dir() -> pd.DataFrame:
fs = FileSystem()
return pd.concat(
[
pd.read_csv(pfs.path.combine(path, pfs.path.basename(x.path)), **kwargs)
for x in fs.opendir(path).glob("*.csv")
]
)
dfs: List[pd.DataFrame] = []
for _p in p.join("*.csv").find_all(): # type: ignore
with _p.open("r") as f:
dfs.append(pd.read_csv(f, **kwargs))
return pd.concat(dfs)

try:
return pd.read_csv(path, **kwargs)
with p.open("r") as f:
return pd.read_csv(f, **kwargs)
except IsADirectoryError:
return load_dir()
except pd.errors.ParserError: # pragma: no cover
Expand All @@ -224,7 +213,7 @@ def _load_csv( # noqa: C901
header = kw["header"]
del kw["header"]
if str(header) in ["True", "0"]:
pdf = _safe_load_csv(p.uri, **{"index_col": False, "header": 0, **kw})
pdf = _safe_load_csv(p, **{"index_col": False, "header": 0, **kw})
if columns is None:
return pdf, None
if isinstance(columns, list): # column names
Expand All @@ -236,40 +225,46 @@ def _load_csv( # noqa: C901
raise ValueError("columns must be set if without header")
if isinstance(columns, list): # column names
pdf = _safe_load_csv(
p.uri, **{"index_col": False, "header": None, "names": columns, **kw}
p,
**{"index_col": False, "header": None, "names": columns, **kw},
)
return pdf, None
schema = Schema(columns)
pdf = _safe_load_csv(
p.uri, **{"index_col": False, "header": None, "names": schema.names, **kw}
p,
**{"index_col": False, "header": None, "names": schema.names, **kw},
)
return pdf, schema
else:
raise NotImplementedError(f"{header} is not supported")


def _save_json(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None:
df.as_pandas().to_json(p.uri, **{"orient": "records", "lines": True, **kwargs})
with p.open("w") as f:
df.as_pandas().to_json(f, **{"orient": "records", "lines": True, **kwargs})


def _safe_load_json(path: str, **kwargs: Any) -> pd.DataFrame:
def _safe_load_json(p: FileParser, **kwargs: Any) -> pd.DataFrame:
kw = {"orient": "records", "lines": True, **kwargs}

def load_dir() -> pd.DataFrame:
dfs: List[pd.DataFrame] = []
for _p in p.join("*.json").find_all(): # type: ignore
with _p.open("r") as f:
dfs.append(pd.read_json(f, **kw))
return pd.concat(dfs)

try:
return pd.read_json(path, **kw)
with p.open("r") as f:
return pd.read_json(f, **kw)
except (IsADirectoryError, PermissionError):
fs = FileSystem()
return pd.concat(
[
pd.read_json(pfs.path.combine(path, pfs.path.basename(x.path)), **kw)
for x in fs.opendir(path).glob("*.json")
]
)
return load_dir()


def _load_json(
p: FileParser, columns: Any = None, **kwargs: Any
) -> Tuple[pd.DataFrame, Any]:
pdf = _safe_load_json(p.uri, **kwargs).reset_index(drop=True)
pdf = _safe_load_json(p, **kwargs).reset_index(drop=True)
if columns is None:
return pdf, None
if isinstance(columns, list): # column names
Expand Down
Loading
Loading